Understanding Hadoop MapReduce

ELMASLOUHY Mouaad
Apr 9, 2020 · 4 min read

MapReduce is a framework for processing data across a cluster. Composed of a Map and a Reduce function, it distributes processing tasks across different machines, then combines the intermediate results into a single summary.

How MapReduce works

Map function: It processes each input record and generates new key-value pairs.

Reduce function: It takes the pairs produced by the mappers, grouped by key, and runs a reducer function on each group to generate the final output.
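For example, counting words in the line "the cat sat on the mat" flows through the two phases like this (a conceptual sketch of the intermediate pairs, not literal framework output):

Input:          "the cat sat on the mat"
Map:            (the,1) (cat,1) (sat,1) (on,1) (the,1) (mat,1)
Shuffle/Sort:   (cat,[1]) (mat,[1]) (on,[1]) (sat,[1]) (the,[1,1])
Reduce:         (cat,1) (mat,1) (on,1) (sat,1) (the,2)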

Definitions

Job Tracker : Schedules jobs, assigns tasks to Task Trackers, and tracks their progress.

Task Tracker : Runs the tasks and reports their status to the Job Tracker.

Job : An execution of a Mapper and a Reducer across a whole dataset.

Task : An execution of a Mapper or a Reducer on a slice of data.

Task Attempt : Any machine can go down at any time, so when a task fails the framework reschedules it on another node. By default a task is attempted up to 4 times (governed by the mapreduce.map.maxattempts and mapreduce.reduce.maxattempts properties) before the job is marked as failed.

Data Locality : "Move computation close to the data rather than data to computation." A computation requested by an application is much more efficient if it is executed near the data it operates on.

How MapReduce works in Hadoop

  • InputData(files): the input data, stored in files that live in HDFS.
  • InputFormat: defines how the input files are split and read.
  • InputSplits: created by the InputFormat; each split represents the data that will be processed by an individual Mapper.
  • RecordReader: converts the data of a split into key-value pairs (records) suitable for reading by the Mapper.
  • Mapper: the input is passed to the map function one record at a time (line by line for text input); the mapper processes each record and emits intermediate key-value pairs.
  • Combiner: an optional "mini-reducer" that aggregates map output locally to minimize the data transferred between mapper and reducer.
  • Partitioner: decides which reducer receives each intermediate pair; by default, a hash of the key (or a subset of the key) is used to derive the partition.
  • Shuffling and Sorting: the redistribution of data so that all pairs produced by Map that share the same key end up on the same machine, sorted by key.
  • Reducer: processes the grouped data that comes from the mappers. After processing, it produces a new set of output, which is stored in HDFS.
  • RecordWriter: writes the key-value pairs from the Reducer phase to the output files.
  • OutputFormat: defines how these output key-value pairs are written to the output files by the RecordWriter.
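Most of these components are pluggable through the Job API. A minimal sketch of the wiring inside a driver (MyMapper and MyReducer are illustrative placeholder classes, not from this article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "example");
job.setInputFormatClass(TextInputFormat.class);    // how input files are split and read
job.setMapperClass(MyMapper.class);                // map phase
job.setCombinerClass(MyReducer.class);             // optional local "mini-reduce"
job.setPartitionerClass(HashPartitioner.class);    // hash of the key picks the reducer (the default)
job.setReducerClass(MyReducer.class);              // reduce phase
job.setOutputFormatClass(TextOutputFormat.class);  // how output pairs are written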

Speculative Execution

If a task is running much slower than its peers (Hadoop only considers tasks that have already been running for at least a minute), Hadoop doesn't try to diagnose and fix the slow-running task; instead, it detects it and launches a backup copy on another node. This is called speculative execution (speculative tasks).

  • The speculative task is killed if the original task completes before it.
  • The original task is killed if the speculative task finishes before it.

Enable and Disable Speculative Execution

mapred-site.xml
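The content of that file would look something like this (a minimal sketch using the MRv2 property names; both properties default to true):

<configuration>
  <!-- speculative execution for map tasks -->
  <property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
  </property>
  <!-- speculative execution for reduce tasks -->
  <property>
    <name>mapreduce.reduce.speculative</name>
    <value>false</value>
  </property>
</configuration>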

Java API for MapReduce

MapReduce data types (Box Classes)
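Hadoop ships its own serializable wrapper ("box") classes for Java types; they implement the Writable and WritableComparable interfaces so values can be serialized across the network and sorted. The most common ones are:

boolean → BooleanWritable
byte    → ByteWritable
int     → IntWritable
long    → LongWritable
float   → FloatWritable
double  → DoubleWritable
String  → Text
null    → NullWritable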

Mapper Skeleton

public class TraitMapper extends Mapper<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void map(TypKeyIn keyIn, TypValIn valIn, Context context)
            throws IOException, InterruptedException {
        /** processing: compute the intermediate key and value **/
        TypKeyOut keyOut = new TypKeyOut(...);
        TypValOut valOut = new TypValOut(...);
        context.write(keyOut, valOut);
    }
}
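For instance, the mapper of the classic word-count example (a standard illustration, not from the original article) fills in this skeleton as follows:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every token in the input line.
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}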

Reducer Skeleton

public class TraitReducer extends Reducer<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void reduce(TypKeyIn keyIn, Iterable<TypValIn> values, Context context)
            throws IOException, InterruptedException {
        TypKeyOut keyOut = new TypKeyOut();
        TypValOut valOut = new TypValOut();
        for (TypValIn val : values) {
            /** processing: keyOut.set(...), valOut.set(...) **/
        }
        context.write(keyOut, valOut);
    }
}
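And the matching word-count reducer, which sums the counts emitted for each word:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    public void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted by the mappers for this word.
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        total.set(sum);
        context.write(word, total);
    }
}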

Driver Skeleton(the main class)

public class TraitementDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) System.exit(-1);
        TraitementDriver traitement = new TraitementDriver();
        System.exit(ToolRunner.run(traitement, args));
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "traitement");
        job.setJarByClass(TraitementDriver.class);
        job.setMapperClass(TraitMapper.class);
        job.setReducerClass(TraitReducer.class);
        job.setOutputKeyClass(TypKeyOut.class);
        job.setOutputValueClass(TypValOut.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}

Run

The following command runs the MapReduce job, taking its input files from the input directory.

hadoop jar mapred.jar pathToMainClass input_file output_dir
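For example, with the word-count classes above packaged into wordcount.jar (a hypothetical jar name and paths):

hadoop jar wordcount.jar WordCountDriver /user/hadoop/input /user/hadoop/output

Note that the output directory must not already exist; FileOutputFormat refuses to overwrite an existing one.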

Thanks for Reading!
