Try this: Job job = Job.getInstance(conf); job.setJobName(name);
Artem Ervits On Feb 21, 2015 10:57 PM, "David Ginzburg" <davidginzb...@gmail.com> wrote: > Hi, > I am trying to run an MR job on EMR with AvroMultipleOutputs > > > > > I get the following exception when running with AMI with hadoop 2.2 2.5 > Found interface org.apache.hadoop.mapreduce.JobContext, but class was > expected > > I read it is related to incompatible Hadoop versions, so I modified > > When running with AMI with Hadoop 1.0.3 I get the following exception: > > java.lang.NullPointerException > at > org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) > at > org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981) > at > org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681) > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375) > at org.apache.hadoop.mapred.Child$4.run(Child.java:259) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:415) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140) > at org.apache.hadoop.mapred.Child.main(Child.java:253) > > > The driver code is > > Job job = new Job(getConf(), "myad"); > > > > job.setOutputValueClass(NullWritable.class); > > > job.setJarByClass(myAdTextLineMapper.class); > Path inputOflineFiles = new Path(args[0]); > Path inputOfUbberFiles = new Path(args[1]); > > FileInputFormat.setInputPaths(job, inputOflineFiles); > > job.setMapperClass(myAdTextLineMapper.class); > job.setMapOutputKeyClass(Text.class); > job.setMapOutputValueClass(UberRecord.class); > > job.setOutputFormatClass(AvroSequenceFileOutputFormat.class); > AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING)); > AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$); > > > job.setReducerClass(myAdReducer.class); > job.setOutputKeyClass(Text.class); > 
job.setOutputValueClass(UberRecord.class); > job.setNumReduceTasks(2); > String baseoutputFolder = args[2]; > job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, > baseoutputFolder); > ; > > LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class); > > FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder)); > return job.waitForCompletion(true) ? 0 : 1; > > > the mapper and reducers > @Override > public void setup(Context ctx) { > > ubp = new UberRecordProcessor(); > } > > @Override > protected void map(LongWritable key, Text value, Context context) > throws IOException, InterruptedException { > try { > handleLineinMap(value); > if(ub!=null){ > context.write(new Text(ub.getAuctionId().toString()), ub); > context.getCounter("myAd", > "myAdTextLineMapper").increment(1); > }else{ > context.getCounter("myAd", > "myAdTextLineMapperNull").increment(1); > } > } catch (Exception e) { > context.getCounter("myAd", > "myAdTextLineMapperError").increment(1); > logger.warn("could not parse line "+value.toString(),e); > > > } > } > > public class myAdReducer extends > Reducer<Text, UberRecord, AvroKey<CharSequence>, > AvroValue<UberRecord>> { > > private static Logger logger = Logger.getLogger(myAdReducer.class); > public static final String BASE_OUTPUT_FOLDER = "base.output.folder"; > AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs; > UberRecordProcessor ubp = new UberRecordProcessor(); > // "year=%s/month=%s/day=%s/hour=%s" > private String baseOutputPath; > private long reduceAttemptUniqueIdentifier = > System.currentTimeMillis(); > > // 2015-02-01T18:00:25.673Z > static DateTimeFormatter dateformat = DateTimeFormat > .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ"); > > @Override > protected void setup(Context context) throws IOException, > InterruptedException { > > amos = new AvroMultipleOutputs(context); > baseOutputPath = > context.getConfiguration().get(BASE_OUTPUT_FOLDER); > > } > > @Override > protected void 
reduce(Text key, Iterable<UberRecord> values, Context > context) > throws IOException, InterruptedException { > > try { > UberRecord ub = new UberRecord(); > for (UberRecord ubi : values) { > // enrich > if (ubi.getExchange() == null) { > continue; > } > BaseBidRequestEnricher enc = BaseBidRequestEnricher > .getEnricher(ubi.getExchange().toString()); > enc.enrich(ubi); > ub = mergeUB(ub, ubi); > } > logger.info("Writing UberRecord [" + ub.toString() + "]"); > String partition = getPartition(ub); > > // context.write(); > // AvroKey<CharSequence>, AvroValue<UberRecord>> > amos.write(new AvroKey<CharSequence>(key.toString()), > new AvroValue<UberRecord>(ub), baseOutputPath + "/" > + partition + "/p" + > reduceAttemptUniqueIdentifier); > } catch (Exception e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > > public UberRecord mergeUB(UberRecord dest, UberRecord src) { > List<Field> fields = UberRecord.getClassSchema().getFields(); > List<Field> engFields = EngageData.getClassSchema().getFields(); > for (Field field : fields) { > if (field.name().equals("engageData") > && dest.getEngageData() != null) { > EngageData mergedEng = dest.getEngageData(); > for (Field engField : engFields) { > if (dest.getEngageData().get(engField.name()) == null) > { > mergedEng.put(engField.name(), > src.getEngageData().get(engField.name())); > } > > } > dest.setEngageData(mergedEng); > } else { > if (dest.get(field.name()) == null) { > dest.put(field.name(), src.get(field.name())); > } > } > } > return dest; > } > > > >