What is Partitioner in MapReduce?

 

Partitioner in MapReduce

A partitioner partitions the intermediate key-value pairs produced by the map tasks. The number of reduce tasks in a job is equal to the number of partitions: each partition is processed by exactly one reducer.
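If no custom partitioner is set, Hadoop uses its default HashPartitioner, which assigns each record a partition by hashing the key. A minimal sketch of that default logic:

// Default partitioning: hash the key and fold it into the range [0, numReduceTasks).
public int getPartition(K key, V value, int numReduceTasks)
{
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

A custom partitioner, like the one built in this example, replaces this rule with an application-specific one.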

Implementation

Let us take some employee details of the ONLINEITGURU company as an input table named employee.

Emp_id  Name    Age  Gender  Salary
6001    aaaaa   45   Male    50,000
6002    bbbbb   40   Female  50,000
6003    ccccc   34   Male    30,000
6004    ddddd   30   Male    30,000
6005    eeeee   20   Male    40,000
6006    fffff   25   Female  35,000
6007    ggggg   20   Female  15,000
6008    hhhhh   19   Female  15,000
6009    iiiii   22   Male    22,000
6010    jjjjj   24   Male    25,000
6011    kkkk    25   Male    25,000
6012    hhhh    28   Male    20,000
6013    tttt    18   Female  8,000

The goal is to find the highest salaried employee by gender in different age groups.
The data below is saved as input.txt in “/home/hadoop/hadoopPartitioner”. The fields are tab-separated, and the salary is stored as a plain integer (no thousands separator) so that it can later be parsed with Integer.parseInt.

6001	aaaaa	45	Male	50000
6002	bbbbb	40	Female	50000
6003	ccccc	34	Male	30000
6004	ddddd	30	Male	30000
6005	eeeee	20	Male	40000
6006	fffff	25	Female	35000
6007	ggggg	20	Female	15000
6008	hhhhh	19	Female	15000
6009	iiiii	22	Male	22000
6010	jjjjj	24	Male	25000
6011	kkkk	25	Male	25000
6012	hhhh	28	Male	20000
6013	tttt	18	Female	8000

Map task:

The map task accepts key-value pairs as input.

Input: Because the job uses TextInputFormat, the key is the byte offset of the line within the input file (a LongWritable), and the value is the text of that line, i.e. one employee record.

Method:

  • Read one record from the input value.
  • Use the split function to separate out the gender field.
  • Emit the gender together with the full record.

String[] str = value.toString().split("\t", -3);
String gender = str[3];
context.write(new Text(gender), new Text(value));

Output: The (gender, record) key-value pairs from the map task are passed on to the partitioner task.


Partitioner task: In the partitioning step, the intermediate data is divided into smaller segments. In this scenario, the key-value pairs are split into three parts based on an age criterion.

  • Input: the whole collection of key-value pairs
  • Key = the gender field value in the record
  • Value = the full employee record

Method

Read the age field from the record carried in the key-value pair.

String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Check the value of age against the following conditions:

  • Age <= 20
  • Age > 20 and <= 30
  • Age > 30

if (age <= 20) {
    return 0;
} else if (age > 20 && age <= 30) {
    return 1 % numReduceTasks;
} else {
    return 2 % numReduceTasks;
}
Output: The intermediate data is segmented into three sets of key-value pairs, one per age group.

Reduce task: We have to execute three reduce tasks here, because the total number of partitions equals the total number of reduce tasks.

Input: The reducer runs three times, each time with a different set of key-value pairs.

  • Key = the gender field value
  • Value = the full employee record for that gender

Method: Read the salary field value of each record.

String[] str = val.toString().split("\t", -3);

Note: str[4] holds the salary field value.

if (Integer.parseInt(str[4]) > max) {
    max = Integer.parseInt(str[4]);
}

Compare each salary with the maximum (max) variable: if str[4] is greater than max, assign str[4] to max; otherwise skip this step.

Repeat steps 1 and 2 for each key-value pair, then emit the result:

context.write(new Text(key), new IntWritable(max));

Output: We get three collections, one per age group (see the expected output sketched below). For each age group, the reducer emits:

  • the maximum salary of the Male group
  • the maximum salary of the Female group
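For the sample data above, the three output files would look roughly as follows (a sketch; with the new mapreduce API and TextOutputFormat the files are typically named part-r-0000N):

part-r-00000 (age <= 20)
Female	15000
Male	40000

part-r-00001 (age 21 to 30)
Female	35000
Male	30000

part-r-00002 (age > 30)
Female	50000
Male	50000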

The following must be specified in the job configuration:

  • the Job itself
  • input and output formats of keys and values
  • the individual classes for the map, reduce, and partitioner tasks
Configuration conf = getConf();

// Create Job
Job job = new Job(conf, "max_sal");
job.setJarByClass(onlineitguru_emp.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job, new Path(arg[1]));

// Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// Set Partitioner class.
job.setPartitionerClass(CaderPartitioner.class);

// Set Reducer class.
job.setReducerClass(ReduceClass.class);

// Number of Reducer tasks.
job.setNumReduceTasks(3);

// Input and Output formats for data.
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Example:

package onlineitguru_emp;

import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class onlineitguru_emp extends Configured implements Tool
{
   // Map class: emits (gender, whole record) pairs.
   public static class MapClass extends Mapper<LongWritable, Text, Text, Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try
         {
            String[] str = value.toString().split("\t", -3);
            String gender = str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch (Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }

   // Reducer class: finds the maximum salary for each gender key.
   public static class ReduceClass extends Reducer<Text, Text, Text, IntWritable>
   {
      public int max = -1;

      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
         for (Text val : values)
         {
            String[] str = val.toString().split("\t", -3);
            if (Integer.parseInt(str[4]) > max)
               max = Integer.parseInt(str[4]);
         }
         context.write(new Text(key), new IntWritable(max));
      }
   }

   // Partitioner class: routes each record to one of three partitions by age.
   public static class CaderPartitioner extends Partitioner<Text, Text>
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);

         if (numReduceTasks == 0)
         {
            return 0;
         }
         if (age <= 20)
         {
            return 0;
         }
         else if (age > 20 && age <= 30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }

   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
      Job job = new Job(conf, "topsal");
      job.setJarByClass(onlineitguru_emp.class);
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job, new Path(arg[1]));
      job.setMapperClass(MapClass.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

      // Set partitioner statement.
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      return job.waitForCompletion(true) ? 0 : 1;
   }

   public static void main(String[] ar) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new onlineitguru_emp(), ar);
      System.exit(res);
   }
}
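As a quick, hypothetical sanity check of the partitioning rule (not part of the job itself), getPartition can be invoked directly on a sample record:

// Hypothetical spot-check: age 20 should land in partition 0 of 3.
CaderPartitioner p = new CaderPartitioner();
int part = p.getPartition(new Text("Male"), new Text("6005\teeeee\t20\tMale\t40000"), 3);
System.out.println(part); // prints 0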

Save the above program as onlineitguru_emp.java in “/home/hadoop/hadoopPartitioner”.


Download the hadoop-core jar using the following link: http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.
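Assuming the downloaded jar is saved as hadoop-core-1.2.1.jar in the same directory (the exact file name depends on the version you pick), the program can be compiled, packaged, and run roughly like this; the HDFS paths are placeholders:

$ cd /home/hadoop/hadoopPartitioner
$ javac -classpath hadoop-core-1.2.1.jar -d . onlineitguru_emp.java
$ jar -cvf emp.jar onlineitguru_emp
$ hadoop fs -mkdir input_dir
$ hadoop fs -put input.txt input_dir
$ hadoop jar emp.jar onlineitguru_emp.onlineitguru_emp input_dir output_dir
$ hadoop fs -cat output_dir/part-r-*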
