As the error tells you, you cannot use a class as a Partitioner if it does
not satisfy the interface requirements of the partitioning mechanism. You
need to set a class a Partitioner which extends or implements the Partioner
contract.

Regards,
Shahab

On Wed, Apr 1, 2015 at 10:54 AM, xeonmailinglist-gmail <
xeonmailingl...@gmail.com> wrote:

>  Hi,
>
> I have created a Mapper class[3] that filters out key values pairs that go
> to a specific partition. When I set the partition class in my code [1], I
> get the error in [2] and I don’t understand why this is happening. Any help
> to fix this?
>
> [1]
>
> Configuration conf = cj.getConfiguration();
> cj.setPartitionerClass(MyFilterMapper.class);
>
> [2]
>
> The method setPartitionerClass(Class<? extends Partitioner>) in the type Job 
> is not applicable for the arguments (Class<JobExecution.MyFilterMapper>)
>
> [3]
>
> public static class MyFilterMapper
>     extends Mapper<Object, Text, Text, IntWritable>{
>
>         private Text word = new Text();
>         private IntWritable rvalue = new IntWritable();
>
>         public static final String REDUCE_TASK_REEXECUTE =
>                 "mapreduce.reduce.task.reexecute";
>         public static final int NULL_REDUCE_TASK = -1;
>
>         private Class<? extends Partitioner<?, ?>> partitionerClass;
>         private org.apache.hadoop.mapreduce.Partitioner<Object, Text> 
> partitionerInstance;
>
>         public void map(Object key, Text value, Context context
>                 ) throws IOException, InterruptedException {
>             Configuration conf = context.getConfiguration();
>             partitionerInstance = new MyHashPartitioner<Object, Text>();
>
>             int[] task_reexecute = conf.getInts(REDUCE_TASK_REEXECUTE);
>             int nr_reduce_tasks = conf.getInt("mapreduce.job.reduces", 0);
>             System.out.println("Tasks reexecute: " + task_reexecute + " 
> NRREDUCETASKS: " + nr_reduce_tasks);
>             StringTokenizer itr = new StringTokenizer(value.toString());
>             while (itr.hasMoreTokens()) {
>                 String wword = itr.nextToken();
>                 Integer rrvalue = Integer.valueOf(itr.nextToken());
>                 int partition = partitionerInstance.getPartition(wword, 
> value, nr_reduce_tasks);
>
>                 if (contains(partition, task_reexecute)) {
>                     System.out.println("Partition Consumed: " + partition + " 
> - key: " + key.toString() + " word: " + wword + " value - " + 
> value.toString());
>                     System.out.println("Partition Consumed: " + partition + " 
> - word: " + wword + " value - ");// + rrvalue);
>
>                     word.set(wword);
>                     rvalue.set(rrvalue);
>                     context.write(word, rvalue);
>                 }
>             }
>         }
>
>         public boolean contains(int partition, int[] set) {
>             for(int i=0; i<set.length; i++){
>                 if (partition == set[i])
>                     return true;
>             }
>
>             return false;
>         }
>     }
>
> ​
>
> --
> --
> Thanks,
>
>

Reply via email to