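Check your Hadoop version. In older versions (Hadoop 1.x) JobContext was a class, and in newer ones (Hadoop 2.x) it is an interface. The error "Found interface ... but class was expected" therefore means that some code on the classpath was compiled against Hadoop 1 but is running on Hadoop 2.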
On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <davidginzb...@gmail.com> wrote: > Thank you for the answer. > > Tried but the exception > * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class > was expected* > persists > > On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <artemerv...@gmail.com> > wrote: > >> try this >> >> Job job = Job.getInstance(conf); >> Job.setName(name); >> >> Artem Ervits >> On Feb 21, 2015 10:57 PM, "David Ginzburg" <davidginzb...@gmail.com> >> wrote: >> >>> Hi, >>> I am trying to run an MR job on emr with AvromultipleOutput >>> >>> >>> >>> >>> I get the following exception when running with AMI with hadoop 2.2 2.5 >>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was >>> expected >>> >>> I read it is related to incompatible hadoop versions, So I modified >>> >>> When running with AMI with hadoop 103 I get the following exception: >>> >>> java.lang.NullPointerException >>> at >>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) >>> at >>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981) >>> at >>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681) >>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) >>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375) >>> at org.apache.hadoop.mapred.Child$4.run(Child.java:259) >>> at java.security.AccessController.doPrivileged(Native Method) >>> at javax.security.auth.Subject.doAs(Subject.java:415) >>> at >>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140) >>> at org.apache.hadoop.mapred.Child.main(Child.java:253) >>> >>> >>> The driver code is >>> >>> Job job = new Job(getConf(), "myad"); >>> >>> >>> >>> job.setOutputValueClass(NullWritable.class); >>> >>> >>> job.setJarByClass(myAdTextLineMapper.class); >>> Path inputOflineFiles = new Path(args[0]); >>> Path inputOfUbberFiles = new Path(args[1]); >>> >>> 
FileInputFormat.setInputPaths(job, inputOflineFiles); >>> >>> job.setMapperClass(myAdTextLineMapper.class); >>> job.setMapOutputKeyClass(Text.class); >>> job.setMapOutputValueClass(UberRecord.class); >>> >>> job.setOutputFormatClass(AvroSequenceFileOutputFormat.class); >>> AvroJob.setOutputKeySchema(job, >>> Schema.create(Schema.Type.STRING)); >>> AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$); >>> >>> >>> job.setReducerClass(myAdReducer.class); >>> job.setOutputKeyClass(Text.class); >>> job.setOutputValueClass(UberRecord.class); >>> job.setNumReduceTasks(2); >>> String baseoutputFolder = args[2]; >>> job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, >>> baseoutputFolder); >>> ; >>> >>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class); >>> >>> FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder)); >>> return job.waitForCompletion(true) ? 0 : 1; >>> >>> >>> the mapper and reducers >>> @Override >>> public void setup(Context ctx) { >>> >>> ubp = new UberRecordProcessor(); >>> } >>> >>> @Override >>> protected void map(LongWritable key, Text value, Context context) >>> throws IOException, InterruptedException { >>> try { >>> handleLineinMap(value); >>> if(ub!=null){ >>> context.write(new Text(ub.getAuctionId().toString()), >>> ub); >>> context.getCounter("myAd", >>> "myAdTextLineMapper").increment(1); >>> }else{ >>> context.getCounter("myAd", >>> "myAdTextLineMapperNull").increment(1); >>> } >>> } catch (Exception e) { >>> context.getCounter("myAd", >>> "myAdTextLineMapperError").increment(1); >>> logger.warn("could not parse line "+value.toString(),e); >>> >>> >>> } >>> } >>> >>> public class myAdReducer extends >>> Reducer<Text, UberRecord, AvroKey<CharSequence>, >>> AvroValue<UberRecord>> { >>> >>> private static Logger logger = Logger.getLogger(myAdReducer.class); >>> public static final String BASE_OUTPUT_FOLDER = "base.output.folder"; >>> AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs; 
>>> UberRecordProcessor ubp = new UberRecordProcessor(); >>> // "year=%s/month=%s/day=%s/hour=%s" >>> private String baseOutputPath; >>> private long reduceAttemptUniqueIdentifier = >>> System.currentTimeMillis(); >>> >>> // 2015-02-01T18:00:25.673Z >>> static DateTimeFormatter dateformat = DateTimeFormat >>> .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ"); >>> >>> @Override >>> protected void setup(Context context) throws IOException, >>> InterruptedException { >>> >>> amos = new AvroMultipleOutputs(context); >>> baseOutputPath = >>> context.getConfiguration().get(BASE_OUTPUT_FOLDER); >>> >>> } >>> >>> @Override >>> protected void reduce(Text key, Iterable<UberRecord> values, Context >>> context) >>> throws IOException, InterruptedException { >>> >>> try { >>> UberRecord ub = new UberRecord(); >>> for (UberRecord ubi : values) { >>> // enrich >>> if (ubi.getExchange() == null) { >>> continue; >>> } >>> BaseBidRequestEnricher enc = BaseBidRequestEnricher >>> .getEnricher(ubi.getExchange().toString()); >>> enc.enrich(ubi); >>> ub = mergeUB(ub, ubi); >>> } >>> logger.info("Writing UberRecord [" + ub.toString() + "]"); >>> String partition = getPartition(ub); >>> >>> // context.write(); >>> // AvroKey<CharSequence>, AvroValue<UberRecord>> >>> amos.write(new AvroKey<CharSequence>(key.toString()), >>> new AvroValue<UberRecord>(ub), baseOutputPath + "/" >>> + partition + "/p" + >>> reduceAttemptUniqueIdentifier); >>> } catch (Exception e) { >>> // TODO Auto-generated catch block >>> e.printStackTrace(); >>> } >>> } >>> >>> public UberRecord mergeUB(UberRecord dest, UberRecord src) { >>> List<Field> fields = UberRecord.getClassSchema().getFields(); >>> List<Field> engFields = EngageData.getClassSchema().getFields(); >>> for (Field field : fields) { >>> if (field.name().equals("engageData") >>> && dest.getEngageData() != null) { >>> EngageData mergedEng = dest.getEngageData(); >>> for (Field engField : engFields) { >>> if (dest.getEngageData().get(engField.name()) == >>> null) 
{ >>> mergedEng.put(engField.name(), >>> >>> src.getEngageData().get(engField.name())); >>> } >>> >>> } >>> dest.setEngageData(mergedEng); >>> } else { >>> if (dest.get(field.name()) == null) { >>> dest.put(field.name(), src.get(field.name())); >>> } >>> } >>> } >>> return dest; >>> } >>> >>> >>> >>> > -- Deepak