[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854944#comment-15854944 ]
Haohui Mai commented on FLINK-5583:
-----------------------------------

In general (1) sounds good to me. Taking a closer look, it seems that it might require a stateful API instead of the traditional {{Collector}} APIs.

We have a mission-critical use case that needs to write all corrupted messages to a persistent store so that these messages can be inspected and backfilled later. Ideally the {{DeserializationSchema}} could hold some state, which would probably need to be synchronized when checkpoints happen. It might be more natural to deserialize messages as a subsequent stage after the consumer.

Thoughts? [~rmetzger] [~tzulitai]

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow applications to handle errors and
> exceptions in the Kafka consumer in order to build a robust application in
> production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements {{DeserializationSchema}} to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The
> streaming pipeline might want to bail out (which is the current behavior) or
> to skip the corrupted records, depending on the application.
> Two options are available:
> (1) Have a variant of {{DeserializationSchema}} return a FlatMap-like
> structure, as suggested in FLINK-3679.
> (2) Allow applications to catch and handle the exception by exposing APIs
> similar to the {{ExceptionProxy}}.
> Thoughts?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
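A minimal sketch of what option (1) might look like, combined with the comment's requirement of capturing corrupted messages for later inspection and backfill. The interface names {{FlatDeserializationSchema}} and {{Collector}} below are illustrative assumptions, not Flink's actual connector API; the idea is that a deserializer emits zero or more records per message, so a corrupted message can be skipped or routed to a dead-letter store rather than failing the pipeline.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Collector, standing in for a FlatMap-style output interface.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical variant of DeserializationSchema (option (1) above):
// emits zero or more records per Kafka message instead of returning one.
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, Collector<T> out) throws Exception;
}

// Example schema: parses UTF-8 integers, skips corrupted payloads, and
// keeps them in memory so a caller could write them to a persistent store.
class SkippingIntSchema implements FlatDeserializationSchema<Integer> {
    final List<byte[]> corrupted = new ArrayList<>();

    @Override
    public void deserialize(byte[] message, Collector<Integer> out) {
        String payload = new String(message, StandardCharsets.UTF_8);
        try {
            out.collect(Integer.parseInt(payload));
        } catch (NumberFormatException e) {
            // Corrupted record: emit nothing, retain for backfill instead.
            corrupted.add(message);
        }
    }
}
```

Note that the retained {{corrupted}} list is exactly the kind of state the comment raises: to survive failures it would need to participate in checkpointing, which is why a stateful API (or a deserialization stage downstream of the consumer) may be more natural than a plain {{Collector}} callback.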