I've often found the cause of this behaviour to be that the
input files lack an .avro extension. Is that true in your case? If so,
can you retry after renaming them?
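
A minimal sketch of such a rename, in case it helps. It assumes the inputs sit in a local directory and uses only java.nio; the class name AvroExtensionFixer and its methods are made up for illustration and are not part of Avro or Hadoop. For files on HDFS the same idea would go through Hadoop's FileSystem.rename(Path, Path) or `hadoop fs -mv` instead. Names starting with "_" or "." are left alone, since Hadoop treats those as hidden files.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: appends ".avro" to input files that lack it.
public class AvroExtensionFixer {
    public static final String AVRO_EXT = ".avro";

    // True if the file is visible to Hadoop but does not end in ".avro".
    public static boolean needsRename(String fileName) {
        boolean hidden = fileName.startsWith("_") || fileName.startsWith(".");
        return !hidden && !fileName.endsWith(AVRO_EXT);
    }

    // Renames every regular file in dir that needs it; returns the count.
    public static int renameAll(Path dir) throws IOException {
        // Collect candidates first so we never rename while iterating.
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (Files.isRegularFile(entry) && needsRename(name)) {
                    candidates.add(entry);
                }
            }
        }
        for (Path file : candidates) {
            String newName = file.getFileName().toString() + AVRO_EXT;
            Files.move(file, file.resolveSibling(newName));
        }
        return candidates.size();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: AvroExtensionFixer <input-dir>");
            System.exit(2);
        }
        int renamed = renameAll(Paths.get(args[0]));
        System.out.println("Renamed " + renamed + " file(s)");
    }
}
```

After the rename, rerunning the job unchanged against the same input path should show whether the extension was the problem.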
On Wed, Jul 31, 2013 at 1:02 AM, Anna Lahoud annalah...@gmail.com wrote:
I am following the directions at
http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
to write a job that takes Avro files as input and outputs non-Avro files, and I
created the following job. I should note that I have tried different
orderings of the setInput/OutputPath lines, the AvroJob lines, and
the reduce task settings. The result is always the same: the job runs with 0
mappers and 1 reducer (which gets no data, so its output is essentially an empty
SequenceFile). It always says there are 10 input files, so that's not the
issue. There is an @Override annotation on my map and my reduce, so that's not
the issue either. And I believe I have correctly followed the Avro input/non-Avro
output instructions in the link above. Any other ideas would be
welcome!
public class MyAvroJob extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    JobConf job = new JobConf(getConf(), this.getClass());
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    AvroJob.setMapperClass(job, MyAvroMapper.class);
    AvroJob.setInputSchema(job, MySchema.SCHEMA$);
    AvroJob.setMapOutputSchema(job,
        Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));
    job.setReducerClass(MyNonAvroReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);
    // Tool.run returns an int exit code: 0 on success, non-zero on failure.
    return JobClient.runJob(job).isSuccessful() ? 0 : 1;
  }
  public static class MyAvroMapper
      extends AvroMapper<MySchema, Pair<String, String>> {
    @Override
    public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
        Reporter reporter) throws IOException {
      List<MyThings> things = in.getRecords();
      ...
      collector.collect(new Pair<String, String>(newKey, newValue));
    }
  }
  public static class MyNonAvroReducer extends MapReduceBase implements
      Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {
    @Override
    public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      while (values.hasNext()) {
        output.collect(new Text(key.datum()), new Text(values.next().datum()));
      }
    }
  }
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new MyAvroJob(), args);
  }
}
-Anna
--
Harsh J