Outside Hadoop: avro-1.7.6
Inside Hadoop:  avro-mapred-1.7.6-hadoop2
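
For completeness, here is a sketch of how those dependencies are declared, assuming Maven (the hadoop2 classifier selects the avro-mapred build compiled against the Hadoop 2 APIs):

<maven_dependencies>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.6</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.7.6</version>
    <classifier>hadoop2</classifier>
</dependency>
</maven_dependencies>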

From: Stanley Shi <s...@gopivotal.com>
Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException

Which Avro version are you using when running outside of Hadoop?

Regards,
Stanley Shi


On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <john.pau...@threattrack.com> wrote:
This is cross-posted to the avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3ccf3612f6.94d2%25john.pau...@threattrack.com%3e).

Hello all,

I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs with a schema that contains a union of null and a fixed type (among other complex types), defaulting to null, when the field’s value is not null. Please find the full stack trace below, along with a sample map/reduce job that generates an Avro container file and uses it as the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of Hadoop. Any insight would be helpful.
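
For reference, here is a minimal sketch of the standalone round-trip that works for me (assuming the same SCHEMA string and records as in the job below; the file path is arbitrary, and the extra imports are org.apache.avro.file.DataFileReader and org.apache.avro.generic.GenericDatumReader):

<standalone_roundtrip>
Schema schema = new Schema.Parser().parse(SCHEMA);

// write both records, including the one with a non-null baz
File file = new File("/tmp/avrotest/standalone.avro");
DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
writer.create(schema, file);
writer.append(record0);
writer.append(record1); // no exception here, unlike in the m/r job
writer.close();

// read both records back
DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>(schema));
while (reader.hasNext()) {
    System.out.println(reader.next());
}
reader.close();
</standalone_roundtrip>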

Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
    at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
    at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
    at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
    at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
    ... 16 more
Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
    at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)

Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;

public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed
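    // for readability, the unescaped schema is:
    //   {"namespace": "com.foo.bar", "name": "simple_schema", "type": "record",
    //    "fields": [{"name": "foo", "type": {"name": "bar", "type": "fixed", "size": 2}},
    //               {"name": "baz", "type": ["null", "bar"], "default": null}]}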
    private static final String SCHEMA = "{\n" +
            "    \"namespace\": \"com.foo.bar\",\n" +
            "    \"name\": \"simple_schema\",\n" +
            "    \"type\": \"record\",\n" +
            "    \"fields\": [{\n" +
            "        \"name\": \"foo\",\n" +
            "        \"type\": {\n" +
            "            \"name\": \"bar\",\n" +
            "            \"type\": \"fixed\",\n" +
            "            \"size\": 2\n" +
            "        }\n" +
            "    }, {\n" +
            "        \"name\": \"baz\",\n" +
            "        \"type\": [\"null\", \"bar\"],\n" +
            "        \"default\": null\n" +
            "    }]\n" +
            "}";

    public static class SampleMapper
            extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);

        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo =
                new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        GenericData.Fixed baz =
                new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);

        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null

        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        file.getParentFile().mkdirs(); // make sure the input directory exists
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(record0);
        //
        // HELP: the job succeeds when no record has a non-null baz;
        // comment out the next append and the job succeeds
        //
        dataFileWriter.append(record1);
        dataFileWriter.close();

        //
        // configure and run job
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0);

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
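
One thing the stack trace shows: the failing path goes through ReflectDatumWriter, not the GenericDatumWriter I use standalone. Here is a minimal sketch that I would expect to reproduce the failure outside Hadoop (an assumption based on the trace, not something I have verified): swap the writer in the standalone round-trip for a ReflectDatumWriter.

<reflect_writer_sketch>
// same schema and records as in the job above;
// additionally imports org.apache.avro.reflect.ReflectDatumWriter
DatumWriter<GenericRecord> reflectWriter = new ReflectDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> out = new DataFileWriter<GenericRecord>(reflectWriter);
out.create(schema, new File("/tmp/avrotest/reflect.avro"));
out.append(record0); // baz is null: expected to succeed
out.append(record1); // baz is non-null: expected to throw the same AppendWriteException
out.close();
</reflect_writer_sketch>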

Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security
