Excellent!  Many thanks Scott :-D
P


On Wed, Jul 20, 2011 at 8:38 PM, Scott Carey <scottca...@apache.org> wrote:

> An avro data file is not created with a FileOutputStream.  That will write
> avro binary data to a file, but not in the avro file format (which is
> splittable and contains header metadata).
>
> The API for Avro Data Files is here:
> http://avro.apache.org/docs/current/api/java/org/apache/avro/file/package-summary.html
>
>
>
>
> On 7/20/11 2:35 PM, "Peter Wolf" <opus...@gmail.com> wrote:
>
> >    Hello, anyone out there know about AVRO file formats and/or Hadoop
> >    support?
> >
> >    My Hadoop AvroJob code does not recognize the AVRO files created by
> >    my other code.  It seems that the MAGIC number is wrong.
> >
> >    What is going on?  How many different ways of encoding AVRO files
> >    are there, and how do I make sure they match?
> >
> >    I am creating the input files like this...
> >
> >        static public void write(String file, GenericRecord record, Schema schema) throws IOException {
> >            OutputStream o = new FileOutputStream(file);
> >            GenericDatumWriter w = new GenericDatumWriter(schema);
> >            Encoder e = EncoderFactory.get().binaryEncoder(o, null);
> >            w.write(record, e);
> >            e.flush();
> >        }
> >
> >    Hadoop is reading them using org.apache.avro.file.DataFileReader
> >
> >    Here is where it breaks.  I checked, and it really is trying to read
> >    the right file...
> >
> >        /** Open a reader for a file. */
> >        public static <D> FileReader<D> openReader(SeekableInput in,
> >                                                   DatumReader<D> reader)
> >          throws IOException {
> >          if (in.length() < MAGIC.length)
> >            throw new IOException("Not an Avro data file");
> >
> >          // read magic header
> >          byte[] magic = new byte[MAGIC.length];
> >          in.seek(0);
> >          for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c)) {}
> >          in.seek(0);
> >
> >          if (Arrays.equals(MAGIC, magic))              // current format
> >            return new DataFileReader<D>(in, reader);
> >          if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
> >            return new DataFileReader12<D>(in, reader);
> >
> >      >>>    throw new IOException("Not an Avro data file");    <<<
> >        }
> >
> >
> >
> >    Some background...
> >
> >    I am trying to write my first AVRO Hadoop application.  I am using
> >    Hadoop Cloudera 20.2-737 and AVRO 1.5.1
> >
> >    I followed the instructions here...
> >
> >
> >
> >    http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description
> >
> >
> >    The sample code here...
> >
> >
> >
> >    http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup
> >
> >    Here is my code which breaks with a "Not an Avro data file" error.
> >
> >
> >        public static class MapImpl extends AvroMapper<Account, Pair<Utf8, Long>> {
> >            @Override
> >            public void map(Account account, AvroCollector<Pair<Utf8, Long>> collector,
> >                            Reporter reporter) throws IOException {
> >                StringTokenizer tokens = new StringTokenizer(account.timestamp.toString());
> >                while (tokens.hasMoreTokens())
> >                    collector.collect(new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
> >            }
> >        }
> >
> >        public static class ReduceImpl
> >                extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
> >            @Override
> >            public void reduce(Utf8 word, Iterable<Long> counts,
> >                               AvroCollector<Pair<Utf8, Long>> collector,
> >                               Reporter reporter) throws IOException {
> >                long sum = 0;
> >                for (long count : counts)
> >                    sum += count;
> >                collector.collect(new Pair<Utf8, Long>(word, sum));
> >            }
> >        }
> >
> >        public int run(String[] args) throws Exception {
> >
> >            if (args.length != 2) {
> >                System.err.println("Usage: " + getClass().getName() + " <input> <output>");
> >                System.exit(2);
> >            }
> >
> >            JobConf job = new JobConf(this.getClass());
> >            Path outputPath = new Path(args[1]);
> >
> >            outputPath.getFileSystem(job).delete(outputPath);
> >            //WordCountUtil.writeLinesFile();
> >
> >            job.setJobName(this.getClass().getName());
> >
> >            AvroJob.setInputSchema(job, Account.schema); //Schema.create(Schema.Type.STRING));
> >            AvroJob.setOutputSchema(job,
> >                    new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
> >
> >            AvroJob.setMapperClass(job, MapImpl.class);
> >            AvroJob.setCombinerClass(job, ReduceImpl.class);
> >            AvroJob.setReducerClass(job, ReduceImpl.class);
> >
> >            FileInputFormat.setInputPaths(job, new Path(args[0]));
> >            FileOutputFormat.setOutputPath(job, outputPath);
> >            //FileOutputFormat.setCompressOutput(job, true);
> >
> >            //WordCountUtil.setMeta(job);
> >
> >            JobClient.runJob(job);
> >
> >            //WordCountUtil.validateCountsFile();
> >
> >            return 0;
> >        }
> >
> >
> >
> >
>
>
>
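
For anyone hitting the same "Not an Avro data file" error, a quick way to
confirm the diagnosis is to look at the first four bytes of the input file.
Per the Avro specification, an object container file starts with the ASCII
bytes 'O', 'b', 'j' followed by the byte 1, whereas output from a bare
binaryEncoder over a FileOutputStream starts directly with the encoded
record. The sketch below uses only the JDK; the class name AvroMagicCheck and
the method hasAvroMagic are made up for illustration and are not part of Avro.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical helper, not part of the Avro library.
class AvroMagicCheck {
    // Magic bytes of an Avro object container file: 'O', 'b', 'j', then version byte 1.
    private static final byte[] MAGIC = { (byte) 'O', (byte) 'b', (byte) 'j', (byte) 1 };

    /** Returns true if the file begins with the Avro container-file magic bytes. */
    static boolean hasAvroMagic(String file) throws IOException {
        byte[] head = new byte[MAGIC.length];
        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
            in.readFully(head);          // throws EOFException if the file is too short
        } catch (EOFException e) {
            return false;                // fewer than four bytes: cannot be a data file
        }
        return Arrays.equals(MAGIC, head);
    }

    public static void main(String[] args) throws IOException {
        // Report on every file named on the command line.
        for (String file : args) {
            String verdict = hasAvroMagic(file) ? "Avro data file" : "not an Avro data file";
            System.out.println(file + ": " + verdict);
        }
    }
}
```

Run it against the job's input files, for example
"java AvroMagicCheck input.avro" (file name hypothetical). If it reports a
mismatch, the file was most likely written as raw binary and needs to be
rewritten through the org.apache.avro.file API (DataFileWriter) that Scott
points to above.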
