Hi, Are you sure job1 created its output where you expected it, and in the format you expected? Have you tested job2 on its own with some hand-crafted test data? Is the output from job1 consistent with your hand-crafted test data for job2?
regards On 25 Oct 2013, at 06:46, Anseh Danesh <anseh.dan...@gmail.com> wrote: > Hi all. > I have a mapreduce program with two jobs. second job's key and value comes > from first job output. but I think the second map does not get the result > from first job. in other words I think my second job did not read the output > of my first job.. what should I do? > > here is the code: > > public class dewpoint extends Configured implements Tool > { > private static final Logger logger = > LoggerFactory.getLogger(dewpoint.class); > > static final String KEYSPACE = "weather"; > static final String COLUMN_FAMILY = "user"; > private static final String OUTPUT_PATH1 = "/tmp/intermediate1"; > private static final String OUTPUT_PATH2 = "/tmp/intermediate2"; > private static final String OUTPUT_PATH3 = "/tmp/intermediate3"; > private static final String INPUT_PATH1 = "/tmp/intermediate1"; > > public static void main(String[] args) throws Exception > { > > ToolRunner.run(new Configuration(), new dewpoint(), args); > System.exit(0); > } > > /////////////////////////////////////////////////////////// > > public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, > Map<FloatWritable, ByteBuffer>, Text, DoubleWritable> > { > DoubleWritable val1 = new DoubleWritable(); > Text word = new Text(); > String date; > float temp; > public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, > ByteBuffer> columns, Context context) throws IOException, InterruptedException > { > > for (Entry<String, ByteBuffer> key : keys.entrySet()) > { > //System.out.println(key.getKey()); > if (!"date".equals(key.getKey())) > continue; > date = ByteBufferUtil.string(key.getValue()); > word.set(date); > } > > > for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet()) > { > if (!"temprature".equals(column.getKey())) > continue; > temp = ByteBufferUtil.toFloat(column.getValue()); > val1.set(temp); > //System.out.println(temp); > } > context.write(word, val1); > } > } > > 
/////////////////////////////////////////////////////////// > > public static class dpred1 extends Reducer<Text, DoubleWritable, Text, > DoubleWritable> > { > public void reduce(Text key, Iterable<DoubleWritable> values, Context > context) throws IOException, InterruptedException > { > double beta = 17.62; > double landa = 243.12; > DoubleWritable result1 = new DoubleWritable(); > DoubleWritable result2 = new DoubleWritable(); > for (DoubleWritable val : values){ > // System.out.println(val.get()); > beta *= val.get(); > landa+=val.get(); > } > result1.set(beta); > result2.set(landa); > > context.write(key, result1); > context.write(key, result2); > } > } > /////////////////////////////////////////////////////////// > > public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, > DoubleWritable>{ > > Text key2 = new Text(); > double temp1, temp2 =0; > > public void map(Text key, Iterable<DoubleWritable> values, Context > context) throws IOException, InterruptedException { > String[] sp = values.toString().split("\t"); > for (int i=0; i< sp.length; i+=4) > //key2.set(sp[i]); > System.out.println(sp[i]); > for(int j=1;j< sp.length; j+=4) > temp1 = Double.valueOf(sp[j]); > for (int k=3;k< sp.length; k+=4) > temp2 = Double.valueOf(sp[k]); > context.write(key2, new DoubleWritable(temp2/temp1)); > > } > } > > /////////////////////////////////////////////////////////// > > > public static class dpred2 extends Reducer<Text, DoubleWritable, Text, > DoubleWritable> > { > public void reduce(Text key, Iterable<DoubleWritable> values, Context > context) throws IOException, InterruptedException > { > > double alpha = 6.112; > double tmp = 0; > DoubleWritable result3 = new DoubleWritable(); > for (DoubleWritable val : values){ > System.out.println(val.get()); > tmp = alpha*(Math.pow(Math.E, val.get())); > > } > result3.set(tmp); > context.write(key, result3); > > > } > } > > > /////////////////////////////////////////////////////////// > > > public int run(String[] 
args) throws Exception > { > > Job job1 = new Job(getConf(), "DewPoint"); > job1.setJarByClass(dewpoint.class); > job1.setMapperClass(dpmap1.class); > job1.setOutputFormatClass(SequenceFileOutputFormat.class); > job1.setCombinerClass(dpred1.class); > job1.setReducerClass(dpred1.class); > job1.setMapOutputKeyClass(Text.class); > job1.setMapOutputValueClass(DoubleWritable.class); > job1.setOutputKeyClass(Text.class); > job1.setOutputValueClass(DoubleWritable.class); > FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1)); > > > job1.setInputFormatClass(CqlPagingInputFormat.class); > > ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160"); > ConfigHelper.setInputInitialAddress(job1.getConfiguration(), > "localhost"); > ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, > COLUMN_FAMILY); > ConfigHelper.setInputPartitioner(job1.getConfiguration(), > "Murmur3Partitioner"); > > CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3"); > job1.waitForCompletion(true); > > /***************************************/ > > if (job1.isSuccessful()){ > Job job2 = new Job(getConf(), "DewPoint"); > job2.setJarByClass(dewpoint.class); > job2.setMapperClass(dpmap2.class); > job2.setCombinerClass(dpred2.class); > job2.setReducerClass(dpred2.class); > job2.setMapOutputKeyClass(Text.class); > job2.setMapOutputValueClass(DoubleWritable.class); > job2.setOutputKeyClass(Text.class); > job2.setOutputValueClass(DoubleWritable.class); > job2.setOutputFormatClass(TextOutputFormat.class); > job2.setInputFormatClass(SequenceFileInputFormat.class); > FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1)); > FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2)); > job2.waitForCompletion(true); > } > /////////////////////////////////////////////////// > > return 0; > } > } > > for example in my second map phase when I do a System.out.println(key) it > does not print any thing and in reduce result the value is 'infinity'....