Sunday, 10 September 2017

Word Count Program : An example of a basic MapReduce Program


A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. 
WordCount is a simple application that counts the number of occurrences of each word in a given input set.
The WordCount operation takes place in two stages: 

i) A Mapper phase: Here, the text is tokenized into words, and a key-value pair is formed for each word, where the key is the word itself and the value is '1'.

Example: I love Hadoop as much as I love AI 

After the execution of the Map phase, the output would look as shown below:
<I,1>
<love,1>
<Hadoop,1>
<as,1>
<much,1>
<as,1>
<I,1>
<love,1>
<AI,1>

ii) A Reducer phase: Here, the pairs are grouped by key and the values for identical keys are summed up.

So, after the Reducer phase has completed its execution, the output would look as shown below:
<I,2>
<love,2>
<Hadoop,1>
<as,2>
<much,1>
<AI,1>

Thus we get the number of occurrences of each word in the input file.
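
The two phases can be mimicked in plain Java without a Hadoop cluster. The following is only a minimal sketch: the class name WordCountSketch and the methods mapPhase and reducePhase are made up for illustration and are not part of the Hadoop API, and a TreeMap stands in for the framework's sort and grouping step.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Illustrative only: simulates the Mapper and Reducer phases in memory.
public class WordCountSketch {

    // Map phase: emit a <word, 1> pair for every whitespace-separated token.
    public static List<Map.Entry<String, Integer>> mapPhase(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            pairs.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return pairs;
    }

    // Reduce phase: group the pairs by key and sum the values of each group.
    public static Map<String, Integer> reducePhase(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = mapPhase("I love Hadoop as much as I love AI");
        System.out.println(pairs);              // the <word, 1> pairs in input order
        System.out.println(reducePhase(pairs)); // the summed counts, sorted by key
    }
}
```

Because the TreeMap sorts its keys, the reduced counts come out in sorted key order rather than in the order of first appearance used in the listing above; the counts themselves are the same.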

 
1. Write the WordCount.java program and save it in your hduser's home directory, like this: /home/hduser/WordCount.java

    package MRExample;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount
    {

        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
        {
            // Hadoop-supported data types, reused across calls to map()
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            // map method: tokenizes the input and forms the initial key-value pairs
            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                // take one line at a time and tokenize it on whitespace
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                // iterate through all the words in the line and form a <word, 1> pair for each
                while (tokenizer.hasMoreTokens())
                {
                    word.set(tokenizer.nextToken());
                    // send the pair to the output collector, which passes it on towards the reducer
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
        {
            // reduce method: accepts the key-value pairs from the mappers, aggregates them by key
            // and produces the final output
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                int sum = 0;
                // iterate through all the values for this key and add them together
                while (values.hasNext())
                {
                    sum += values.next().get();
                }
                // emit the key together with the sum of its values
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception
        {
            // create a JobConf object and assign a job name for identification purposes
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");
            // set the data types of the output key and value
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            // provide the mapper, combiner and reducer classes
            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);
            // set the input and output formats (plain text)
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            // the HDFS input and output directories are taken from the command line
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }
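
The job registers Reduce as the combiner as well as the reducer. That works here because summing is associative and commutative: adding up partial sums gives the same total as adding up the original 1s. A minimal plain-Java sketch of that idea follows; CombinerSketch and sum are hypothetical names, and the two lists merely stand in for the output of two map tasks.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows why a summing reducer can double as a combiner.
public class CombinerSketch {

    // Same aggregation as the reduce method: add up all values for one key.
    public static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Values emitted for the key "I" by two hypothetical map tasks.
        List<Integer> fromMapTask1 = Arrays.asList(1, 1);
        List<Integer> fromMapTask2 = Arrays.asList(1);

        // Without a combiner, the reducer receives every single 1.
        int withoutCombiner = sum(Arrays.asList(1, 1, 1));

        // With a combiner, each map task pre-sums its own values,
        // and the reducer only adds up the partial sums.
        int withCombiner = sum(Arrays.asList(sum(fromMapTask1), sum(fromMapTask2)));

        System.out.println(withoutCombiner + " " + withCombiner); // both totals are 3
    }
}
```

An aggregation that is not associative and commutative (an average, for instance) could not reuse its reducer as a combiner in this way.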


2. Create sample input files (say, file1 and file2) in the path /home/hduser/ as before, i.e., in your local file system.

file1 : I watch Game of Thrones. I Watch House MD.
file2 : I love to watch Sherlock too.

hduser@Soumitra-PC:~$ cat file1
I watch Game of Thrones. I Watch House MD.
hduser@Soumitra-PC:~$ cat file2
I love to watch Sherlock too.
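
Because the mapper splits each line with StringTokenizer on whitespace only, punctuation stays attached to words and case is preserved, so watch and Watch count as different keys, and Thrones. keeps its trailing period. The sketch below tokenizes the two sample lines locally to preview the keys the job would see; the class name TokenPreview and the method countTokens are assumptions made for illustration.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Illustrative only: previews the keys produced by whitespace tokenization.
public class TokenPreview {

    // Counts whitespace-separated tokens exactly as they appear
    // (no lowercasing, no punctuation stripping).
    public static Map<String, Integer> countTokens(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                counts.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countTokens(
                "I watch Game of Thrones. I Watch House MD.",
                "I love to watch Sherlock too.");
        // Print one "word<TAB>count" line per distinct token.
        counts.forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

On the sample lines, I is counted 3 times, watch twice and Watch once, while Thrones., MD. and too. each appear once with the period attached. Normalizing case or stripping punctuation would have to be added to the mapper if that is not the desired behaviour.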


3. Make a directory (say, MRClasses, in the same path /home/hduser) which will contain the class files (the WordCount, Map and Reduce class files) after WordCount.java is compiled:

hduser@Soumitra-PC:~$ mkdir MRClasses


Run ls to check that the directory MRClasses has indeed been created, along with the files file1 and file2 that we created in step 2.

4. Make a directory (say, WordCount) in HDFS to hold the WordCount program's input and output.
For that, we create a directory /WordCount and, inside it, another directory (/WordCount/Input) to store the program's input (i.e., file1 and file2).

hduser@Soumitra-PC:~$ hdfs dfs -mkdir /WordCount /WordCount/Input
 

5. Copy the input files from the local file system to HDFS (i.e., copy from /home/hduser to /WordCount/Input):

hduser@Soumitra-PC:~$ hdfs dfs -copyFromLocal /home/hduser/file1 /home/hduser/file2 /WordCount/Input 

We can do an ls (for example, hdfs dfs -ls /WordCount/Input) to check whether the directories were created and the files copied correctly.
 

Also, check the contents of the files after copying into HDFS:

hduser@Soumitra-PC:~$ hdfs dfs -cat /WordCount/Input/file1 /WordCount/Input/file2

The cat command on the copied files in HDFS lets us verify that the file contents are the same as those we created in the local file system.


6. Run the following command to compile the WordCount.java program. This step creates the class files for the WordCount class and its nested Map and Reduce classes, and stores them under the directory MRClasses.

hduser@Soumitra-PC:~$ sudo javac -classpath /usr/local/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/usr/local/hadoop/share/hadoop/common/lib/hadoop-annotations-2.6.0.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d /home/hduser/MRClasses /home/hduser/WordCount.java




7. Create a JAR file out of the classes produced in the last step.
hduser@Soumitra-PC:~$ jar -cvf /home/hduser/WordCount.jar -C /home/hduser/MRClasses/ .



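8. Execute the JAR file to run the WordCount program. The output directory (/WordCount/Output here) must not already exist, because Hadoop refuses to overwrite an existing output directory.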

hduser@Soumitra-PC:~$ hadoop jar WordCount.jar MRExample.WordCount /WordCount/Input /WordCount/Output


Once the job has completed, list the output directory and view the result:

hduser@Soumitra-PC:~$ hdfs dfs -ls /WordCount/Output
hduser@Soumitra-PC:~$ hdfs dfs -cat /WordCount/Output/part-00000
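
With TextOutputFormat, each line of a part file holds a key and a value separated by a tab by default. If the result is needed in another Java program, it could be parsed roughly as below. This is a sketch only: OutputLineParser and parse are hypothetical names, and the sample lines in main are made-up input, not output captured from an actual run.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: parses "word<TAB>count" lines such as those in a part file.
public class OutputLineParser {

    // Turns lines of the form "word<TAB>count" into a map, keeping the line order.
    public static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                continue; // skip blank or malformed lines
            }
            String word = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            counts.put(word, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines for illustration.
        System.out.println(parse(Arrays.asList("Hadoop\t1", "I\t2")));
    }
}
```

The lines themselves could come from a copy of the part file fetched to the local file system; how they are read is left out of the sketch.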




Document prepared by Mr. Soumitra Ghosh

Assistant Professor, Information Technology,
C.V.Raman College of Engineering, Bhubaneswar
Contact: soumitraghosh@cvrce.edu.in
