Hi Robert,

Thanks for your reply. This helps; I was looking in a similar direction.

Thanks,
Hemant

On Wed, 18 Mar 2020 at 8:44 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Hemant,
>
> you could let the Kafka consumer deserialize your JSON data into a
> DataStream<String>, and then use a custom ProcessFunction to parse each
> string as JSON.
> In your custom function, you can handle errors more flexibly (for
> example, by outputting erroneous records through a side output).
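>
> A minimal sketch of that approach (the stream and tag names are
> hypothetical, and Jackson is assumed for the JSON parsing):
>
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> final OutputTag<String> errorTag = new OutputTag<String>("parse-errors") {};
>
> SingleOutputStreamOperator<JsonNode> parsed = rawStream
>     .process(new ProcessFunction<String, JsonNode>() {
>         private transient ObjectMapper mapper;
>
>         @Override
>         public void open(Configuration parameters) {
>             mapper = new ObjectMapper();
>         }
>
>         @Override
>         public void processElement(String value, Context ctx,
>                 Collector<JsonNode> out) {
>             try {
>                 // parsing succeeded: emit the JSON tree downstream
>                 out.collect(mapper.readTree(value));
>             } catch (Exception e) {
>                 // parsing failed: route the raw record to the side output
>                 ctx.output(errorTag, value);
>             }
>         }
>     });
>
> // erroneous records can then be written to an error topic
> DataStream<String> errors = parsed.getSideOutput(errorTag);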
>
> I hope this helps!
>
> Best,
> Robert
>
> On Wed, Mar 18, 2020 at 11:48 AM hemant singh <hemant2...@gmail.com>
> wrote:
>
>> Hi Users,
>>
>> Is there a way I can do schema validation on read from Kafka in a
>> Flink job?
>>
>> I have a pipeline like this:
>>
>> Kafka Topic Raw(json data) -> Kafka Topic Avro(avro data) -> Kafka Topic
>> Transformed(avro data) -> Sink
>>
>> While reading from the Raw topic I want to validate the schema, so that
>> if the schema check fails I can push the event to an error topic. I
>> understand from the documentation [1] that events which cannot be
>> deserialised are returned as null and the consumer moves ahead (failing
>> the consumer does not help, as the record would be retried with the
>> same result).
>> Is there a way I can filter out these records if the events cannot be
>> deserialised?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>> 'When encountering a corrupted message that cannot be deserialised for
>> any reason, there are two options - either throwing an exception from the
>> deserialize(...) method which will cause the job to fail and be
>> restarted, or returning null to allow the Flink Kafka consumer to
>> silently skip the corrupted message.'
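>>
>> A sketch of the second option, a schema that returns null on failure
>> (the class name is hypothetical, and Jackson is assumed for parsing):
>>
>> import com.fasterxml.jackson.databind.JsonNode;
>> import com.fasterxml.jackson.databind.ObjectMapper;
>> import org.apache.flink.api.common.serialization.DeserializationSchema;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>
>> public class TolerantJsonSchema implements DeserializationSchema<JsonNode> {
>>
>>     private transient ObjectMapper mapper;
>>
>>     @Override
>>     public JsonNode deserialize(byte[] message) {
>>         if (mapper == null) {
>>             mapper = new ObjectMapper();
>>         }
>>         try {
>>             return mapper.readTree(message);
>>         } catch (Exception e) {
>>             // the consumer silently skips records deserialized to null
>>             return null;
>>         }
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(JsonNode nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<JsonNode> getProducedType() {
>>         return TypeInformation.of(JsonNode.class);
>>     }
>> }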
>>
>> Thanks,
>> Hemant
>>
>
