Hi

From the error, this looks like it is caused by a mismatch between the schema and the data. It seems the avro format currently cannot handle old and new data together after this kind of schema change.
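For what it's worth, Avro itself can resolve records written with an old schema against a new reader schema, but only when the decoder is given both schemas and the added field declares a "default". Flink's plain avro format decodes every record with just the single schema derived from the table, which matches the EOFException in your stack trace. Below is a minimal standalone sketch of Avro's schema resolution using the unshaded Avro API; the record and field names are made up for illustration:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionDemo {

    // Old writer schema: fields a and b only.
    static final Schema OLD = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"T\",\"fields\":["
        + "{\"name\":\"a\",\"type\":\"string\"},"
        + "{\"name\":\"b\",\"type\":\"string\"}]}");

    // New reader schema: field c added WITH a default value, which is
    // what makes the change backward compatible under Avro's rules.
    static final Schema NEW = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"T\",\"fields\":["
        + "{\"name\":\"a\",\"type\":\"string\"},"
        + "{\"name\":\"b\",\"type\":\"string\"},"
        + "{\"name\":\"c\",\"type\":\"string\",\"default\":\"\"}]}");

    public static void main(String[] args) throws Exception {
        // Encode a record with the OLD schema, simulating an old
        // message still sitting in the Kafka topic.
        GenericRecord oldRecord = new GenericData.Record(OLD);
        oldRecord.put("a", "1");
        oldRecord.put("b", "2");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(OLD).write(oldRecord, encoder);
        encoder.flush();

        // Decode with BOTH schemas: Avro resolves the old bytes against
        // the new schema and fills c from its default instead of reading
        // past the end of the buffer (the EOFException above).
        BinaryDecoder decoder =
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded =
            new GenericDatumReader<GenericRecord>(OLD, NEW).read(null, decoder);
        System.out.println(decoded); // {"a": "1", "b": "2", "c": ""}
    }
}

In practice the usual fix is to carry the writer schema with the data, e.g. by switching to the avro-confluent format backed by a schema registry, so that each record is decoded with the schema it was actually written with.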

Best,
Shammon.FY


On Fri, Mar 10, 2023 at 2:29 PM Peihui He <peihu...@gmail.com> wrote:

> java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.io.EOFException
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:373)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:290)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 9 more
>
>
> As shown above.
> For example, the previous schema was
> {
>   a,
>   b
> }
>
> and it was later changed to
> {
>   a,
>   b,
>   c
> }
>
> After the job was upgraded, the error occurred because Kafka contains both old and new data at the same time.
>
> On Fri, Feb 24, 2023 at 6:56 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi
> >
> > Could you post the error so we can look into the exact cause?
> >
> > Best,
> > Shammon
> >
> > On Fri, Feb 24, 2023 at 6:10 PM Peihui He <peihu...@gmail.com> wrote:
> >
> > > Hi, all
> > >
> > > Has anyone run into this situation: a Flink job consumes data from
> > > Kafka with Avro, and a new field is later added to the schema. During
> > > a canary release old and new data get mixed together, and the Flink
> > > consumer job crashes.
> > >
> > > How do you usually handle this?
> > >
> > > Best Wishes.
> > >
> >
>