try this

Job job = Job.getInstance(conf);

Artem Ervits
On Feb 21, 2015 10:57 PM, "David Ginzburg" <> wrote:

> Hi,
> I am trying to run an MR job on emr with AvromultipleOutput
> I get the following exception when running with AMI with hadoop 2.2 2.5
> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
> expected
> I read it is related to incompatible hadoop versions, So I modified
> When running with AMI with hadoop 103 I get the following exception:
> java.lang.NullPointerException
>     at
>     at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(
>     at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(
>     at
>     at org.apache.hadoop.mapred.Child$
>     at Method)
>     at
>     at
>     at org.apache.hadoop.mapred.Child.main(
> The driver code is
> Job job = new Job(getConf(), "myad");
>         job.setOutputValueClass(NullWritable.class);
>         job.setJarByClass(myAdTextLineMapper.class);
>         Path inputOflineFiles = new Path(args[0]);
>         Path inputOfUbberFiles = new Path(args[1]);
>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>         job.setMapperClass(myAdTextLineMapper.class);
>         job.setMapOutputKeyClass(Text.class);
>         job.setMapOutputValueClass(UberRecord.class);
>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>         AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>         job.setReducerClass(myAdReducer.class);
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(UberRecord.class);
>         job.setNumReduceTasks(2);
>         String baseoutputFolder = args[2];
>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>                 baseoutputFolder);
>         ;
> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>         return job.waitForCompletion(true) ? 0 : 1;
> the mapper and reducers
> @Override
>     public void setup(Context ctx) {
>         ubp = new UberRecordProcessor();
>     }
>     @Override
>     protected void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         try {
>             handleLineinMap(value);
>             if(ub!=null){
>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>                 context.getCounter("myAd",
> "myAdTextLineMapper").increment(1);
>             }else{
>                 context.getCounter("myAd",
> "myAdTextLineMapperNull").increment(1);
>             }
>         } catch (Exception e) {
>             context.getCounter("myAd",
> "myAdTextLineMapperError").increment(1);
>             logger.warn("could not parse line "+value.toString(),e);
>         }
>     }
> public class myAdReducer extends
>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
> AvroValue<UberRecord>> {
>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>     UberRecordProcessor ubp = new UberRecordProcessor();
>     // "year=%s/month=%s/day=%s/hour=%s"
>     private String baseOutputPath;
>     private long reduceAttemptUniqueIdentifier =
> System.currentTimeMillis();
>     // 2015-02-01T18:00:25.673Z
>     static DateTimeFormatter dateformat = DateTimeFormat
>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>     @Override
>     protected void setup(Context context) throws IOException,
>             InterruptedException {
>         amos = new AvroMultipleOutputs(context);
>         baseOutputPath =
> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>     }
>     @Override
>     protected void reduce(Text key, Iterable<UberRecord> values, Context
> context)
>             throws IOException, InterruptedException {
>         try {
>             UberRecord ub = new UberRecord();
>             for (UberRecord ubi : values) {
>                 // enrich
>                 if (ubi.getExchange() == null) {
>                     continue;
>                 }
>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>                         .getEnricher(ubi.getExchange().toString());
>                 enc.enrich(ubi);
>                 ub = mergeUB(ub, ubi);
>             }
>   "Writing UberRecord [" + ub.toString() + "]");
>             String partition = getPartition(ub);
>             // context.write();
>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>             amos.write(new AvroKey<CharSequence>(key.toString()),
>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>                             + partition + "/p" +
> reduceAttemptUniqueIdentifier);
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>         List<Field> fields = UberRecord.getClassSchema().getFields();
>         List<Field> engFields = EngageData.getClassSchema().getFields();
>         for (Field field : fields) {
>             if ("engageData")
>                     && dest.getEngageData() != null) {
>                 EngageData mergedEng = dest.getEngageData();
>                 for (Field engField : engFields) {
>                     if (dest.getEngageData().get( == null)
> {
>                         mergedEng.put(,
>                                 src.getEngageData().get(;
>                     }
>                 }
>                 dest.setEngageData(mergedEng);
>             } else {
>                 if (dest.get( == null) {
>                     dest.put(, src.get(;
>                 }
>             }
>         }
>         return dest;
>     }

