Upon further investigation, we found the reason:

* The cluster was started on YARN with the Hadoop classpath, which includes 
Avro. Therefore, Avro's SpecificRecord class was loaded by 
sun.misc.Launcher$AppClassLoader.


* Our LteSession class was submitted with the application jar, and loaded with 
the child-first classloader

* Flink checks whether LteSession is assignable to SpecificRecord, which fails.

* Flink falls back to the reflection-based Avro writer, which throws an NPE on 
a null field.
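
To illustrate the failing check described above, here is a minimal, 
self-contained sketch. It is not Flink's code: Marker stands in for 
SpecificRecord, Session stands in for LteSession, and ChildFirstLoader is a 
hypothetical toy loader. It assumes the child-first loader ends up defining 
its own copy of the interface (for example because the application jar also 
bundles Avro), which is my guess at our setup rather than something confirmed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChildFirstDemo {

    /** Stand-in for Avro's SpecificRecord (hypothetical name). */
    public interface Marker {}

    /** Stand-in for the generated LteSession class (hypothetical name). */
    public static class Session implements Marker {}

    /** Toy child-first loader: defines the demo's nested classes itself instead of asking the parent. */
    public static class ChildFirstLoader extends ClassLoader {
        public ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null && name.startsWith(ChildFirstDemo.class.getName() + "$")) {
                    // Child-first: define our own copy from the class file bytes.
                    c = defineFromParentBytes(name);
                }
                if (c == null) {
                    // Everything else (java.lang.Object, ...) goes to the parent.
                    return super.loadClass(name, resolve);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private Class<?> defineFromParentBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Is the parent's Marker assignable from Session as seen by the given loader? */
    public static boolean isMarker(ClassLoader loader) {
        try {
            Class<?> session = Class.forName(Session.class.getName(), false, loader);
            return Marker.class.isAssignableFrom(session);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ChildFirstDemo.class.getClassLoader();
        System.out.println("parent-first: " + isMarker(parent));
        System.out.println("child-first:  " + isMarker(new ChildFirstLoader(parent)));
    }
}
```

With the parent loader the check succeeds. With the toy child-first loader it 
fails, because the Session it defines implements the child's own copy of 
Marker, and the JVM treats that as a different type from the parent's Marker.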

If we change the classloader to parent-first, everything is ok. Now the 
question is why the default doesn't work for us.

Best regards,
Kien


On Dec 20, 2017, at 14:09, Kien Truong <duckientru...@gmail.com> wrote:
>Hi,
>
>After upgrading to Flink 1.4, we encountered this exception:
>
>Caused by: java.lang.NullPointerException: in com.viettel.big4g.avro.LteSession in long null of long in field tmsi of com.viettel.big4g.avro.LteSession
>    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132)
>    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176)
>    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48)
>    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
>    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
>    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>
>
>It seems Flink attempts to use the reflection writer instead of the
>specific writer for this schema. This is wrong, because our LteSession
>is an Avro object, and should use the specific writer.
>
>Best regards,
>Kien
