[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854944#comment-15854944 ]

Haohui Mai commented on FLINK-5583:
-----------------------------------

In general (1) sounds good to me. Taking a closer look, it seems that it might 
require a stateful API instead of the traditional {{Collector}} APIs.

We have a mission-critical use case that needs to write all corrupted messages 
to a persistent store so that these messages can be inspected and backfilled 
later. Ideally the {{DeserializationSchema}} could keep some state, which would 
probably need to be synchronized when checkpoints happen.
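A minimal sketch of what such a stateful schema could look like, assuming a hypothetical {{DeadLetterStore}} interface and a hypothetical {{snapshot(checkpointId)}} hook that the consumer would call when a checkpoint is triggered. Neither is an existing Flink API, and integer parsing stands in for Avro decoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical persistent store for corrupted messages (not a Flink API).
interface DeadLetterStore {
    void persist(long checkpointId, List<byte[]> corrupted);
}

// Hypothetical stateful deserializer: corrupted messages are kept as state
// and handed to the store only when a checkpoint is taken.
class CheckpointAwareDeserializer {
    private final DeadLetterStore store;
    private final List<byte[]> pending = new ArrayList<>();

    CheckpointAwareDeserializer(DeadLetterStore store) {
        this.store = store;
    }

    // Returns the decoded value, or empty if the message is corrupted.
    Optional<Integer> deserialize(byte[] message) {
        try {
            // Stand-in for an Avro decoder: parse a UTF-8 encoded integer.
            String text = new String(message, StandardCharsets.UTF_8).trim();
            return Optional.of(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            // Keep a copy of the raw bytes for later inspection and backfill.
            pending.add(message.clone());
            return Optional.empty();
        }
    }

    // Assumed to be called when a checkpoint is triggered, so that flushing
    // corrupted messages is aligned with checkpoints.
    void snapshot(long checkpointId) {
        if (!pending.isEmpty()) {
            store.persist(checkpointId, new ArrayList<>(pending));
            pending.clear();
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A real implementation would also have to decide what happens to the buffered messages on failure; this sketch assumes they are simply rebuilt when the consumer rewinds to the last checkpointed offsets and re-reads them.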

It might be more natural to deserialize messages in a separate stage after the 
consumer. Thoughts?
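One way to read this suggestion, sketched under the assumption that the Kafka consumer emits raw {{byte[]}} records and a separate, hypothetical {{DeserializeStage}} does the decoding. {{java.util.function.Consumer}} stands in for Flink's {{Collector}}, and failures are routed to a dead-letter output instead of failing the job.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical second stage after a consumer that emits raw byte[] records.
// Decoding happens here, so corrupted messages can be routed elsewhere
// instead of failing the source.
class DeserializeStage<T> {
    private final Function<byte[], T> decoder;

    DeserializeStage(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    // Emits the decoded record to `out`, or the raw bytes to `deadLetters`
    // if decoding throws.
    void process(byte[] raw, Consumer<T> out, Consumer<byte[]> deadLetters) {
        T value;
        try {
            value = decoder.apply(raw);
        } catch (RuntimeException e) {
            deadLetters.accept(raw);
            return;
        }
        // Called outside the try block so downstream failures are not
        // mistaken for corrupted input.
        out.accept(value);
    }
}
```

Because the decoder is modeled as a {{Function}}, a decoder that throws checked exceptions (such as {{IOException}} from Avro) would need to wrap them, for example in {{UncheckedIOException}}.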

[~rmetzger] [~tzulitai]

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> skip the corrupted records, depending on the application.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like 
> structure, as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?
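The two options could also be combined. This is a sketch, assuming a hypothetical {{FlatDeserializationSchema}} (option 1, where one message yields zero or more records through a callback) and a hypothetical {{DeserializationErrorHandler}} (option 2, where the application decides whether to skip or bail out). Neither is an existing Flink interface, and {{java.util.function.Consumer}} stands in for {{Collector}}.

```java
import java.io.IOException;
import java.util.function.Consumer;

// Option (1), hypothetical: a flatMap-like variant of DeserializationSchema.
// A single Kafka message may yield zero, one, or many records.
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, Consumer<T> out) throws IOException;
}

// Option (2), hypothetical: an application-supplied handler that decides
// what to do with a message that failed to deserialize.
interface DeserializationErrorHandler {
    // Return true to skip the message, false to fail the job.
    boolean shouldSkip(byte[] message, Exception cause);
}

// How a consumer loop might combine the two.
class ErrorTolerantDecoder<T> {
    private final FlatDeserializationSchema<T> schema;
    private final DeserializationErrorHandler handler;

    ErrorTolerantDecoder(FlatDeserializationSchema<T> schema, DeserializationErrorHandler handler) {
        this.schema = schema;
        this.handler = handler;
    }

    void emit(byte[] message, Consumer<T> out) {
        try {
            schema.deserialize(message, out);
        } catch (IOException | RuntimeException e) {
            if (!handler.shouldSkip(message, e)) {
                // Bail out, matching the current behavior.
                throw new IllegalStateException("Failed to deserialize message", e);
            }
            // Otherwise drop the message and continue with the next one.
        }
    }
}
```

In this sketch, records emitted before the exception are not rolled back, and exceptions thrown by {{out}} itself are also passed to the handler.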



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
