Excellent! Many thanks Scott :-D P
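For the archives — a minimal sketch of the fix Scott describes, assuming the Avro 1.5.x generic API: replace the raw FileOutputStream/binaryEncoder pair with DataFileWriter, which emits the container-file header (magic bytes plus schema metadata) that DataFileReader and AvroJob expect. The schema and file name below are made up for illustration; your Account schema goes in their place.

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroFileWrite {
    // Write a record in the Avro Object Container File format,
    // not as a raw binary-encoded datum.
    static public void write(String file, GenericRecord record,
                             Schema schema) throws IOException {
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, new File(file));  // writes the magic + header
        writer.append(record);
        writer.close();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder schema, standing in for the real Account schema.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Account\"," +
            "\"fields\":[{\"name\":\"timestamp\",\"type\":\"string\"}]}");
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("timestamp", "2011-07-20 14:35:00");
        write("account.avro", rec, schema);

        // Read it back with the same reader class Hadoop's AvroJob uses.
        DataFileReader<GenericRecord> reader =
            new DataFileReader<GenericRecord>(new File("account.avro"),
                new GenericDatumReader<GenericRecord>(schema));
        System.out.println(reader.next().get("timestamp"));
        reader.close();
    }
}
```

Note that DataFileWriter.create() also embeds the schema in the header, so a generic reader can open the file without being handed the schema separately.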
On Wed, Jul 20, 2011 at 8:38 PM, Scott Carey <scottca...@apache.org> wrote:
> An avro data file is not created with a FileOutputStream. That will write
> avro binary data to a file, but not in the avro file format (which is
> splittable and contains header metadata).
>
> The API for Avro Data Files is here:
> http://avro.apache.org/docs/current/api/java/org/apache/avro/file/package-summary.html
>
>
> On 7/20/11 2:35 PM, "Peter Wolf" <opus...@gmail.com> wrote:
>
> > Hello, anyone out there know about AVRO file formats and/or Hadoop
> > support?
> >
> > My Hadoop AvroJob code does not recognize the AVRO files created by
> > my other code. It seems that the MAGIC number is wrong.
> >
> > What is going on? How many different ways of encoding AVRO files
> > are there, and how do I make sure they match?
> >
> > I am creating the input files like this...
> >
> >     static public void write(String file, GenericRecord record,
> >                              Schema schema) throws IOException {
> >         OutputStream o = new FileOutputStream(file);
> >         GenericDatumWriter w = new GenericDatumWriter(schema);
> >         Encoder e = EncoderFactory.get().binaryEncoder(o, null);
> >         w.write(record, e);
> >         e.flush();
> >     }
> >
> > Hadoop is reading them using org.apache.avro.file.DataFileReader
> >
> > Here is where it breaks. I checked, and it really is trying to read
> > the right file...
> >
> >     /** Open a reader for a file. */
> >     public static <D> FileReader<D> openReader(SeekableInput in,
> >                                                DatumReader<D> reader)
> >         throws IOException {
> >       if (in.length() < MAGIC.length)
> >         throw new IOException("Not an Avro data file");
> >
> >       // read magic header
> >       byte[] magic = new byte[MAGIC.length];
> >       in.seek(0);
> >       for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c)) {}
> >       in.seek(0);
> >
> >       if (Arrays.equals(MAGIC, magic))                  // current format
> >         return new DataFileReader<D>(in, reader);
> >       if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
> >         return new DataFileReader12<D>(in, reader);
> >
> >       throw new IOException("Not an Avro data file");   // <-- fails here
> >     }
> >
> > Some background...
> >
> > I am trying to write my first AVRO Hadoop application. I am using
> > Hadoop Cloudera 20.2-737 and AVRO 1.5.1.
> >
> > I followed the instructions here...
> >
> > http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description
> >
> > The sample code here...
> >
> > http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup
> >
> > Here is my code which breaks with a "Not an Avro data file" error.
> >     public static class MapImpl extends AvroMapper<Account, Pair<Utf8, Long>> {
> >       @Override
> >       public void map(Account account,
> >                       AvroCollector<Pair<Utf8, Long>> collector,
> >                       Reporter reporter) throws IOException {
> >         StringTokenizer tokens =
> >             new StringTokenizer(account.timestamp.toString());
> >         while (tokens.hasMoreTokens())
> >           collector.collect(new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
> >       }
> >     }
> >
> >     public static class ReduceImpl
> >         extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
> >       @Override
> >       public void reduce(Utf8 word, Iterable<Long> counts,
> >                          AvroCollector<Pair<Utf8, Long>> collector,
> >                          Reporter reporter) throws IOException {
> >         long sum = 0;
> >         for (long count : counts)
> >           sum += count;
> >         collector.collect(new Pair<Utf8, Long>(word, sum));
> >       }
> >     }
> >
> >     public int run(String[] args) throws Exception {
> >
> >       if (args.length != 2) {
> >         System.err.println("Usage: " + getClass().getName() +
> >                            " <input> <output>");
> >         System.exit(2);
> >       }
> >
> >       JobConf job = new JobConf(this.getClass());
> >       Path outputPath = new Path(args[1]);
> >
> >       outputPath.getFileSystem(job).delete(outputPath);
> >       //WordCountUtil.writeLinesFile();
> >
> >       job.setJobName(this.getClass().getName());
> >
> >       AvroJob.setInputSchema(job, Account.schema);
> >       //Schema.create(Schema.Type.STRING));
> >       AvroJob.setOutputSchema(job,
> >           new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
> >
> >       AvroJob.setMapperClass(job, MapImpl.class);
> >       AvroJob.setCombinerClass(job, ReduceImpl.class);
> >       AvroJob.setReducerClass(job, ReduceImpl.class);
> >
> >       FileInputFormat.setInputPaths(job, new Path(args[0]));
> >       FileOutputFormat.setOutputPath(job, outputPath);
> >       //FileOutputFormat.setCompressOutput(job, true);
> >
> >       //WordCountUtil.setMeta(job);
> >
> >       JobClient.runJob(job);
> >
> >       //WordCountUtil.validateCountsFile();
> >
> >       return 0;
> >     }
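A footnote for anyone hitting the same error: the openReader() check quoted above only inspects the first bytes of the file. Per the Avro spec, a container file starts with the four-byte magic ASCII 'O', 'b', 'j' followed by the version byte 1, which a raw binaryEncoder dump never has. A quick sketch of that check without Avro on the classpath (file contents here are fabricated just to show both outcomes):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class AvroMagicCheck {
    // Current Avro container-file magic: ASCII "Obj" plus version byte 1.
    static final byte[] MAGIC = { 'O', 'b', 'j', 1 };

    // Returns true if the file starts with the Avro data-file magic header.
    static boolean isAvroDataFile(File f) throws IOException {
        byte[] head = new byte[MAGIC.length];
        FileInputStream in = new FileInputStream(f);
        try {
            int c = 0;
            while (c < head.length) {
                int n = in.read(head, c, head.length - c);
                if (n < 0) return false;  // file shorter than the magic
                c += n;
            }
        } finally {
            in.close();
        }
        return Arrays.equals(MAGIC, head);
    }

    public static void main(String[] args) throws IOException {
        // A raw binaryEncoder dump has no header, so the check fails on it.
        File raw = File.createTempFile("raw", ".avro");
        FileOutputStream out = new FileOutputStream(raw);
        out.write(new byte[] { 42, 0, 7 });  // arbitrary datum bytes
        out.close();

        // A container file begins with the header a DataFileWriter emits.
        File container = File.createTempFile("container", ".avro");
        out = new FileOutputStream(container);
        out.write(MAGIC);
        out.close();

        System.out.println(isAvroDataFile(raw));        // false
        System.out.println(isAvroDataFile(container));  // true
    }
}
```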