From the stack trace it appears that flink-avro is in use, which by default
uses Avro 1.8.2 internally; this appears to be a known issue that was fixed
in 1.9.?.
Are you sure that Avro 1.9.2 is actually being used?
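One quick way to check which Avro actually wins on the job's classpath is a
reflective lookup at runtime. This is just a diagnostic sketch (the class name
is passed as a string so it also reports cleanly when the class is absent):

```java
import java.security.CodeSource;

public class AvroVersionCheck {
    // Returns the jar/directory a class was loaded from, or a placeholder
    // when the class is missing or was loaded by the bootstrap classloader.
    static String jarLocationOf(String className) {
        try {
            CodeSource src = Class.forName(className)
                    .getProtectionDomain().getCodeSource();
            return src == null ? "<bootstrap classloader>"
                               : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Run inside the Flink job (or a test on the same classpath) to see
        // which Avro jar is actually being loaded:
        System.out.println(jarLocationOf("org.apache.avro.Schema"));
    }
}
```

If this prints a flink-avro or avro-1.8.2 jar rather than your 1.9.2 jar, the
dependency shading/ordering is the culprit.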
On 19/02/2020 23:53, Lian Jiang wrote:
Hi,
I use a FlinkKinesisConsumer in a Flink job to de-serialize kinesis
events.
Flink: 1.9.2
Avro: 1.9.2
The serDe class is like:
public class ManagedSchemaKinesisPayloadSerDe<T extends SpecificRecord>
        implements KinesisSerializationSchema<T>, KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT = "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;

    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        SpecificData.get().addLogicalTypeConversion(
                new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        // some code to create schemaReg
        final KafkaAvroSerializer serializer =
                new KafkaAvroSerializer(schemaReg, new HashMap(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
                Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);
        // some code to create schemaReg
        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        return (T) decoder.fromBytes(record);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
} // end of class ManagedSchemaKinesisPayloadSerDe
// create consumer, stream environment:
ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
        new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);
final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
        streamName,
        serDe,
        streamConfig);
streamEnv
        .addSource(consumer)
        .print();
streamEnv.execute();
The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime:
2020-02-16T19:14:20.983Z
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at
org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
at
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
at
org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
at
org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
at
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
at
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
at
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 19 more
Observation:
1. The Kinesis events are deserialized SUCCESSFULLY. The exception comes when
FlinkKinesisConsumer tries to serialize the deserialized object after
deserialization. FlinkKinesisConsumer creates its own DatumWriter from
MyPoJoRecord (a subclass of SpecificRecord). I have no control over this
DatumWriter; for example, I cannot add a logical type conversion or register a
custom Kryo serializer. Calling GenericData.get().addLogicalTypeConversion(new
TimeConversions.TimestampConversion()) does not resolve the problem.
2. The joda type mentioned in the exception comes from "union {null, timestamp_ms}".
This union is handled by org.apache.avro.generic.GenericData.resolveUnion().
Because of 1, resolveUnion() cannot find any conversions and fails to handle
the Joda time value.
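One possible workaround (an untested sketch; MyEvent and its field names are
hypothetical) is to have deserialize() map the SpecificRecord to a plain POJO
that carries the timestamp as epoch millis, so Flink's own serializer never
sees a joda DateTime at all:

```java
// Hypothetical plain POJO to return downstream instead of the Avro-generated
// record. Flink serializes it as a POJO type, so the Avro/joda path is skipped.
public class MyEvent implements java.io.Serializable {
    public long eventTimeMillis; // was: the joda DateTime logical-type field
    public String payload;

    public MyEvent() {} // Flink's POJO rules require a public no-arg constructor

    public MyEvent(long eventTimeMillis, String payload) {
        this.eventTimeMillis = eventTimeMillis;
        this.payload = payload;
    }
}
```

deserialize() would then return something like new MyEvent(record.getTimestamp().getMillis(), ...)
and getProducedType() would return TypeInformation.of(MyEvent.class).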
Question:
Is it expected that FlinkKinesisConsumer cannot handle Joda time? Any solution
here?
Appreciate it very much!