Hi Yuval,

Here are some workarounds.

One option is to emit a tombstone record (null payload) and filter it out
downstream. On a log-compacted topic, Kafka will also remove the tombstones
during compaction.
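A minimal sketch of that idea, using simplified stand-ins (`KafkaRecord` for Kafka's ProducerRecord<byte[], byte[]>, and a toy `encode` in place of the real Row serialization) rather than the actual Flink/Kafka interfaces:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Kafka's ProducerRecord<byte[], byte[]>;
// in Kafka, a record with a null value acts as a tombstone.
class KafkaRecord {
    final byte[] key;
    final byte[] value;
    KafkaRecord(byte[] key, byte[] value) { this.key = key; this.value = value; }
}

class TombstoneSchema {
    // Stand-in for the real Row-to-bytes conversion, assumed to throw on malformed input.
    static byte[] encode(String row) {
        if (row.indexOf('\0') >= 0) {
            throw new IllegalArgumentException("malformed row");
        }
        return row.getBytes(StandardCharsets.UTF_8);
    }

    // On failure, emit a tombstone (null value) instead of failing the job;
    // downstream consumers filter out null-valued records, and log compaction
    // eventually removes them from a compacted topic.
    static KafkaRecord serialize(byte[] key, String row) {
        try {
            return new KafkaRecord(key, encode(row));
        } catch (Exception e) {
            return new KafkaRecord(key, null);
        }
    }
}
```

In the real KafkaSerializationSchema you would build the ProducerRecord the same way, passing null as the value in the catch branch.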

A second option is to translate the Row to a byte[] in a separate flatMap
(emitting zero records on error) and then write the byte[] directly to Kafka
in the serialization schema.
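The core of that flatMap could look like the sketch below; the `encode` helper stands in for the real Row serialization, and in actual Flink code the body would live in FlatMapFunction#flatMap, calling Collector#collect instead of returning a list:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

class RowToBytesFlatMap {
    // Stand-in for the real Row serialization, assumed to throw on malformed values.
    static byte[] encode(String row) {
        if (row.isEmpty()) {
            throw new IllegalArgumentException("malformed row");
        }
        return row.getBytes(StandardCharsets.UTF_8);
    }

    // Emit the encoded bytes, or nothing on failure, so bad records are
    // dropped before they ever reach the Kafka sink. The sink's schema then
    // only has to pass the byte[] through and can never fail.
    static List<byte[]> flatMap(String row) {
        try {
            return Collections.singletonList(encode(row));
        } catch (Exception e) {
            // A good place to log the failure or increment a "dropped records" metric.
            return Collections.emptyList();
        }
    }
}
```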

A third option is to wrap the sink or the KafkaProducer and catch the
exception there (possibly rethrowing it as a custom exception type for
clarity).
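A sketch of the wrapping idea, again with a simplified stand-in interface instead of the real KafkaSerializationSchema, and a hypothetical RecordSerializationException so failures are clearly attributable when they surface:

```java
// Hypothetical exception type that makes serialization failures easy to
// identify and handle in one place (not part of Flink or Kafka).
class RecordSerializationException extends RuntimeException {
    RecordSerializationException(String msg, Throwable cause) { super(msg, cause); }
}

// Simplified stand-in for Flink's KafkaSerializationSchema<T>.
interface Schema<T> {
    byte[] serialize(T element);
}

// Wraps any schema so that arbitrary failures surface as a single,
// clearly-named exception type, which an outer wrapper around the sink or
// producer can then catch and decide to drop, log, or fail on.
class WrappingSchema<T> implements Schema<T> {
    private final Schema<T> inner;
    WrappingSchema(Schema<T> inner) { this.inner = inner; }

    @Override
    public byte[] serialize(T element) {
        try {
            return inner.serialize(element);
        } catch (Exception e) {
            throw new RecordSerializationException("failed to serialize " + element, e);
        }
    }
}
```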

On Thu, Sep 24, 2020 at 3:00 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Yuval,
> thanks for bringing this issue up. You're right: There is no error
> handling currently implemented for SerializationSchema. FLIP-124 [1]
> addressed this for the DeserializationSchema, though. I created
> FLINK-19397 [2] to cover this feature.
>
> In the meantime, I cannot think of any other solution than filtering those
> rows out in a step before emitting the data to Kafka.
>
> Best,
> Matthias
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
> [2] https://issues.apache.org/jira/browse/FLINK-19397
>
> On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using a custom KafkaSerializationSchema to write records to Kafka
>> using FlinkKafkaProducer. The objects written are Rows coming from Flink's
>> SQL API.
>>
>> In some cases, when trying to convert the Row object to a byte[],
>> serialization will fail due to malformed values. In such cases, I would
>> like the custom serialization schema to drop the bad records and not send
>> them through.
>>
>> From the API, it is unclear how such failures should be handled. Given
>> the following signature:
>>
>>  ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
>> timestamp);
>>
>> From reading the code, there's no exception handling or null checking,
>> which means that:
>>
>> - If an exception is thrown, it will cause the entire job to fail (this
>> has happened to me in production)
>> - If null is passed, a null value will be pushed down to
>> kafkaProducer.send which is undesirable.
>>
>> What are the options here?
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
