Hi Hemant,

you could let the Kafka consumer deserialize your JSON data into a
DataStream<String>, and then parse the strings in a custom ProcessFunction.
In your custom function, you can handle errors more flexibly, for example
by routing erroneous records to a side output.
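
A minimal sketch of what I mean (assuming Jackson for the JSON parsing;
the tag and class names are just illustrative). Reading the topic with
SimpleStringSchema gives you the raw strings, and the ProcessFunction
routes anything that fails to parse into a side output:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class JsonParsingFunction extends ProcessFunction<String, JsonNode> {

    // Side output for records that fail to parse. The anonymous subclass
    // is needed so Flink can capture the generic type.
    public static final OutputTag<String> ERRORS =
        new OutputTag<String>("parse-errors") {};

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public void processElement(String raw, Context ctx, Collector<JsonNode> out) {
        try {
            // Successfully parsed records go downstream as usual.
            out.collect(mapper.readTree(raw));
        } catch (Exception e) {
            // Corrupted records go to the side output instead of failing the job.
            ctx.output(ERRORS, raw);
        }
    }
}

You can then pull the erroneous records out of the main stream and, for
example, write them to your error topic:

SingleOutputStreamOperator<JsonNode> parsed =
    rawStream.process(new JsonParsingFunction());
DataStream<String> errors = parsed.getSideOutput(JsonParsingFunction.ERRORS);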

I hope this helps!

Best,
Robert

On Wed, Mar 18, 2020 at 11:48 AM hemant singh <hemant2...@gmail.com> wrote:

> Hi Users,
>
> Is there a way I can do schema validation on read from Kafka in a Flink
> job?
>
> I have a pipeline like this
>
> Kafka Topic Raw(json data) -> Kafka Topic Avro(avro data) -> Kafka Topic
> Transformed(avro data) -> Sink
>
> While reading from the Raw topic I want to validate the schema so that in
> case the schema check fails I can push the event to an error topic. I
> understand from the documentation[1] that events which cannot be
> deserialised will be returned as null and the consumer moves ahead
> (failing the consumer does not help, as this would be retried with the
> same result).
> Is there a way I can filter out these records if the events cannot be
> deserialised?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> 'When encountering a corrupted message that cannot be deserialised for
> any reason, there are two options - either throwing an exception from the
> deserialize(...) method which will cause the job to fail and be
> restarted, or returning null to allow the Flink Kafka consumer to
> silently skip the corrupted message.'
>
> Thanks,
> Hemant
>
