Understanding Hadoop MapReduce
MapReduce is a distributed data processing framework. Built around two functions, Map and Reduce, it spreads processing tasks across the machines of a cluster, then combines the intermediate results into a single summary.
How MapReduce works
Map function: processes each input record and generates new key-value pairs.
Reduce function: takes the intermediate pairs produced by the mappers, groups them by key, and runs the reducer once per key to generate the final output.
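As a sketch of these two phases, the classic word-count example can be simulated in plain Java with no Hadoop dependencies (the class and helper names here are illustrative, not part of the Hadoop API; the grouping step stands in for the framework's shuffle):

```java
import java.util.*;
import java.util.stream.*;

public class WordCountSketch {
    // Map phase: each input line becomes a list of (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        return Arrays.stream(line.toLowerCase().split("\\s+"))
                .filter(w -> !w.isEmpty())
                .map(w -> Map.entry(w, 1))
                .collect(Collectors.toList());
    }

    // Reduce phase: all values collected for one key are summed into one result.
    static int reduce(String key, List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<String> input = List.of("the cat sat", "the cat ran");

        // Shuffle: group the intermediate pairs by key, as the framework would.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : input)
            for (Map.Entry<String, Integer> pair : map(line))
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());

        for (Map.Entry<String, List<Integer>> e : grouped.entrySet())
            System.out.println(e.getKey() + "\t" + reduce(e.getKey(), e.getValue()));
    }
}
```

In a real job the map and reduce calls run on different machines; the shuffle between them is what the framework provides for free.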
Definitions
Job Tracker : schedules jobs and tracks the jobs it assigns to Task Trackers.
Task Tracker : runs tasks and reports their status to the JobTracker.
Job : an execution of a Mapper and Reducer across a dataset.
Task : an execution of a Mapper or a Reducer on a slice of data.
Task Attempt : a particular instance of an attempt to execute a task. Since any machine can go down at any time, the framework reschedules failed tasks on other nodes. The default number of task attempts is 4.
Data Locality : “Move computation close to the data rather than data to computation”. A computation requested by an application is much more efficient if it is executed near the data it operates on.
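The retry limit mentioned above is configurable. In Hadoop 2+ the relevant properties are `mapreduce.map.maxattempts` and `mapreduce.reduce.maxattempts`, both defaulting to 4; a `mapred-site.xml` fragment raising the limit might look like (values here are only an example):

```xml
<property>
  <name>mapreduce.map.maxattempts</name>
  <value>6</value> <!-- default: 4 -->
</property>
<property>
  <name>mapreduce.reduce.maxattempts</name>
  <value>6</value> <!-- default: 4 -->
</property>
```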
How MapReduce works in Hadoop
- InputData (files): the data stored in input files, which live in HDFS.
- InputFormat: defines how the input files are split and read.
- InputSplits: created by the InputFormat; each split represents the data that will be processed by an individual Mapper.
- RecordReader: converts the data of a split into key-value pairs (records) suitable for reading by the Mapper.
- Mapper: receives the input records (typically the file line by line), processes each one, and emits a set of intermediate key-value pairs.
- Combiner: an optional ‘mini-reducer’ that aggregates map output locally, minimizing the data transferred between mapper and reducer.
- Partitioner: decides which reducer receives each intermediate pair; by default a hash function over the key (or a subset of the key) derives the partition.
- Shuffling and Sorting: redistributes the data so that all pairs produced by the Map phase with the same key land on the same machine, sorted by key.
- Reducer: processes the grouped data that comes from the mappers. After processing, it produces a new set of output pairs, which are stored in HDFS.
- RecordWriter: writes the key-value pairs from the Reducer phase to the output files.
- OutputFormat: defines how these output key-value pairs are written into the output files by the RecordWriter.
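Hadoop's default partitioner derives the partition from the key's hash code. A minimal re-implementation of that arithmetic in plain Java (the class name here is illustrative; Hadoop ships this logic as `HashPartitioner`) is:

```java
public class PartitionSketch {
    // Same arithmetic as Hadoop's default HashPartitioner:
    // mask off the sign bit so the result is non-negative,
    // then take the remainder modulo the number of reducers.
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String key : new String[] {"apple", "banana", "cherry"})
            System.out.println(key + " -> partition " + getPartition(key, reducers));
    }
}
```

Because the same key always hashes to the same partition, every pair for a given key is guaranteed to reach the same reducer.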
Speculative Execution
If a task takes unusually long to execute (e.g. more than a minute), Hadoop doesn’t try to diagnose and fix the slow-running task; instead, it detects it and launches a backup copy of it on another node. This is called speculative execution (speculative tasks).
- The speculative task is killed if the original task completes before it.
- The original task is killed if the speculative task finishes before it.
Enable and Disable Speculative Execution
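In Hadoop 2+ speculative execution is enabled by default and is controlled per phase through two boolean properties; a `mapred-site.xml` fragment disabling it for both phases might look like:

```xml
<property>
  <name>mapreduce.map.speculative</name>
  <value>false</value> <!-- default: true -->
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>false</value> <!-- default: true -->
</property>
```

The same properties can also be set per job on the `Configuration` object before the job is submitted.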
Java API for MapReduce
Mapper Skeleton
public class TraitementMapper extends Mapper<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void map(TypKeyIn keyIn, TypValIn valIn, Context context)
            throws IOException, InterruptedException {
        /** processing: keyOut = ..., valOut = ... **/
        TypKeyOut keyOut = new TypKeyOut(...);
        TypValOut valOut = new TypValOut(...);
        context.write(keyOut, valOut);
    }
}
Reducer Skeleton
public class TraitementReducer extends Reducer<TypKeyIn, TypValIn, TypKeyOut, TypValOut> {
    @Override
    public void reduce(TypKeyIn keyIn, Iterable<TypValIn> values, Context context)
            throws IOException, InterruptedException {
        TypKeyOut keyOut = new TypKeyOut();
        TypValOut valOut = new TypValOut();
        for (TypValIn val : values) {
            /** processing: keyOut.set(...), valOut.set(...) **/
        }
        context.write(keyOut, valOut);
    }
}
Driver Skeleton(the main class)
public class TraitementDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) System.exit(-1);
        TraitementDriver traitement = new TraitementDriver();
        System.exit(ToolRunner.run(traitement, args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "traitement");
        job.setJarByClass(TraitementDriver.class);
        job.setMapperClass(TraitementMapper.class);
        job.setReducerClass(TraitementReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
Run
The following command runs the MapReduce job, taking its input files from the input directory:
hadoop jar mapred.jar pathToMainClass input_file output_dir