Hi,

I'm using a custom KafkaSerializationSchema to write records to Kafka via
FlinkKafkaProducer. The objects being written are Rows coming from Flink's
SQL API.

In some cases, converting the Row object to a byte[] fails because of
malformed values. In such cases, I would like the custom serialization
schema to drop the bad record rather than send it through.

From the API, it is unclear how such failures should be handled. Given the
following signature:

  ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

From reading the code, there's no exception handling or null checking,
which means that:

- If an exception is thrown, it will fail the entire job (this has
happened to me in production).
- If null is returned, the null value will be pushed down to
kafkaProducer.send, which is undesirable.

What are the options here?



-- 
Best Regards,
Yuval Itzchakov.
