I got to the bottom of this – it was a namespace issue. My schema was:
{
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "int" ]
  }, {
    "name" : "b",
    "type" : [ "null", "string" ]
  }]
}
But I was actually putting the generated MyAvroSchema class into the 'my_stats'
package (along with my other application code) by adding a 'package
my_stats;' line at the top. When I added "namespace": "my_stats" to the schema
and generated the Java from that, it worked fine.
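The NullPointerException in String.replace from the original stack trace is consistent with this. Assuming Avro 1.8.x's SpecificData.createSchema reads the class's SCHEMA$ and, when the Java class name (my_stats.MyAvroSchema) differs from the schema's full name (MyAvroSchema), tries to patch the schema text with schema.toString().replace(schema.getNamespace(), packageName), then a schema without a namespace passes null as the first argument and String.replace throws. The stdlib-only sketch below mimics that check; NamespaceMismatchSketch and its methods are hypothetical stand-ins, not Avro API.

```java
public class NamespaceMismatchSketch {

    // Full name of a named Avro type: "namespace.name", or just "name" when the
    // schema declares no namespace.
    static String schemaFullName(String namespace, String name) {
        return namespace == null ? name : namespace + "." + name;
    }

    // Mimics the (assumed) mismatch handling in Avro 1.8.x SpecificData.createSchema:
    // if the Java class name differs from the schema's full name, rewrite the schema
    // text by replacing its namespace with the class's package.
    // String.replace(null, ...) throws a NullPointerException.
    static String reconcile(String className, String packageName,
                            String schemaJson, String namespace, String name) {
        if (!className.equals(schemaFullName(namespace, name))) {
            return schemaJson.replace(namespace, packageName);
        }
        return schemaJson;
    }

    public static void main(String[] args) {
        String noNamespace = "{\"type\":\"record\",\"name\":\"MyAvroSchema\"}";
        try {
            // Class placed in package my_stats, schema without "namespace".
            reconcile("my_stats.MyAvroSchema", "my_stats", noNamespace, null, "MyAvroSchema");
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from String.replace");
        }

        // Schema regenerated with "namespace": "my_stats": names match, no rewrite.
        String withNamespace =
            "{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"namespace\":\"my_stats\"}";
        String result = reconcile("my_stats.MyAvroSchema", "my_stats",
                                  withNamespace, "my_stats", "MyAvroSchema");
        System.out.println(result.equals(withNamespace)); // prints true
    }
}
```

Either fix works in this model: give the schema a namespace matching the package, or leave both the schema and the generated class without one.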

From: Porritt, James <james.porr...@uk.mlp.com>
Sent: 17 July 2018 15:10
To: 'vino yang' <yanghua1...@gmail.com>
Cc: user@flink.apache.org
Subject: RE: AvroInputFormat NullPointerException issues

My MyAvroSchema class is as follows. It was generated using avro-tools:

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 4994916517880671663L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<MyAvroSchema> ENCODER =
      new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
      new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<MyAvroSchema> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this MyAvroSchema to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /** Deserializes a MyAvroSchema from a ByteBuffer. */
  public static MyAvroSchema fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

  @Deprecated public java.lang.Integer a;
  @Deprecated public java.lang.CharSequence b;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public MyAvroSchema() {}

  /**
   * All-args constructor.
   * @param a The new value for a
   * @param b The new value for b
   */
  public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
    this.a = a;
    this.b = b;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return a;
    case 1: return b;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: a = (java.lang.Integer)value$; break;
    case 1: b = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'a' field.
   * @return The value of the 'a' field.
   */
  public java.lang.Integer getA() {
    return a;
  }

  /**
   * Sets the value of the 'a' field.
   * @param value the value to set.
   */
  public void setA(java.lang.Integer value) {
    this.a = value;
  }

  /**
   * Gets the value of the 'b' field.
   * @return The value of the 'b' field.
   */
  public java.lang.CharSequence getB() {
    return b;
  }

  /**
   * Sets the value of the 'b' field.
   * @param value the value to set.
   */
  public void setB(java.lang.CharSequence value) {
    this.b = value;
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder() {
    return new MyAvroSchema.Builder();
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder other) {
    return new MyAvroSchema.Builder(other);
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing MyAvroSchema instance.
   * @param other The existing instance to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema other) {
    return new MyAvroSchema.Builder(other);
  }

  /**
   * RecordBuilder for MyAvroSchema instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MyAvroSchema>
    implements org.apache.avro.data.RecordBuilder<MyAvroSchema> {

    private java.lang.Integer a;
    private java.lang.CharSequence b;

    /** Creates a new Builder */
    private Builder() {
      super(SCHEMA$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     * @param other The existing Builder to copy.
     */
    private Builder(MyAvroSchema.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.a)) {
        this.a = data().deepCopy(fields()[0].schema(), other.a);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.b)) {
        this.b = data().deepCopy(fields()[1].schema(), other.b);
        fieldSetFlags()[1] = true;
      }
    }

    /**
     * Creates a Builder by copying an existing MyAvroSchema instance
     * @param other The existing instance to copy.
     */
    private Builder(MyAvroSchema other) {
      super(SCHEMA$);
      if (isValidValue(fields()[0], other.a)) {
        this.a = data().deepCopy(fields()[0].schema(), other.a);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.b)) {
        this.b = data().deepCopy(fields()[1].schema(), other.b);
        fieldSetFlags()[1] = true;
      }
    }

    /**
      * Gets the value of the 'a' field.
      * @return The value.
      */
    public java.lang.Integer getA() {
      return a;
    }

    /**
      * Sets the value of the 'a' field.
      * @param value The value of 'a'.
      * @return This builder.
      */
    public MyAvroSchema.Builder setA(java.lang.Integer value) {
      validate(fields()[0], value);
      this.a = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /**
      * Checks whether the 'a' field has been set.
      * @return True if the 'a' field has been set, false otherwise.
      */
    public boolean hasA() {
      return fieldSetFlags()[0];
    }


    /**
      * Clears the value of the 'a' field.
      * @return This builder.
      */
    public MyAvroSchema.Builder clearA() {
      a = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /**
      * Gets the value of the 'b' field.
      * @return The value.
      */
    public java.lang.CharSequence getB() {
      return b;
    }

    /**
      * Sets the value of the 'b' field.
      * @param value The value of 'b'.
      * @return This builder.
      */
    public MyAvroSchema.Builder setB(java.lang.CharSequence value) {
      validate(fields()[1], value);
      this.b = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /**
      * Checks whether the 'b' field has been set.
      * @return True if the 'b' field has been set, false otherwise.
      */
    public boolean hasB() {
      return fieldSetFlags()[1];
    }


    /**
      * Clears the value of the 'b' field.
      * @return This builder.
      */
    public MyAvroSchema.Builder clearB() {
      b = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public MyAvroSchema build() {
      try {
        MyAvroSchema record = new MyAvroSchema();
        record.a = fieldSetFlags()[0] ? this.a : (java.lang.Integer) defaultValue(fields()[0]);
        record.b = fieldSetFlags()[1] ? this.b : (java.lang.CharSequence) defaultValue(fields()[1]);
        return record;
      } catch (java.lang.Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumWriter<MyAvroSchema>
    WRITER$ = (org.apache.avro.io.DatumWriter<MyAvroSchema>)MODEL$.createDatumWriter(SCHEMA$);

  @Override public void writeExternal(java.io.ObjectOutput out)
    throws java.io.IOException {
    WRITER$.write(this, SpecificData.getEncoder(out));
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumReader<MyAvroSchema>
    READER$ = (org.apache.avro.io.DatumReader<MyAvroSchema>)MODEL$.createDatumReader(SCHEMA$);

  @Override public void readExternal(java.io.ObjectInput in)
    throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }

}

I will check out the other suggestions you made. One concern I have is that,
from the stack trace I posted, it doesn't actually look like the custom class
is being called.
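One likely reason no MyAvroSchema frame appears: assuming Avro 1.8.x obtains the schema by reading the static SCHEMA$ field through reflection, none of the class's constructors or methods run during the lookup. The sketch below shows that style of lookup plus a naive namespace check. SchemaLookupSketch, FakeRecord (whose SCHEMA$ is a JSON string rather than an org.apache.avro.Schema) and the regex-based namespaceOf are hypothetical, for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaLookupSketch {

    // Hypothetical stand-in for a generated record class. The real class holds an
    // org.apache.avro.Schema in SCHEMA$; here it is just the JSON text.
    public static class FakeRecord {
        public static final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[]}";
    }

    // Reads the static SCHEMA$ field reflectively: no constructor or method of
    // the record class is invoked, so none would show up in a stack trace.
    static String schemaJsonOf(Class<?> recordClass) throws ReflectiveOperationException {
        return String.valueOf(recordClass.getDeclaredField("SCHEMA$").get(null));
    }

    // Naive regex lookup of a "namespace" attribute (not a JSON parser);
    // returns null when the schema text has none.
    static String namespaceOf(String schemaJson) {
        Matcher m = Pattern.compile("\"namespace\"\\s*:\\s*\"([^\"]*)\"").matcher(schemaJson);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) throws Exception {
        String json = schemaJsonOf(FakeRecord.class);
        System.out.println("namespace = " + namespaceOf(json));
    }
}
```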

From: vino yang <yanghua1...@gmail.com>
Sent: 17 July 2018 05:49
To: Porritt, James <james.porr...@uk.mlp.com>
Cc: user@flink.apache.org
Subject: Re: AvroInputFormat NullPointerException issues

Hi Porritt,

Based on the exception stack trace you provided, it seems the exception occurs
when initializing the Avro schema. You did not give the definition of the
MyAvroSchema class, so I'd suggest that you:

1. make sure the file path "file:///home/myuser/test.avro" exists on the TM
(TaskManager) node that runs your source task;
2. refer to the flink-avro connector documentation [1];
3. look at the existing test cases, such as [2], which you can adapt with
small changes to test your program.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/connectors.html#avro-support-in-flink
[2]: https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
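For suggestion 1, a quick check can be run on the TaskManager host itself to confirm the path is visible there. AvroFileCheck is a hypothetical helper using only java.nio.file; it checks the local filesystem of whichever machine runs it, so it says nothing about other nodes.

```java
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class AvroFileCheck {

    // True if a file:// URI points at a readable regular file on this host.
    static boolean isReadableLocalFile(String fileUri) {
        Path p = Paths.get(URI.create(fileUri));
        return Files.isRegularFile(p) && Files.isReadable(p);
    }

    public static void main(String[] args) {
        // Defaults to the path from the original question; pass another URI to override.
        String uri = args.length > 0 ? args[0] : "file:///home/myuser/test.avro";
        System.out.println(uri + " readable: " + isReadableLocalFile(uri));
    }
}
```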

---
thanks.
vino.

2018-07-16 20:22 GMT+08:00 Porritt, James <james.porr...@uk.mlp.com>:
I’ve been trying to use the following code:

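import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Path path = new Path("file:///home/myuser/test.avro");
        AvroInputFormat<MyAvroSchema> my_format = new AvroInputFormat<>(path, MyAvroSchema.class);
        DataSet<MyAvroSchema> my_input = env.createInput(my_format);
        // In the DataSet API, print() triggers execution by itself; calling
        // env.execute() afterwards, with no new sinks defined, throws an exception.
        my_input.print();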

to utilise this avro schema:

{
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "int" ]
  }, {
    "name" : "b",
    "type" : [ "null", "string" ]
  }]
}

I generated the MyAvroSchema class from this schema using avro-tools. I also
converted the following JSON into an Avro data file stored at
file:///home/myuser/test.avro:

{"a":{"int":123},"b":{"string":"hello"}}
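The nested objects in that JSON come from Avro's JSON encoding of unions: a null value is written as plain null, and any other value is wrapped in a single-entry object keyed by the branch's type name. The sketch below hand-rolls that rule for this two-field schema only; UnionJsonSketch is hypothetical, and real code would go through Avro's own JSON encoder rather than string building.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnionJsonSketch {

    // Encodes one value of a ["null", int] or ["null", string] union the way
    // Avro's JSON encoding does: null stays null, anything else is wrapped as
    // {"<branch type>": value}.
    static String encodeNullableUnion(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Integer) {
            return "{\"int\":" + value + "}";
        }
        if (value instanceof String) {
            return "{\"string\":\"" + escape((String) value) + "\"}";
        }
        throw new IllegalArgumentException("unsupported branch: " + value.getClass());
    }

    // Minimal JSON string escaping: backslash and double quote only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders a record whose fields are all nullable unions, keeping field order.
    static String encodeRecord(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":")
              .append(encodeNullableUnion(e.getValue()));
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("a", 123);
        rec.put("b", "hello");
        System.out.println(encodeRecord(rec));
        // prints {"a":{"int":123},"b":{"string":"hello"}}
    }
}
```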

When I try to run this, however, I get:

2018-07-16 12:59:26,761 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:548) (org.apache.flink.formats.avro.AvroInputFormat)) (1/1) (302878b522f420f6b7866de4f32fcbd6) switched from RUNNING to FAILED.
org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
        at org.apache.avro.specific.SpecificDatumReader.<init>(SpecificDatumReader.java:37)
        at org.apache.flink.formats.avro.AvroInputFormat.initReader(AvroInputFormat.java:122)
        at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:111)
        at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
        at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
        at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
        at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
        ... 7 more
Caused by: java.lang.NullPointerException
        at java.lang.String.replace(String.java:2239)
        at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:281)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
        at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
        ... 11 more

Can anyone suggest what might be causing the NullPointerException? I'm using
flink-1.5.0 and avro-tools-1.8.2.

######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################
