Yu Yang created FLINK-18017:
-------------------------------

             Summary: Improve Kafka connector to handle record deserialization exceptions and report related metrics
                 Key: FLINK-18017
                 URL: https://issues.apache.org/jira/browse/FLINK-18017
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.9.1
            Reporter: Yu Yang
Corrupted messages can get into the message pipeline for various reasons. When a Flink deserializer fails to deserialize such a message and throws an exception, the Flink application is blocked until we update the deserializer to handle the exception. Currently, messages are deserialized as below in flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java:

{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    final T value = deserializer.deserialize(record);

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }

    // emit the actual record. this also updates offset state atomically
    // and deals with timestamps and watermark generation
    emitRecord(value, partition, record.offset(), record);
}
{code}

The Flink Kafka connector needs to catch exceptions from deserialization and expose related metrics. One possible shape of the change is sketched below.
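A minimal sketch of what the fetch loop could look like. This is not the proposed implementation; the {{skipCorruptedRecords}} flag and the {{corruptedRecordCounter}} metric are hypothetical names invented for illustration (a counter could be registered via the task's metric group, e.g. {{getRuntimeContext().getMetricGroup().counter(...)}}):

{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    final T value;
    try {
        value = deserializer.deserialize(record);
    } catch (Exception e) {
        // hypothetical metric: count records that failed deserialization
        corruptedRecordCounter.inc();

        if (skipCorruptedRecords) {
            // hypothetical flag: drop the corrupted record instead of
            // failing the job. Note the skipped record's offset is not
            // committed here, so the design would also need to advance
            // offset state to avoid re-reading the record on restart.
            continue;
        }

        // default behavior: fail the job with context about the record
        throw new RuntimeException("Failed to deserialize record from "
            + record.topic() + "-" + record.partition()
            + " at offset " + record.offset(), e);
    }

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }

    // emit the actual record. this also updates offset state atomically
    // and deals with timestamps and watermark generation
    emitRecord(value, partition, record.offset(), record);
}
{code}

Whether skipped records should still advance the committed offset, and whether the skip behavior should be opt-in per deserializer or per connector, are open design questions for this issue.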