Hi all. I have a mapreduce program with two jobs. second job's key and value comes from first job output. but I think the second map does not get the result from first job. in other words I think my second job did not read the output of my first job.. what should I do?
here is the code: public class dewpoint extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(dewpoint.class); static final String KEYSPACE = "weather"; static final String COLUMN_FAMILY = "user"; private static final String OUTPUT_PATH1 = "/tmp/intermediate1"; private static final String OUTPUT_PATH2 = "/tmp/intermediate2"; private static final String OUTPUT_PATH3 = "/tmp/intermediate3"; private static final String INPUT_PATH1 = "/tmp/intermediate1"; public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new dewpoint(), args); System.exit(0); } /////////////////////////////////////////////////////////// public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable> { DoubleWritable val1 = new DoubleWritable(); Text word = new Text(); String date; float temp; public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException { for (Entry<String, ByteBuffer> key : keys.entrySet()) { //System.out.println(key.getKey()); if (!"date".equals(key.getKey())) continue; date = ByteBufferUtil.string(key.getValue()); word.set(date); } for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet()) { if (!"temprature".equals(column.getKey())) continue; temp = ByteBufferUtil.toFloat(column.getValue()); val1.set(temp); //System.out.println(temp); } context.write(word, val1); } } /////////////////////////////////////////////////////////// public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double beta = 17.62; double landa = 243.12; DoubleWritable result1 = new DoubleWritable(); DoubleWritable result2 = new DoubleWritable(); for (DoubleWritable val : values){ // System.out.println(val.get()); beta *= val.get(); landa+=val.get(); } result1.set(beta); result2.set(landa); context.write(key, result1); context.write(key, result2); } } /////////////////////////////////////////////////////////// public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{ Text key2 = new Text(); double temp1, temp2 =0; public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { String[] sp = values.toString().split("\t"); for (int i=0; i< sp.length; i+=4) //key2.set(sp[i]); System.out.println(sp[i]); for(int j=1;j< sp.length; j+=4) temp1 = Double.valueOf(sp[j]); for (int k=3;k< sp.length; k+=4) temp2 = Double.valueOf(sp[k]); context.write(key2, new DoubleWritable(temp2/temp1)); } } /////////////////////////////////////////////////////////// public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double alpha = 6.112; double tmp = 0; DoubleWritable result3 = new DoubleWritable(); for (DoubleWritable val : values){ System.out.println(val.get()); tmp = alpha*(Math.pow(Math.E, val.get())); } result3.set(tmp); context.write(key, result3); } } /////////////////////////////////////////////////////////// public int run(String[] args) throws Exception { Job job1 = new Job(getConf(), "DewPoint"); job1.setJarByClass(dewpoint.class); job1.setMapperClass(dpmap1.class); job1.setOutputFormatClass(SequenceFileOutputFormat.class); job1.setCombinerClass(dpred1.class); job1.setReducerClass(dpred1.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(DoubleWritable.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(DoubleWritable.class); FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1)); job1.setInputFormatClass(CqlPagingInputFormat.class); ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost"); ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY); ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner"); CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3"); job1.waitForCompletion(true); /***************************************/ if (job1.isSuccessful()){ Job job2 = new Job(getConf(), "DewPoint"); job2.setJarByClass(dewpoint.class); job2.setMapperClass(dpmap2.class); job2.setCombinerClass(dpred2.class); job2.setReducerClass(dpred2.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(DoubleWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(DoubleWritable.class); job2.setOutputFormatClass(TextOutputFormat.class); job2.setInputFormatClass(SequenceFileInputFormat.class); FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1)); FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2)); job2.waitForCompletion(true); } /////////////////////////////////////////////////// return 0; } } for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....