Yes, thanks for the reply.
On Fri, Oct 25, 2013 at 1:46 PM, Dieter De Witte <drdwi...@gmail.com> wrote: > The question is also on Stack Overflow; I think the problem is that she divides by > zero in the second mapper. (The logs show that both jobs have a > data flow.) > > > 2013/10/25 Robin East <robin.e...@xense.co.uk> > >> Hi >> >> Are you sure job1 created output where you expected it and in the format >> you expect? Have you tested job2 on its own with some hand-crafted test >> data? Is the output from job1 consistent with your hand-crafted test data >> for job2? >> >> Regards >> On 25 Oct 2013, at 06:46, Anseh Danesh <anseh.dan...@gmail.com> wrote: >> >> Hi all. >> >> I have a MapReduce program with two jobs. The second job's key and value >> come from the first job's output, but I think the second map does not get the >> result from the first job. In other words, I think my second job did not read >> the output of my first job. What should I do? >> >> Here is the code: >> >> public class dewpoint extends Configured implements Tool >> { >> private static final Logger logger = >> LoggerFactory.getLogger(dewpoint.class); >> >> static final String KEYSPACE = "weather"; >> static final String COLUMN_FAMILY = "user"; >> private static final String OUTPUT_PATH1 = "/tmp/intermediate1"; >> private static final String OUTPUT_PATH2 = "/tmp/intermediate2"; >> private static final String OUTPUT_PATH3 = "/tmp/intermediate3"; >> private static final String INPUT_PATH1 = "/tmp/intermediate1"; >> >> public static void main(String[] args) throws Exception >> { >> >> ToolRunner.run(new Configuration(), new dewpoint(), args); >> System.exit(0); >> } >> >> /////////////////////////////////////////////////////////// >> >> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, >> Map<FloatWritable, ByteBuffer>, Text, DoubleWritable> >> { >> DoubleWritable val1 = new DoubleWritable(); >> Text word = new Text(); >> String date; >> float temp; >> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, >> 
ByteBuffer> columns, Context context) throws IOException, >> InterruptedException >> { >> >> for (Entry<String, ByteBuffer> key : keys.entrySet()) >> { >> //System.out.println(key.getKey()); >> if (!"date".equals(key.getKey())) >> continue; >> date = ByteBufferUtil.string(key.getValue()); >> word.set(date); >> } >> >> >> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet()) >> { >> if (!"temprature".equals(column.getKey())) >> continue; >> temp = ByteBufferUtil.toFloat(column.getValue()); >> val1.set(temp); >> //System.out.println(temp); >> } >> context.write(word, val1); >> } >> } >> >> /////////////////////////////////////////////////////////// >> >> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, >> DoubleWritable> >> { >> public void reduce(Text key, Iterable<DoubleWritable> values, Context >> context) throws IOException, InterruptedException >> { >> double beta = 17.62; >> double landa = 243.12; >> DoubleWritable result1 = new DoubleWritable(); >> DoubleWritable result2 = new DoubleWritable(); >> for (DoubleWritable val : values){ >> // System.out.println(val.get()); >> beta *= val.get(); >> landa+=val.get(); >> } >> result1.set(beta); >> result2.set(landa); >> >> context.write(key, result1); >> context.write(key, result2); >> } >> } >> /////////////////////////////////////////////////////////// >> >> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, >> DoubleWritable>{ >> >> Text key2 = new Text(); >> double temp1, temp2 =0; >> >> public void map(Text key, Iterable<DoubleWritable> values, Context >> context) throws IOException, InterruptedException { >> String[] sp = values.toString().split("\t"); >> for (int i=0; i< sp.length; i+=4) >> //key2.set(sp[i]); >> System.out.println(sp[i]); >> for(int j=1;j< sp.length; j+=4) >> temp1 = Double.valueOf(sp[j]); >> for (int k=3;k< sp.length; k+=4) >> temp2 = Double.valueOf(sp[k]); >> context.write(key2, new DoubleWritable(temp2/temp1)); >> >> } >> } >> >> 
/////////////////////////////////////////////////////////// >> >> >> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, >> DoubleWritable> >> { >> public void reduce(Text key, Iterable<DoubleWritable> values, Context >> context) throws IOException, InterruptedException >> { >> >> double alpha = 6.112; >> double tmp = 0; >> DoubleWritable result3 = new DoubleWritable(); >> for (DoubleWritable val : values){ >> System.out.println(val.get()); >> tmp = alpha*(Math.pow(Math.E, val.get())); >> >> } >> result3.set(tmp); >> context.write(key, result3); >> >> >> } >> } >> >> >> /////////////////////////////////////////////////////////// >> >> >> public int run(String[] args) throws Exception >> { >> >> Job job1 = new Job(getConf(), "DewPoint"); >> job1.setJarByClass(dewpoint.class); >> job1.setMapperClass(dpmap1.class); >> job1.setOutputFormatClass(SequenceFileOutputFormat.class); >> job1.setCombinerClass(dpred1.class); >> job1.setReducerClass(dpred1.class); >> job1.setMapOutputKeyClass(Text.class); >> job1.setMapOutputValueClass(DoubleWritable.class); >> job1.setOutputKeyClass(Text.class); >> job1.setOutputValueClass(DoubleWritable.class); >> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1)); >> >> >> job1.setInputFormatClass(CqlPagingInputFormat.class); >> >> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160"); >> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), >> "localhost"); >> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, >> COLUMN_FAMILY); >> ConfigHelper.setInputPartitioner(job1.getConfiguration(), >> "Murmur3Partitioner"); >> >> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3"); >> job1.waitForCompletion(true); >> >> /***************************************/ >> >> if (job1.isSuccessful()){ >> Job job2 = new Job(getConf(), "DewPoint"); >> job2.setJarByClass(dewpoint.class); >> job2.setMapperClass(dpmap2.class); >> job2.setCombinerClass(dpred2.class); >> 
job2.setReducerClass(dpred2.class); >> job2.setMapOutputKeyClass(Text.class); >> job2.setMapOutputValueClass(DoubleWritable.class); >> job2.setOutputKeyClass(Text.class); >> job2.setOutputValueClass(DoubleWritable.class); >> job2.setOutputFormatClass(TextOutputFormat.class); >> job2.setInputFormatClass(SequenceFileInputFormat.class); >> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1)); >> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2)); >> job2.waitForCompletion(true); >> } >> /////////////////////////////////////////////////// >> >> return 0; >> } >> } >> >> For example, in my second map phase, when I do a System.out.println(key) it >> does not print anything, and in the reduce result the value is 'infinity'. >> >> >> >