Hi Jack,

As Robert Metzger mentioned in a previous thread, there's an ongoing
discussion about the issue in this JIRA:
https://issues.apache.org/jira/browse/FLINK-3679.

A possible workaround is to use a SimpleStringSchema in the Kafka source,
and chain it with a flatMap operator where you can use your custom
deserializer and handle deserialization errors.

Best,
Yassine

On Aug 27, 2016 02:37, "Jack Huang" <jackhu...@mz.com> wrote:

> Hi all,
>
> I have a custom deserializer which I pass to a Kafka source to transform
> JSON string to Scala case class.
>
> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new 
> JsonSerde(classOf[Event], new Event), kafkaProp))
>
> ​
>
> There are time when the JSON message is malformed, in which case I want to
> catch the exception, log some error message, and go on to the next message
> without producing an event to the downstream. It doesn't seem like the 
> DeserializationSchema
> interface allows such behavior. How could I achieve this?
>
> Thanks,
> Jack
>

Reply via email to