Understanding Hadoop MapReduce

MapReduce is a framework for processing data on a cluster. It is built around two functions, Map and Reduce: the processing is distributed across several machines, and the partial results are then reduced to a single summary.

How MapReduce works

Map function: It processes each input record and generates new key-value pairs.

Reduce function: It takes the pairs produced by the mappers, grouped by key, and runs once per key on the list of values for that key to generate the final output.
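
As a minimal sketch of these two steps, the word count below simulates them with plain Java collections. Nothing here uses the Hadoop API; the class name WordCountSketch and its helper methods are made up for illustration, and the grouping by key that Hadoop performs between the two steps is done by hand with a TreeMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the map and reduce steps (no Hadoop classes involved).
class WordCountSketch {

    // Map step: one input line in, one (word, 1) pair out per word.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Reduce step: all values collected for one key in, one summary value out.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs map on every line, groups the pairs by key, then reduces each group.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(run(List.of("to be or not", "to be")));
    }
}
```

In a real job the map calls run in parallel on different machines, and the grouping is done by the framework rather than by a local TreeMap.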


Job Tracker: Schedules jobs and tracks the tasks assigned to the Task Trackers.

Task Tracker: Runs the tasks it is given, tracks them, and reports their status to the Job Tracker.

Job: A program, that is, one execution of a Mapper and a Reducer across a dataset.

Task: An execution of a Mapper or a Reducer on a slice of data.

Task Attempt: One particular try at running a task. Any machine can go down at any time, in which case the framework reschedules the task on another node. By default, a task is attempted at most 4 times.
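
The retry rule can be sketched in a few lines of plain Java. This is not Hadoop's scheduler code: TaskAttemptSketch, runWithRetries and the attemptSucceeds callback are hypothetical names, and the callback simply stands in for "run the task on some node and report whether it worked".

```java
import java.util.function.IntPredicate;

// Plain-Java sketch of bounded task retries (not Hadoop's scheduler code).
class TaskAttemptSketch {

    // Assumed limit, mirroring the default of 4 attempts mentioned above.
    static final int MAX_ATTEMPTS = 4;

    // Tries the task until one attempt succeeds or the limit is reached.
    // 'attemptSucceeds' receives the 1-based attempt number and reports success.
    // Returns the number of the successful attempt, or -1 if all attempts failed.
    static int runWithRetries(IntPredicate attemptSucceeds) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1; // no attempt succeeded: the task is marked as failed
    }

    public static void main(String[] args) {
        System.out.println(runWithRetries(attempt -> attempt == 3)); // prints 3
        System.out.println(runWithRetries(attempt -> false));        // prints -1
    }
}
```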

Data Locality: “Move computation close to the data rather than data to computation”. A computation requested by an application is much more efficient if it is executed near the data it operates on.
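
One way to picture data locality is as a placement rule: given the nodes that already store a piece of input and the nodes that are currently free, prefer a node that is in both sets. The sketch below assumes exactly that simplified rule; LocalitySketch, pickNode and the node names are invented for the example and do not come from Hadoop.

```java
import java.util.List;
import java.util.Set;

// Plain-Java sketch of locality-aware task placement (not Hadoop's scheduler code).
class LocalitySketch {

    // Picks a node for a task: a free node that already stores the input split
    // if there is one, otherwise the first free node (the data must then travel
    // over the network). Returns null when no node is free.
    static String pickNode(Set<String> nodesHoldingSplit, List<String> freeNodes) {
        for (String node : freeNodes) {
            if (nodesHoldingSplit.contains(node)) {
                return node; // data-local: the task reads its input from local disk
            }
        }
        return freeNodes.isEmpty() ? null : freeNodes.get(0);
    }

    public static void main(String[] args) {
        Set<String> replicas = Set.of("node2", "node5");
        System.out.println(pickNode(replicas, List.of("node1", "node5"))); // prints node5
        System.out.println(pickNode(replicas, List.of("node1", "node3"))); // prints node1
    }
}
```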

How MapReduce works in Hadoop

  • Input data (files): The data to process, stored in input files that live in HDFS.
  • InputFormat: Defines how these input files are split and read.
  • InputSplits: Created by the InputFormat; each one represents the data that will be processed by an individual Mapper.
  • RecordReader: Converts the data of a split into key-value pairs (records) suitable for reading by the Mapper.
  • Mapper: Receives the records one at a time (line by line for a text file), processes each of them, and emits intermediate key-value pairs.
  • Combiner: A ‘mini-reducer’ that runs on the mapper's output to reduce the amount of data transferred between mapper and reducer.
  • Partitioner: Decides which reducer receives each map output pair. By default, a hash function applied to the key (or a subset of the key) derives the partition.
  • Shuffling and Sorting: Redistributes the data so that the pairs produced by Map with the same key end up on the same machine, sorted by key.
  • Reducer: Processes the data that comes from the mappers. After processing, it produces a new set of output, which is stored in HDFS.
  • RecordWriter: Writes the key-value pairs from the Reducer phase to the output files.
  • OutputFormat: Defines how these output key-value pairs are written to the output files by the RecordWriter.
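
The partitioning and shuffle steps of this pipeline can be sketched with plain Java collections. The partition arithmetic below (mask the sign bit of the hash, then take it modulo the number of reducers) is modelled on Hadoop's default hash partitioner; everything else, including the names ShuffleSketch, partitionFor and shuffle, is a hypothetical in-memory stand-in for what the framework does across machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of partitioning plus shuffle and sort (no Hadoop classes).
class ShuffleSketch {

    // Hash partitioning: mask off the sign bit so the result is never negative,
    // then map the hash onto one of the reducers.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Shuffle and sort: route each (key, value) pair to its partition and group
    // the values by key. A TreeMap keeps the keys of each partition sorted.
    static List<Map<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput, int numReducers) {
        List<Map<String, List<Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            partitions.get(partitionFor(pair.getKey(), numReducers))
                      .computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                      .add(pair.getValue());
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("to", 1), Map.entry("be", 1),
                Map.entry("or", 1), Map.entry("to", 1));
        List<Map<String, List<Integer>>> partitions = shuffle(mapOutput, 3);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("reducer " + i + " receives " + partitions.get(i));
        }
    }
}
```

Because the partition depends only on the key, both ("to", 1) pairs land in the same partition, which is what lets a single reducer see every value for that key.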

Speculative Execution

If a task takes a long time to execute (more than 1 min), Hadoop does not try to diagnose and fix the slow-running task. Instead, it tries to detect such tasks and runs backup tasks for them. This is called speculative execution, and the backup tasks are called speculative tasks.

  • The speculative task is killed if the original task completes before it.
  • The original task is killed if the speculative task finishes before it.
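
The "first one to finish wins, the other is killed" rule has a close analogue in the standard Java concurrency library: ExecutorService.invokeAny returns the result of the first task that completes and cancels the rest. The sketch below uses it as a simplified model. It is not how Hadoop implements speculation (Hadoop starts the backup only after it has detected a slow task, while here both attempts start together), and SpeculativeSketch, attempt and firstToFinish are illustrative names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model of "first attempt to finish wins" (not Hadoop's implementation).
class SpeculativeSketch {

    // A task attempt that takes 'millis' milliseconds and then returns its name.
    static Callable<String> attempt(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Runs both attempts in parallel and returns the name of the first to finish.
    // The attempt that is still running is cancelled (interrupted).
    static String firstToFinish(Callable<String> original, Callable<String> backup) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.invokeAny(List.of(original, backup));
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("no attempt completed", e);
        } finally {
            pool.shutdownNow(); // make sure the losing attempt is stopped
        }
    }

    public static void main(String[] args) {
        // The original is slow (2 s), the backup is fast (50 ms): the backup wins.
        System.out.println(firstToFinish(attempt("original", 2000),
                                         attempt("speculative", 50)));
    }
}
```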

Enable and Disable Speculative Execution
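
Speculative execution is controlled separately for map tasks and reduce tasks by two configuration properties, mapreduce.map.speculative and mapreduce.reduce.speculative, which are true by default in Hadoop 2 and later. As a sketch, assuming the settings are placed in mapred-site.xml, turning it off for both kinds of task would look like this:

```xml
<!-- mapred-site.xml: turn speculative execution off for map and reduce tasks -->
<property>
  <name>mapreduce.map.speculative</name>
  <value>false</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>false</value>
</property>
```

Setting a value back to true enables it again. For a driver launched through ToolRunner, the same properties can also be passed for a single job on the command line with -D, for example -D mapreduce.map.speculative=false.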


Java API for MapReduce

MapReduce data types (Box Classes)

Mapper Skeleton

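import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;

// TypKeyIn, TypValIn, TypKeyOut and TypValOut are placeholders for the actual key and value types.
public class TraitMapper extends Mapper<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void map(TypKeyIn cleE, TypValIn valE, Context context)
            throws IOException, InterruptedException {
        // Processing: compute the output key and value from cleE and valE.
        TypKeyOut KeyOut = new TypKeyOut(...);
        TypValOut ValOut = new TypValOut(...);
        context.write(KeyOut, ValOut);
    }
}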

Reducer Skeleton

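import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

// TypKeyIn and TypValIn must match the output types of the mapper.
public class TraitReducer extends Reducer<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void reduce(TypKeyIn cleI, Iterable<TypValIn> listeI, Context context)
            throws IOException, InterruptedException {
        TypKeyOut KeyOut = new TypKeyOut();
        TypValOut ValOut = new TypValOut();
        // Called once per key, with all the values emitted for that key.
        for (TypValIn val : listeI) {
            // Processing: KeyOut.set(...), ValOut.set(...)
        }
        context.write(KeyOut, ValOut);
    }
}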

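Driver Skeleton (the main class)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TraitementDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // Expect exactly two arguments: the input path and the output directory.
        if (args.length != 2) System.exit(-1);
        TraitementDriver traitement = new TraitementDriver();
        System.exit(ToolRunner.run(traitement, args));
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "traitement");
        job.setJarByClass(TraitementDriver.class);
        job.setMapperClass(TraitMapper.class);
        job.setReducerClass(TraitReducer.class);
        // Declare the types of the output key and value (placeholders, as in the skeletons above).
        job.setOutputKeyClass(TypKeyOut.class);
        job.setOutputValueClass(TypValOut.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        // Exit code: 0 if the job succeeded, 1 otherwise.
        return success ? 0 : 1;
    }
}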


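The following command runs the MapReduce job. It reads its input from input_file and writes the results to output_dir, which must not already exist. pathToMainClass is the fully qualified name of the driver class.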

hadoop jar mapred.jar pathToMainClass input_file output_dir

