Hi,

Could you share exactly how you configure Avro and Kafka? Do you use the
Table API or the DataStream API? Do you use the
ConfluentRegistryAvroDeserializationSchema that comes with Flink, or did
you build a custom DeserializationSchema? Could you also share the code
that instantiates the source? It could help us track down the
problematic spot.

Best,

Dawid

On 16/09/2020 08:09, Lian Jiang wrote:
> Hi,
>
> I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
> IntelliJ, I can see that the FlinkKafkaConsumer has already
> deserialized the upstream Kafka message. However, I get the error
> below when this message is serialized during pushToOperator. Per the
> stack trace, the reason is that the AvroSerializer is created by
> AvroFactory.fromSpecific(), which creates its own private copy of
> specificData. This private specificData does not have the logical type
> information, which blocks the deserialized messages from being passed
> to downstream operators. Any idea how to make this work? Much
> appreciated!
>
>
> org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
