Hi,

I have created a Mapper class [3] that filters key-value pairs by the partition they would go to. When I pass it to setPartitionerClass in my code [1], I get the compile error in [2], and I don't understand why this is happening. Any help fixing this would be appreciated.

[1]

|Configuration conf = cj.getConfiguration();
cj.setPartitionerClass(MyFilterMapper.class);
|

[2]

|The method setPartitionerClass(Class<? extends Partitioner>) in the type Job is not applicable for the arguments (Class<JobExecution.MyFilterMapper>)
|
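For context, the bound named in the message can be reproduced without Hadoop on the classpath. The following is only a sketch: `Partitioner`, `Mapper`, and `setPartitionerClass` here are simplified stand-ins written for illustration, not the real Hadoop types, and `PartitionerBoundSketch` and `isPartitioner` are hypothetical names. It suggests the call is rejected because the argument class does not extend `Partitioner`.

```java
public class PartitionerBoundSketch {

    // Stand-ins for illustration only; these are NOT the Hadoop classes.
    static abstract class Partitioner<K, V> {
        abstract int getPartition(K key, V value, int numPartitions);
    }

    static class Mapper<KI, VI, KO, VO> { }

    // Mirrors the class in [3]: it extends Mapper, not Partitioner.
    static class MyFilterMapper extends Mapper<Object, String, String, Integer> { }

    // A class that does satisfy the bound (hash rule is an assumption).
    static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        int getPartition(K key, V value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Same parameter bound as the one quoted in the error message.
    @SuppressWarnings("rawtypes")
    static String setPartitionerClass(Class<? extends Partitioner> cls) {
        return cls.getSimpleName();
    }

    // Runtime check of the same relationship the compiler enforces.
    static boolean isPartitioner(Class<?> cls) {
        return Partitioner.class.isAssignableFrom(cls);
    }

    public static void main(String[] args) {
        // Accepted: MyHashPartitioner extends Partitioner.
        System.out.println(setPartitionerClass(MyHashPartitioner.class));

        // Rejected at compile time, with a message like the one in [2]:
        // setPartitionerClass(MyFilterMapper.class);

        System.out.println(isPartitioner(MyFilterMapper.class));    // false
        System.out.println(isPartitioner(MyHashPartitioner.class)); // true
    }
}
```

If the same reasoning carries over to the real API, the mapper would be registered with `setMapperClass` and only a `Partitioner` subclass would be passed to `setPartitionerClass`.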

[3]

|public static class MyFilterMapper
    extends Mapper<Object, Text, Text, IntWritable>{

        private Text word = new Text();
        private IntWritable rvalue = new IntWritable();

        public static final String REDUCE_TASK_REEXECUTE =
                "mapreduce.reduce.task.reexecute";
        public static final int NULL_REDUCE_TASK = -1;

        private Class<? extends Partitioner<?, ?>> partitionerClass;
        private org.apache.hadoop.mapreduce.Partitioner<Object, Text> partitionerInstance;

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            partitionerInstance = new MyHashPartitioner<Object, Text>();

            int[] task_reexecute = conf.getInts(REDUCE_TASK_REEXECUTE);
            int nr_reduce_tasks = conf.getInt("mapreduce.job.reduces", 0);
            // Arrays.toString prints the array contents rather than its reference
            System.out.println("Tasks reexecute: " + java.util.Arrays.toString(task_reexecute)
                    + " NRREDUCETASKS: " + nr_reduce_tasks);
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String wword = itr.nextToken();
                Integer rrvalue = Integer.valueOf(itr.nextToken());
                int partition = partitionerInstance.getPartition(wword, value, nr_reduce_tasks);

                if (contains(partition, task_reexecute)) {
                    System.out.println("Partition Consumed: " + partition + " - key: "
                            + key.toString() + " word: " + wword + " value - " + value.toString());
                    System.out.println("Partition Consumed: " + partition + " - word: "
                            + wword + " value - ");// + rrvalue);

                    word.set(wword);
                    rvalue.set(rrvalue);
                    context.write(word, rvalue);
                }
            }
        }

        public boolean contains(int partition, int[] set) {
            for(int i=0; i<set.length; i++){
                if (partition == set[i])
                    return true;
            }

            return false;
        }
    }
|
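The filtering step in [3] can also be tried outside a MapReduce job. This is a minimal sketch under stated assumptions: `MyHashPartitioner` is not shown in this message, so the hash rule below (non-negative key hash modulo the number of reduce tasks) is assumed, and `PartitionFilterSketch`, `partitionFor`, and `filter` are hypothetical names. It reads "word count" pairs from a line and keeps only the pairs whose partition is in the re-execute set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class PartitionFilterSketch {

    // Assumed partitioning rule; the real MyHashPartitioner may differ.
    static int partitionFor(String key, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            // The mapper in [3] defaults to 0 reduce tasks, which would
            // otherwise cause a division by zero here.
            throw new IllegalArgumentException("numReduceTasks must be positive");
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Same linear membership test as contains() in [3].
    static boolean contains(int partition, int[] set) {
        for (int candidate : set) {
            if (candidate == partition) {
                return true;
            }
        }
        return false;
    }

    // Keeps "word=count" for every pair whose partition is to be re-executed.
    static List<String> filter(String line, int numReduceTasks, int[] reexecute) {
        List<String> kept = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            if (!itr.hasMoreTokens()) {
                break; // dangling word without a count
            }
            int count = Integer.parseInt(itr.nextToken());
            if (contains(partitionFor(word, numReduceTasks), reexecute)) {
                kept.add(word + "=" + count);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // "a" hashes to 97 (partition 1 of 2), "b" to 98 (partition 0 of 2).
        System.out.println(filter("a 1 b 2", 2, new int[] {1})); // prints [a=1]
    }
}
```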

--
Thanks,
