I am following the directions at
http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
to write a job that takes Avro files as input and outputs non-Avro files.
I created the following job. I should note that I have tried different
orderings of the setInputPaths/setOutputPath lines, the AvroJob lines,
and the reduce-task settings. The result is always the same: the job runs with
0 mappers and 1 reducer, which gets no data and so writes an essentially empty
SequenceFile. The job always reports 10 input files, so finding the input is not
the issue. There is an @Override annotation on both my map and my reduce methods,
so that is not the issue either. And I believe I have correctly followed the
Avro-input/non-Avro-output instructions in the link above. Any other
ideas would be welcome!!!


public class MyAvroJob extends Configured implements Tool {

/**
 * Configures and runs a job that reads Avro input through an {@link AvroMapper}
 * and writes non-Avro output (a SequenceFile of Text/Text) through a plain
 * Hadoop {@link Reducer}.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 if the job reports success, 1 otherwise, 2 on wrong usage
 * @throws Exception if configuring, submitting or running the job throws
 */
@Override
public int run(String[] args) throws Exception {
  if (args.length < 2) {
    System.err.println("Usage: MyAvroJob <input path> <output path>");
    return 2;
  }

  JobConf job = new JobConf(getConf(), this.getClass());

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  // AvroInputFormat (installed by AvroJob.setInputSchema below) silently skips
  // every input file whose name does not end in ".avro" unless this flag is
  // false. The "total input paths" count is logged before that filtering,
  // which is how a job can report N input files yet launch 0 mappers.
  job.setBoolean("avro.mapred.ignore.inputs.without.extension", false);

  // Avro input side: Avro mapper, input schema, and a Pair<string,string>
  // map-output schema so the shuffle carries AvroKey/AvroValue.
  AvroJob.setMapperClass(job, MyAvroMapper.class);
  AvroJob.setInputSchema(job, MySchema.SCHEMA$);
  AvroJob.setMapOutputSchema(job,
      Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));

  // Non-Avro output side: a plain Hadoop reducer writing Text/Text records
  // to a SequenceFile.
  job.setReducerClass(MyNonAvroReducer.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);

  // Tool.run must return an int exit code, not the job's boolean status.
  return JobClient.runJob(job).isSuccessful() ? 0 : 1;
}

/**
 * Avro-side mapper: receives one {@code MySchema} datum per {@code map} call
 * and emits string key/value pairs through an {@link AvroCollector}.
 * The emitted {@code Pair<String, String>} has to agree with the map output
 * schema configured in {@code run} (a Pair of two STRING schemas).
 */
public static class MyAvroMapper extends AvroMapper<MySchema, Pair<String,
String>> {

/**
 * Maps one input record to a key/value pair.
 *
 * @param in        the deserialized input record
 * @param collector sink for the map output pairs
 * @param reporter  Hadoop progress/counter reporter (not used in the visible code)
 * @throws IOException propagated from the collector or the elided processing
 */
@Override
public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
Reporter reporter) throws IOException {

// Nested records of the input datum; how they are processed is elided
// ("...") in the original post.
List<MyThings> things = in.getRecords();
...
// NOTE(review): newKey and newValue come from the elided code above.
collector.collect(new Pair<String, String>( newKey, newValue));
}
}

public static class MyNonAvroReducer extends MapReduceBase implements
Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {

  /**
   * Emits one Text/Text record per value grouped under {@code key}, using the
   * key's datum as the output key and each value's datum as the output value.
   *
   * NOTE(review): this assumes the Avro shuffle hands back java.lang.String
   * datums. If the configured data model yields org.apache.avro.util.Utf8
   * instead, the implicit cast to String fails at runtime. Confirm this, and
   * consider CharSequence plus toString().
   */
  @Override
  public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    for (Iterator<AvroValue<String>> remaining = values; remaining.hasNext(); ) {
      String outKey = key.datum();
      String outValue = remaining.next().datum();
      output.collect(new Text(outKey), new Text(outValue));
    }
  }
}

/**
 * Command-line entry point: runs {@link MyAvroJob} through {@link ToolRunner}
 * and exits the JVM with the status code returned by {@code run}.
 *
 * @param args command-line arguments, passed through ToolRunner to {@code run}
 * @throws Exception if ToolRunner or the job throws
 */
public static void main(String[] args) throws Exception {
  // Propagate the job status so shell scripts and schedulers can see failures;
  // previously the return value was discarded and the process always exited 0.
  int exitCode = ToolRunner.run(new MyAvroJob(), args);
  System.exit(exitCode);
}




-Anna

Reply via email to