Hi,
This simplified code fails with the bellow stacktrace. In case I remove
the timestamp logicalType to a regular long in the avro schema, it
works fine. Also, it is independant of the sink (also did NPE with a
kafka sink).
Is it needed to implement a custom kryo serializer for built-in avro
logicalTypes ? Or am i missing something ?
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " +
index;
long numberOfRecords = 1000;
DataGeneratorSource<String> source =
new DataGeneratorSource<>(generatorFunction, numberOfRecords,
Types.STRING);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final Schema abSchema =
new Schema.Parser()
.parse(
"{\"type\": \"record\", "
+ "\"name\": \"User\", "
+ "\"fields\": [\n"
+ " {\"name\": \"a\", \"type\": \"long\",
\"logicalType\": \"timestamp-millis\" },\n"
+ " {\"name\": \"b\", \"type\":
[\"null\",\"string\"] }\n"
+ " ]\n"
+ " }");
final FileSink<GenericRecord> sink =
FileSink.forBulkFormat(
new Path("/tmp/foo/"),
AvroParquetWriters.forGenericRecord(abSchema))
.build();
DataStreamSource<String> stream =
env.fromSource(source,
WatermarkStrategy.noWatermarks(),
"Generator Source");
// this for being able to map the stream
Class<?> unmodColl =
Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl,
UnmodifiableCollectionsSerializer.class);
stream.map(record -> {
GenericRecord newRecord = new GenericData.Record(abSchema);
newRecord.put("a", 1L);
newRecord.put("b", "bar");
return newRecord;
}).sinkTo(sink);
env.execute();
Stacktrace:
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:322)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
at
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at
org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
at
org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.pollNext(RateLimitedSourceReader.java:69)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 36 more