---------- Forwarded message ----------
From: Vikas Jadhav <vikascjadha...@gmail.com>
Date: Thu, Jan 31, 2013 at 11:14 PM
Subject: Re: Issue with Reduce Side join using datajoin package
To: user@hadoop.apache.org
***************source ****************

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// DataJoin base classes; the stack trace below shows a local copy under com.sas.join
import com.sas.join.DataJoinMapperBase;
import com.sas.join.DataJoinReducerBase;
import com.sas.join.TaggedMapOutput;

public class MyJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        // tag each record with the part of its file name before the first '-'
        protected Text generateInputTag(String inputFile) {
            System.out.println("Starting generateInputTag() : " + inputFile);
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        // the join key is the first comma-separated field of the record
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            System.out.println("Starting generateGroupKey() : " + aRecord);
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            System.out.println("Starting generateTaggedMapOutput() value : " + value);
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // inner join: skip keys that do not appear in both sources,
        // otherwise concatenate the non-key fields of each tagged record
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            System.out.println("combine :");
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // no-argument constructor, needed for Writable deserialization
        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
            System.out.println("Tag :" + tag + " Data :" + data);
        }

        /* first attempt:
        public void readFields(DataInput in) throws IOException {
            System.out.println("Starting short readFields(): " + in);
            this.tag.readFields(in);
            this.data.readFields(in);
        }
        */

        public void readFields(DataInput in) throws IOException {
            System.out.println("Starting short readFields(): " + in);
            this.tag.readFields(in);
            String w = in.toString();
            if (this.data == null) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(Class.forName(w), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        System.out.println("Starting run() Method:");
        Configuration conf = getConf();
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/core-site.xml"));
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/mapred-site.xml"));
        conf.addResource(new Path("/home/vikas/project/hadoop-1.0.3/conf/hdfs-site.xml"));

        JobConf job = new JobConf(conf, MyJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin_cust X order");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Starting main() function:");
        int res = ToolRunner.run(new Configuration(), new MyJoin(), args);
        System.exit(res);
    }
}
*************************and error*********************************************

13/01/31 23:04:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/01/31 23:04:26 WARN snappy.LoadSnappy: Snappy native library not loaded
13/01/31 23:04:26 INFO mapred.FileInputFormat: Total input paths to process : 2
13/01/31 23:04:26 INFO mapred.JobClient: Running job: job_201301312254_0004
13/01/31 23:04:27 INFO mapred.JobClient:  map 0% reduce 0%
13/01/31 23:04:41 INFO mapred.JobClient:  map 66% reduce 0%
13/01/31 23:04:47 INFO mapred.JobClient:  map 100% reduce 0%
13/01/31 23:04:50 INFO mapred.JobClient:  map 100% reduce 22%
13/01/31 23:04:58 INFO mapred.JobClient: Task Id : attempt_201301312254_0004_r_000000_0, Status : FAILED
java.lang.NullPointerException
        at MyJoin$TaggedWritable.readFields(MyJoin.java:125)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
        at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271)
        at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:249)
        at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:245)
        at com.sas.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:107)
        at com.sas.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:132)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

--
Thanx and Regards,
Vikas Jadhav
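
P.S. Reading the trace, the NPE at MyJoin.java:125 is presumably this.data.readFields(in): in.toString() returns the DataInput object's default description rather than a class name, so Class.forName(w) throws ClassNotFoundException and data is never instantiated. A minimal, untested sketch of the workaround I am considering: have write() record the concrete class name with Text.writeString() so that readFields() can re-create the instance via Text.readString() (both are static helpers on org.apache.hadoop.io.Text). These two methods would replace the write()/readFields() pair in TaggedWritable above:

    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        // record the concrete type of data so readFields() can rebuild it
        Text.writeString(out, this.data.getClass().getName());
        this.data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        // read back the class name that write() put in the stream
        String dataClass = Text.readString(in);
        try {
            if (this.data == null) {
                this.data = (Writable) ReflectionUtils.newInstance(
                        Class.forName(dataClass), null);
            }
            this.data.readFields(in);
        } catch (ClassNotFoundException e) {
            throw new IOException("Cannot instantiate " + dataClass, e);
        }
    }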