Hello

I have a generated Avro class (i.e. one that extends SpecificRecordBase).  I am
serializing instances of it in my Flink application using:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
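
For reference, the serialization itself is roughly the standard Avro
pattern below (a simplified sketch rather than my exact code; the helper
class name is illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecordBase;

    public final class AvroSerializeSketch {

        // Serialize a generated SpecificRecordBase instance to a raw Avro
        // binary payload (no schema-registry framing, just the record body).
        public static <T extends SpecificRecordBase> byte[] serializeRecord(T record)
                throws IOException {
            SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(record.getSchema());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }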

I send the resulting byte array out through my Flink Kafka Producer to a
Kafka consumer (a separate, non-Flink JVM), where I attempt to deserialize
it, resulting in the following failure:

java.lang.ArrayIndexOutOfBoundsException: 40
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.oath.ads.identitygraph.acceptance.flink.util.AvroUtils.deserializeRecord(AvroUtils.java:58)
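
For context, AvroUtils.deserializeRecord on the consumer side is
essentially the standard Avro decode pattern (again a simplified sketch,
not the exact code), reading with the consumer's copy of the generated
schema:

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    public final class AvroDeserializeSketch {

        // Deserialize a raw Avro binary payload, using the consumer's schema
        // as both writer and reader schema (which is where any schema
        // mismatch with the producer would surface).
        public static <T> T deserializeRecord(byte[] bytes, Schema schema)
                throws IOException {
            SpecificDatumReader<T> reader = new SpecificDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        }
    }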

After investigation I've noted that my Flink application contains the
following dependency (which seems to be necessary even though my
application makes no explicit use of its classes):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>

and that this package introduces an
org.apache.flink.formats.avro.typeutils.AvroTypeInfo class (
https://github.com/apache/flink/blob/release-1.14.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java#L82-L85)
which in turn seems to be used to automatically replace each CharSequence
in each schema with a Utf8.  The javadoc for this class (
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.html)
says:

"Special type information to generate a special AvroTypeInfo for Avro POJOs
(implementing SpecificRecordBase, the typed Avro POJOs).  Proceeding: It
uses a regular pojo type analysis and replaces all
GenericType<CharSequence> with a GenericType<avro.Utf8>. All other types
used by Avro are standard Java types. Only strings are represented as
CharSequence fields and represented as Utf8 classes at runtime.
CharSequence is not comparable. To make them nicely usable with field
expressions, we replace them here by generic type infos containing Utf8
classes (which are comparable)".
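
As far as I can tell this kicks in automatically: a quick check along the
lines below (illustrative only; MyRecord stands in for my generated class)
seems to confirm that, with flink-avro on the classpath, Flink's type
extraction picks AvroTypeInfo for any class extending SpecificRecordBase:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public final class TypeInfoCheck {

        public static void main(String[] args) {
            // MyRecord is a placeholder for the Avro-generated class
            // (extends SpecificRecordBase).  With flink-avro on the
            // classpath this reports
            // org.apache.flink.formats.avro.typeutils.AvroTypeInfo
            // rather than a plain PojoTypeInfo.
            TypeInformation<MyRecord> typeInfo = TypeExtractor.createTypeInfo(MyRecord.class);
            System.out.println(typeInfo.getClass().getName());
        }
    }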

Could this explain why the consumer cannot deserialize my messages from
Flink (as it still expects CharSequences)?  If so, is there any way to
bypass the effects of AvroTypeInfo?  (I've tried removing the flink-avro
dependency from my Flink application, but I then get a missing-class
exception with respect to AvroTypeInfo.)

Any other explanation or insight would also be very welcome.  (Note that in
production the messages will be consumed by third-party applications into
which it would be difficult to introduce any Flink specifics.)

Best Regards

Paul O'Neill
