From the stack trace it appears that flink-avro is being used, which relies on Avro 1.8.2 internally by default; this looks like a known issue in that version that was fixed in a 1.9.x release.

Are you sure that avro 1.9.2 is actually being used?
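
One quick way to check (a minimal sketch, not from the original thread; it assumes the Avro jar on the classpath carries the usual manifest entries) is to print where the Avro classes are actually loaded from:

import org.apache.avro.Schema;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // The jar path reveals which Avro artifact won on the classpath.
        System.out.println(Schema.class.getProtectionDomain()
                .getCodeSource().getLocation());
        // Implementation-Version from the jar manifest, if present (may be null).
        System.out.println(Schema.class.getPackage().getImplementationVersion());
    }
}

If this prints an avro-1.8.2 jar, the transitive dependency from flink-avro is shadowing your 1.9.2.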

On 19/02/2020 23:53, Lian Jiang wrote:
Hi,

I use a FlinkKinesisConsumer in a Flink job to de-serialize kinesis events.

Flink: 1.9.2
Avro: 1.9.2

The serDe class is like:

public class ManagedSchemaKinesisPayloadSerDe<T extends SpecificRecord>
        implements KinesisSerializationSchema<T>, KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT = "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;

    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        // Register the joda timestamp conversion for Avro logical types.
        SpecificData.get().addLogicalTypeConversion(
                new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        // some code to create schemaReg
        final KafkaAvroSerializer serializer =
                new KafkaAvroSerializer(schemaReg, new HashMap(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
                Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);

        // some code to create schemaReg
        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        return (T) decoder.fromBytes(record);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
} // end of class ManagedSchemaKinesisPayloadSerDe


// create consumer, stream environment:
ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
         new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);

final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
        streamName,
        serDe,
        streamConfig);

streamEnv
        .addSource(consumer)
        .print();
streamEnv.execute();


The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
        at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
        at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 19 more



Observation:
1. The Kinesis events are deserialized SUCCESSFULLY. The exception is thrown afterwards, when Flink serializes the deserialized object again to send it downstream. For that step FlinkKinesisConsumer creates its own DatumWriter based on MyPoJoRecord (a subclass of SpecificRecord), and I have no control over this DatumWriter: for example, I cannot add a logical type conversion to it or register a custom Kryo serializer. Calling GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()) does not resolve the problem.
2. The joda type mentioned in the exception comes from "union { null, timestamp_ms }" in the schema. This union is handled by org.apache.avro.generic.GenericData.resolveUnion(). Because of 1, resolveUnion() cannot find any conversions and fails on the Joda time value. (A possible workaround is sketched below.)
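
If the root cause is indeed the old Avro shadowing 1.9.2 and upgrading is not immediately possible, one defensive workaround (a minimal sketch, not a confirmed fix; MyPoJoEvent, getId() and getEventTime() are hypothetical stand-ins for whatever fields MyPoJoRecord actually carries) is to map the Avro record to a plain POJO inside deserialize(), so the object Flink serializes downstream never contains an org.joda.time.DateTime:

// Hypothetical plain POJO mirror of MyPoJoRecord.
public class MyPoJoEvent implements java.io.Serializable {
    public String id;
    public long eventTimeMillis;  // joda DateTime flattened to epoch millis
    public MyPoJoEvent() {}       // public no-arg ctor keeps it a Flink POJO
}

// In the deserialization schema: decode as before, then map away the joda field.
@Override
public MyPoJoEvent deserialize(
        byte[] record,
        String partitionKey,
        String sequenceNumber,
        long eventUtcTimestamp,
        String streamName,
        String shardId) throws IOException {
    // Same registry-based decoding as in the original class.
    MyPoJoRecord avro = (MyPoJoRecord) decoder.fromBytes(record);
    MyPoJoEvent out = new MyPoJoEvent();
    out.id = avro.getId().toString();                        // assumed field
    out.eventTimeMillis = avro.getEventTime().getMillis();   // joda -> long, assumed field
    return out;
}

@Override
public TypeInformation<MyPoJoEvent> getProducedType() {
    // A POJO type, so Flink does not pick AvroSerializer (and its bundled
    // Avro 1.8.2) for the downstream serialization of this type.
    return TypeInformation.of(MyPoJoEvent.class);
}

With this, the Flink-side serializer only ever sees primitives and Strings and cannot trip over the Joda logical type; the proper fix is still to get the intended Avro version onto the classpath.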

Question:
Is it expected that FlinkKinesisConsumer cannot handle Joda time? Any solution here?

Appreciate it very much!



