Hi Yuval,
thanks for bringing this issue up. You're right: there is currently no error
handling implemented for SerializationSchema. FLIP-124 [1] addressed this
for the DeserializationSchema, though. I created FLINK-19397 [2] to track
this feature.

In the meantime, I cannot think of any solution other than filtering those
rows out in a separate step before emitting the data to Kafka.
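
Something along these lines might work. This is an untested sketch; the
SafeKafkaSink class, the isSerializable() check, and the topic name are
placeholders for your own code:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.flink.types.Row;

    public class SafeKafkaSink {

        // Placeholder: mirror whatever condition makes serialize() fail for you.
        private static boolean isSerializable(Row row) {
            return row != null; // replace with a real check for the malformed values
        }

        public static void attach(DataStream<Row> rows,
                                  KafkaSerializationSchema<Row> schema,
                                  Properties kafkaProps) {
            rows
                // drop the bad records before they ever reach the schema
                .filter(SafeKafkaSink::isSerializable)
                .addSink(new FlinkKafkaProducer<>(
                    "output-topic",                     // placeholder topic name
                    schema,                             // your existing schema
                    kafkaProps,
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
        }
    }

That way the KafkaSerializationSchema itself stays free of error handling; if
you want to track how many rows are dropped, a flatMap with a metric counter
would work just as well as the filter.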

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397

On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi,
>
> I'm using a custom KafkaSerializationSchema to write records to Kafka
> using FlinkKafkaProducer. The objects written are Rows coming from Flink's
> SQL API.
>
> In some cases, when trying to convert the Row object to a byte[],
> serialization will fail due to malformed values. In such cases, I would
> like the custom serialization schema to drop the bad records and not send
> them through.
>
> From the API, it is unclear how such failures should be handled. Given the
> following signature:
>
>  ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
> timestamp);
>
> From reading the code, there's no exception handling or null checking,
> which means that:
>
> - If an exception is thrown, it will cause the entire job to fail (this
> has happened to me in production)
> - If null is returned, a null value will be pushed down to
> kafkaProducer.send, which is undesirable.
>
> What are the options here?
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>