If I am processing a stream in the following manner: 

val stream = env.addSource(consumer).name("KafkaStream") 
.keyBy(x => (x.obj.ID1(),x.obj.ID2(),x.obj.ID3()) 
.flatMap(new FlatMapProcessor) 

and the IDs bomb out because of deserialization issues, my job crashes with a 
'Could not extract key' error. How can I trap this cleanly? The only thing I 
can think of is to validate the IDs in the deserialization class argument that 
is used in the KafkaConsumer constructor, and trap any issues there. Is that 
the preferred way? Is there a better way? 

Reply via email to