Hi Jack, As Robert Metzger mentioned in a previous thread, there's an ongoing discussion about the issue in this JIRA: https://issues.apache.org/jira/browse/FLINK-3679.
A possible workaround is to use a SimpleStringSchema in the Kafka source, and chain it with a flatMap operator where you can use your custom deserializer and handle deserialization errors. Best, Yassine On Aug 27, 2016 02:37, "Jack Huang" <jackhu...@mz.com> wrote: > Hi all, > > I have a custom deserializer which I pass to a Kafka source to transform > JSON string to Scala case class. > > val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new > JsonSerde(classOf[Event], new Event), kafkaProp)) > > > > There are time when the JSON message is malformed, in which case I want to > catch the exception, log some error message, and go on to the next message > without producing an event to the downstream. It doesn't seem like the > DeserializationSchema > interface allows such behavior. How could I achieve this? > > Thanks, > Jack >