I am following the directions at http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html to write a job that takes Avro files as input and outputs non-Avro files, and I created the job below. I should note that I have tried different orderings of the setInputPaths/setOutputPath lines, the AvroJob lines, and the reduce task settings. The result is always the same: the job runs with 0 mappers and 1 reducer (which gets no data, so its output is essentially an empty SequenceFile). The job always reports 10 input files, so that's not the issue. There is an @Override annotation on both my map and my reduce methods, so that's not the issue either. And I believe I have correctly followed the Avro input/non-Avro output instructions in the link above. Any other ideas would be welcome!

public class MyAvroJob extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    JobConf job = new JobConf(getConf(), this.getClass());

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Avro side: mapper, input schema, and intermediate (map output) schema
    AvroJob.setMapperClass(job, MyAvroMapper.class);
    AvroJob.setInputSchema(job, MySchema.SCHEMA$);
    AvroJob.setMapOutputSchema(job,
        Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));

    // Non-Avro side: plain reducer writing Text/Text to a SequenceFile
    job.setReducerClass(MyNonAvroReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);

    // run() returns int, so map the boolean success flag to an exit code
    return JobClient.runJob(job).isSuccessful() ? 0 : 1;
  }

  public static class MyAvroMapper extends AvroMapper<MySchema, Pair<String, String>> {
    @Override
    public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
        Reporter reporter) throws IOException {
      List<MyThings> things = in.getRecords();
      ...
      collector.collect(new Pair<String, String>(newKey, newValue));
    }
  }

  public static class MyNonAvroReducer extends MapReduceBase
      implements Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {
    @Override
    public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      while (values.hasNext()) {
        output.collect(new Text(key.datum()), new Text(values.next().datum()));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new MyAvroJob(), args);
  }
}

-Anna
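
One thing that may be worth ruling out, offered as an assumption rather than a confirmed cause: as far as I know, the mapred AvroInputFormat by default only keeps input files whose names end in ".avro", and that filtering happens after the input paths have been counted. If so, a job could report 10 input files and still launch 0 mappers. The sketch below is plain Java with no Hadoop or Avro dependency; the class name AvroExtensionCheck, the helper withoutAvroExtension, and the file names are all hypothetical and only illustrate the suffix check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AvroExtensionCheck {

    // Hypothetical helper (not part of Avro or Hadoop): returns the names
    // that do NOT end in ".avro", i.e. the ones an extension-based input
    // filter would drop.
    static List<String> withoutAvroExtension(List<String> fileNames) {
        List<String> skipped = new ArrayList<>();
        for (String name : fileNames) {
            if (!name.endsWith(".avro")) {
                skipped.add(name);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Made-up file names, for illustration only
        List<String> names =
            Arrays.asList("part-00000.avro", "part-00001", "data.avro.tmp");
        System.out.println("Would be skipped: " + withoutAvroExtension(names));
    }
}
```

If every input name ends up in the skipped list, renaming the inputs to end in ".avro" would be one way to test the theory. Some Avro versions also appear to expose a job setting, avro.mapred.ignore.inputs.without.extension, that can be set to false to turn the filter off; whether it exists in the version used here is something to check against that version's AvroInputFormat Javadoc.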