Hi Robert,

Thanks for your reply. This helps; I was looking in a similar direction.
Thanks,
Hemant

On Wed, 18 Mar 2020 at 8:44 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Hemant,
>
> You could let the Kafka consumer deserialize your JSON data into a
> DataStream<String>, then use a custom ProcessFunction to parse the
> string as JSON. In your custom function you can handle errors more
> flexibly (for example, outputting erroneous records through a side
> output).
>
> I hope this helps!
>
> Best,
> Robert
>
> On Wed, Mar 18, 2020 at 11:48 AM hemant singh <hemant2...@gmail.com> wrote:
>
>> Hi Users,
>>
>> Is there a way I can do schema validation on read from Kafka in a
>> Flink job?
>>
>> I have a pipeline like this:
>>
>> Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) ->
>> Kafka Topic Transformed (Avro data) -> Sink
>>
>> While reading from the Raw topic I want to validate the schema, so
>> that if the schema check fails I can push the event to an error
>> topic. I understand from the documentation [1] that events which
>> cannot be deserialised are returned as null and the consumer moves
>> ahead (failing the consumer does not help, as it would be retried
>> with the same result).
>> Is there a way I can filter out these records if the events cannot
>> be deserialised?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>> 'When encountering a corrupted message that cannot be deserialised for
>> any reason, there are two options - either throwing an exception from the
>> deserialize(...) method which will cause the job to fail and be
>> restarted, or returning null to allow the Flink Kafka consumer to
>> silently skip the corrupted message.'
>>
>> Thanks,
>> Hemant
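[Editor's note] Robert's suggestion is to consume the raw topic as plain strings and do the JSON parsing in a downstream function, routing parse failures to a side output (in Flink's Java API this would be a ProcessFunction with an OutputTag, retrieving failures via getSideOutput). The routing logic can be sketched in plain Python for illustration; the function name and in-memory lists here are hypothetical stand-ins for the actual Flink operators:

```python
import json

def parse_or_route(raw_records):
    """Sketch of the parse-with-side-output pattern: attempt to parse
    each raw string as JSON; records that fail to parse are routed to
    an error collection (standing in for a Flink side output / error
    topic) instead of failing or silently dropping them."""
    parsed, errors = [], []
    for raw in raw_records:
        try:
            parsed.append(json.loads(raw))
        except (json.JSONDecodeError, TypeError):
            # In a Flink ProcessFunction this would be
            # ctx.output(errorTag, raw) instead of a list append.
            errors.append(raw)
    return parsed, errors
```

This keeps the consumer itself simple (it never fails on bad input) while still preserving every corrupted record for inspection or redelivery to an error topic, rather than returning null from the DeserializationSchema and losing the record.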