Re: Handle deserialization error

2016-09-01 Thread Jack Huang
Hi Yassine, For now my workaround is to catch exceptions in my custom deserializer and emit a default object downstream. It would still be very nice to avoid this inefficiency by not producing an object at all. Thanks, Jack On Fri, Aug 26, 2016 at 6:51 PM, Yassine Marzougui wrot
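
For readers following the thread, the default-object workaround described above might look roughly like the sketch below. It is plain Scala with no Flink dependency. The `Event` fields, the `Invalid` sentinel, and the regex-based `parse` (a stand-in for a real JSON library) are all assumptions made for illustration; none of them come from the original message.

```scala
import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal

// Hypothetical event type; the real Event class is not shown in the thread.
final case class Event(id: Long, name: String)

object DefaultOnError {
  // Hypothetical sentinel emitted when a record cannot be deserialized.
  val Invalid: Event = Event(-1L, "")

  // Stand-in for a real JSON parser: accepts only {"id": <digits>, "name": "<text>"}.
  private val Pattern =
    """\s*\{\s*"id"\s*:\s*(\d+)\s*,\s*"name"\s*:\s*"([^"]*)"\s*\}\s*""".r

  def parse(raw: String): Event = raw match {
    case Pattern(id, name) => Event(id.toLong, name)
    case _ => throw new IllegalArgumentException(s"malformed record: $raw")
  }

  // Mirrors the workaround: swallow the failure and return a default object.
  def deserialize(message: Array[Byte]): Event =
    try parse(new String(message, StandardCharsets.UTF_8))
    catch { case NonFatal(_) => Invalid }

  def main(args: Array[String]): Unit = {
    val good = """{"id": 7, "name": "click"}""".getBytes(StandardCharsets.UTF_8)
    val bad = "not json".getBytes(StandardCharsets.UTF_8)
    // The sentinel still travels downstream and has to be filtered out there,
    // which is the inefficiency mentioned in the message.
    val kept = Seq(good, bad).map(deserialize).filter(_ != Invalid)
    println(kept)
  }
}
```

The extra `filter` step is the cost of this approach: every malformed record is still materialized as an object before it is discarded.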

Re: Handle deserialization error

2016-08-26 Thread Yassine Marzougui
Hi Jack, As Robert Metzger mentioned in a previous thread, there's an ongoing discussion about the issue in this JIRA: https://issues.apache.org/jira/browse/FLINK-3679. A possible workaround is to use a SimpleStringSchema in the Kafka source, and chain it with a flatMap operator where you can use
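
The core of the suggested workaround (read each record as a raw string, then parse it in a flatMap so that a bad record yields no output) can be sketched in plain Scala without any Flink dependency. The `Event` fields, the regex-based `parseEvent` (a stand-in for a real JSON library), and the helper name `parseOrSkip` are assumptions for illustration only. In a Flink job the same function would presumably be passed to `flatMap` on the string stream produced by the `SimpleStringSchema` source.

```scala
import scala.util.Try

// Hypothetical event type; the real Event class is not shown in the thread.
final case class Event(id: Long, name: String)

object SkipMalformed {
  // Stand-in for a real JSON parser: accepts only {"id": <digits>, "name": "<text>"}.
  private val Pattern =
    """\s*\{\s*"id"\s*:\s*(\d+)\s*,\s*"name"\s*:\s*"([^"]*)"\s*\}\s*""".r

  def parseEvent(raw: String): Event = raw match {
    case Pattern(id, name) => Event(id.toLong, name)
    case _ => throw new IllegalArgumentException(s"malformed record: $raw")
  }

  // Zero elements for a malformed record, one element for a valid one.
  def parseOrSkip(raw: String): List[Event] = Try(parseEvent(raw)).toOption.toList

  def main(args: Array[String]): Unit = {
    val raw = Seq("""{"id": 1, "name": "a"}""", "not json", """{"id": 2, "name": "b"}""")
    // Malformed input is dropped here instead of being replaced by a default object.
    val events = raw.flatMap(parseOrSkip)
    println(events)
  }
}
```

Unlike a deserializer that must always return an object, the flatMap step can emit nothing for a bad record, so no placeholder value reaches downstream operators.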

Handle deserialization error

2016-08-26 Thread Jack Huang
Hi all, I have a custom deserializer which I pass to a Kafka source to transform a JSON string into a Scala case class. val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event], new Event), kafkaProp)) There are times when the JSON message is malformed, in wh