[ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-18049:
-----------------------------------
    Labels: pull-request-available  (was: )

> The Flink kafka consumer job will be interrupted if the upstream kafka producer change the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18049
>                 URL: https://issues.apache.org/jira/browse/FLINK-18049
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zheng Hu
>            Priority: Critical
>              Labels: pull-request-available
>
> We have encountered a critical issue in our online services. Our data
> pipeline is (producer) -> (Kafka) -> (Flink consumer job), and all records
> are encoded in Avro format. Once the producer changed the Avro schema, for
> example by adding an extra column to the existing schema, and wrote a few
> records into Kafka, the downstream Flink job crashed with the following
> stack trace:
> {code}
> ==WARNING== allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable
>     at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
>     at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
>     at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
>     at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
>     at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
>     at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>     at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>     at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>     at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>     at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>     at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
>     at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
>     at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
>     at java.lang.Thread.run(Thread.java:834)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
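One plausible way to picture this failure, sketched in plain Java under stated assumptions: Avro's binary encoding writes record fields in schema order with no field names or tags, writes ints and longs as zig-zag varints, and writes a string as its byte length followed by the UTF-8 bytes. If bytes written with a newer schema are decoded using only the older schema, a field can be read at the wrong offset and an unrelated value taken as a string length, which is consistent with the huge allocation in Utf8.setByteLength in the trace above. The sketch assumes a hypothetical new writer schema {id: long, extra: long, name: string} and an old reader schema {id: long, name: string}; the class and method names are illustrative, and the length guard in readString is added for demonstration only (it is not Avro's behavior; the trace shows the real decoder going ahead with the allocation).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroMisalignmentSketch {

    // Writes a long using zig-zag + variable-length (varint) encoding, as Avro does.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes map to small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Writes a string: byte length as a long, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Minimal cursor over a byte array that decodes the same primitives.
    static final class Reader {
        private final byte[] buf;
        private int pos;

        Reader(byte[] buf) { this.buf = buf; }

        long readLong() {
            long z = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                z |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (z >>> 1) ^ -(z & 1); // undo zig-zag
        }

        String readString() {
            long len = readLong();
            // Demonstration guard (hypothetical): a length larger than the remaining
            // input means the bytes were not written with the layout we are decoding.
            if (len < 0 || len > buf.length - pos) {
                throw new IllegalStateException("string length " + len
                        + " exceeds remaining " + (buf.length - pos) + " bytes");
            }
            String s = new String(buf, pos, (int) len, StandardCharsets.UTF_8);
            pos += (int) len;
            return s;
        }
    }

    // Hypothetical new writer schema: {id: long, extra: long, name: string}
    static byte[] writeNewRecord(long id, long extra, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeLong(out, extra);
        writeString(out, name);
        return out.toByteArray();
    }

    // Decoding with the old reader schema {id: long, name: string} only.
    static String readOldName(byte[] data) {
        Reader r = new Reader(data);
        r.readLong();          // id: still decodes correctly
        return r.readString(); // name: actually reads the bytes of 'extra' as a length
    }

    // Decoding with the writer's (new) layout keeps the fields aligned.
    static String readNewName(byte[] data) {
        Reader r = new Reader(data);
        r.readLong(); // id
        r.readLong(); // extra
        return r.readString();
    }

    public static void main(String[] args) {
        byte[] data = writeNewRecord(42L, 1_000_000_000L, "abc");
        System.out.println(readNewName(data)); // prints "abc"
        try {
            System.out.println(readOldName(data));
        } catch (IllegalStateException e) {
            // prints "decode failed: string length 1000000000 exceeds remaining 4 bytes"
            System.out.println("decode failed: " + e.getMessage());
        }
    }
}
```

With the writer's layout the record decodes cleanly, while the old layout reads the varint of `extra` as a 1,000,000,000-byte string length with only 4 bytes left in the 10-byte record. In Avro terms, the reader needs the writer's schema to resolve the difference between the two schemas; a decoder without the guard would instead try to allocate a buffer of that size.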