Hi Yuval, thanks for bringing this issue up. You're right: There is no error handling currently implemented for SerializationSchema. FLIP-124 [1] addressed this for the DeserializationSchema, though. I created FLINK-19397 [2] to cover this feature.
In the meantime, I cannot think of any other solution than filtering those rows out in a step before emitting the data to Kafka (a rough sketch of such a filtering step is included at the end of this message).

Best,
Matthias

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397

On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi,
>
> I'm using a custom KafkaSerializationSchema to write records to Kafka
> using FlinkKafkaProducer. The objects written are Rows coming from Flink's
> SQL API.
>
> In some cases, when trying to convert the Row object to a byte[],
> serialization will fail due to malformed values. In such cases, I would
> like the custom serialization schema to drop the bad records and not send
> them through.
>
> From the API, it is unclear how such failures should be handled. Given the
> following signature:
>
> ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
> timestamp);
>
> From reading the code, there's no exception handling or null checking,
> which means that:
>
> - If an exception is thrown, it will cause the entire job to fail (this
>   has happened to me in production)
> - If null is passed, a null value will be pushed down to
>   kafkaProducer.send, which is undesirable.
>
> What are the options here?
>
> --
> Best Regards,
> Yuval Itzchakov.
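P.S. Here is the rough sketch of the filtering step mentioned above: a flatMap that attempts the Row-to-byte[] conversion before the FlinkKafkaProducer and drops rows for which it fails. The class name and the encodeRow helper are only placeholders for whatever conversion your custom KafkaSerializationSchema actually performs, so treat this as a sketch rather than a drop-in implementation:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class DropUnserializableRows implements FlatMapFunction<Row, Row> {

    @Override
    public void flatMap(Row row, Collector<Row> out) {
        try {
            // Attempt the same Row -> byte[] conversion the KafkaSerializationSchema
            // performs later; if it throws, the row never reaches the producer.
            encodeRow(row);
            out.collect(row);
        } catch (Exception e) {
            // Malformed value: drop the record instead of failing the whole job.
            // A metric or side output could be added here to track dropped rows.
        }
    }

    // Hypothetical stand-in for the custom serialization logic; replace it with
    // whatever your KafkaSerializationSchema uses to build the record value.
    private byte[] encodeRow(Row row) throws Exception {
        return row.toString().getBytes(StandardCharsets.UTF_8);
    }
}

The step would be wired in right before the sink, e.g. rows.flatMap(new DropUnserializableRows()).addSink(kafkaProducer). Emitting the failed rows to a side output instead of discarding them silently would also make the dropped records observable.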