Hi, I'm using a custom KafkaSerializationSchema to write records to Kafka using FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL API.
In some cases, converting the Row object to a byte[] fails because of malformed values. When that happens, I would like the custom serialization schema to drop the bad record instead of sending it through.

From the API, it is unclear how such failures should be handled. Given the following signature:

    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

From reading the code, there is no exception handling or null checking around this call, which means that:

- If an exception is thrown, it causes the entire job to fail (this has happened to me in production).
- If null is returned, it is passed straight to kafkaProducer.send, which is undesirable.

What are the options here?

--
Best Regards,
Yuval Itzchakov.