Hi,
I have created a Mapper class [3] that filters key-value pairs, emitting
only those that go to specific partitions. When I set it as the partitioner
class in my code [1], I get the compile error in [2], and I don’t
understand why this is happening. How can I fix this?
[1]
|Configuration conf = cj.getConfiguration();
cj.setPartitionerClass(MyFilterMapper.class);
|
[2]
|The method setPartitionerClass(Class<? extends Partitioner>) in the type Job is not
applicable for the arguments (Class<JobExecution.MyFilterMapper>)
|
[3]
|public static class MyFilterMapper
    extends Mapper<Object, Text, Text, IntWritable> {
  private Text word = new Text();
  private IntWritable rvalue = new IntWritable();
  public static final String REDUCE_TASK_REEXECUTE =
      "mapreduce.reduce.task.reexecute";
  public static final int NULL_REDUCE_TASK = -1;
  private Class<? extends Partitioner<?, ?>> partitionerClass;
  private org.apache.hadoop.mapreduce.Partitioner<Object, Text>
      partitionerInstance;

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    partitionerInstance = new MyHashPartitioner<Object, Text>();
    // Partitions (reduce tasks) whose key-value pairs should be emitted.
    int[] task_reexecute = conf.getInts(REDUCE_TASK_REEXECUTE);
    int nr_reduce_tasks = conf.getInt("mapreduce.job.reduces", 0);
    // Arrays.toString prints the array contents, not its reference.
    System.out.println("Tasks reexecute: "
        + java.util.Arrays.toString(task_reexecute)
        + " NRREDUCETASKS: " + nr_reduce_tasks);

    // Each input line is read as alternating <word> <count> tokens.
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String wword = itr.nextToken();
      Integer rrvalue = Integer.valueOf(itr.nextToken());
      int partition = partitionerInstance.getPartition(wword, value,
          nr_reduce_tasks);
      // Emit the pair only if its partition is in the re-execute set.
      if (contains(partition, task_reexecute)) {
        System.out.println("Partition Consumed: " + partition + " - key: "
            + key.toString() + " word: " + wword
            + " value - " + value.toString());
        System.out.println("Partition Consumed: " + partition + " - word: "
            + wword + " value - "); // + rrvalue);
        word.set(wword);
        rvalue.set(rrvalue);
        context.write(word, rvalue);
      }
    }
  }

  // Returns true if partition appears in set.
  public boolean contains(int partition, int[] set) {
    for (int i = 0; i < set.length; i++) {
      if (partition == set[i])
        return true;
    }
    return false;
  }
}
|
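For reference, the filtering step in [3] can be sketched without Hadoop as follows. This is only a sketch under assumptions: it supposes that MyHashPartitioner (not shown here) follows the same rule as Hadoop's default HashPartitioner, and PartitionFilterSketch, partitionFor and filter are hypothetical names used for illustration. It hashes plain Java Strings, so the partition numbers need not match what a job computes for Text keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch with no Hadoop dependency; all names are hypothetical.
public class PartitionFilterSketch {

    // Assumed rule: non-negative hash modulo the number of reduce tasks,
    // which is what Hadoop's default HashPartitioner computes.
    public static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Same membership test as contains() in [3].
    public static boolean contains(int partition, int[] set) {
        for (int p : set) {
            if (p == partition) {
                return true;
            }
        }
        return false;
    }

    // Keeps only the keys whose partition is listed in reexecute.
    public static List<String> filter(List<String> keys, int numReduceTasks,
                                      int[] reexecute) {
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (contains(partitionFor(key, numReduceTasks), reexecute)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        // With 2 reduce tasks, keep only keys that land in partition 1.
        System.out.println(filter(keys, 2, new int[] {1})); // prints [a, c]
    }
}
```

With two reduce tasks and partition 1 marked for re-execution, only "a" and "c" are kept, since their String hash codes (97 and 99) are odd.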
--
Thanks,