[ https://issues.apache.org/jira/browse/FLINK-29480 ]
Salva deleted comment on FLINK-29480:
-------------------------------
was (Author: JIRAUSER287051):
Can someone provide any guidance on how to fix those errors?
> Skip invalid messages when writing
> ----------------------------------
>
> Key: FLINK-29480
> URL: https://issues.apache.org/jira/browse/FLINK-29480
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Salva
> Assignee: Salva
> Priority: Minor
> Labels: pull-request-available
> Attachments: Screenshot 2022-10-28 at 13.48.12.png
>
>
> As reported in [1], it seems that it's not possible to skip invalid messages
> when writing. More specifically, if there is an error serializing messages,
> there is no option for skipping them, and the Flink job then enters a crash loop.
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     currentProducer.send(record, deliveryCallback); // line 200
>     numRecordsSendCounter.inc();
> } {code}
> So, if you make your `serialize` method return `null`, this is what you get
> at runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     if (record != null) { // skip null records (check to be added)
>         currentProducer.send(record, deliveryCallback);
>         numRecordsSendCounter.inc();
>     }
> } {code}
> This would at least make it possible to skip those messages and move on to
> the next ones.
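
To make the proposed guard concrete, here is a minimal, self-contained sketch of the pattern. It deliberately does not use the Flink or Kafka APIs: `NullSkippingWriter`, `NullSkippingWriterDemo`, and `parseOrNull` are hypothetical stand-ins, the in-memory list stands in for `currentProducer.send(...)`, and the counter stands in for `numRecordsSendCounter`. It only illustrates the idea that a serializer returning `null` causes the element to be skipped rather than sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a sink writer; this is NOT Flink's KafkaWriter. */
final class NullSkippingWriter<IN, OUT> {
    // Assumption: the serializer signals "invalid element" by returning null.
    private final Function<IN, OUT> serializer;
    // Stands in for the producer: records that would have been sent.
    private final List<OUT> sent = new ArrayList<>();
    private long numRecordsSent = 0;

    NullSkippingWriter(Function<IN, OUT> serializer) {
        this.serializer = serializer;
    }

    void write(IN element) {
        OUT record = serializer.apply(element);
        if (record != null) { // skip null records instead of failing
            sent.add(record);
            numRecordsSent++;
        }
    }

    List<OUT> sent() {
        return sent;
    }

    long numRecordsSent() {
        return numRecordsSent;
    }
}

final class NullSkippingWriterDemo {
    /** Hypothetical serializer: returns null when the input is not an integer. */
    static Integer parseOrNull(String s) {
        try {
            return Integer.valueOf(s.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Writes every input and returns only the records that were actually sent. */
    static List<Integer> run(List<String> inputs) {
        NullSkippingWriter<String, Integer> writer =
                new NullSkippingWriter<>(NullSkippingWriterDemo::parseOrNull);
        for (String input : inputs) {
            writer.write(input);
        }
        return writer.sent();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1", "oops", "3")));
    }
}
```

In this sketch the invalid element is dropped silently and the sent-records counter is not incremented for it; a real change might also want to log or count skipped records, which is left out here.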
> Obviously, one could put a flatMap operator in front of the sink to filter
> out invalid messages, but:
> # It looks weird that one has to add an extra operator just to "make sure"
> that the serializer will not fail right after. Wouldn't it be simpler to skip
> the null records directly and avoid this pre-check? [4]
> # It's (apparently) a very simple change.
> # It brings consistency/symmetry with the reading case. [4, 5]
> To expand on point 3, by looking at `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
>         throws Exception {
>     T deserialized = deserialize(message);
>     if (deserialized != null) { // skip null records (check already exists)
>         out.collect(deserialized);
>     }
> } {code}
> one can simply return `null` in the overridden `deserialize` method in order
> to skip any message that fails to be deserialized. Similarly, if one uses the
> `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
>         throws IOException; {code}
> then it's also possible not to invoke `out.collect(...)` on null records. To
> me, it looks strange that the same flexibility is not given in the writing
> case.
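
For comparison, the reading-side behaviour described above can be sketched in the same self-contained style. Again, nothing here is the real Flink API: `SkippingDeserializer`, `LongDeserializer`, and `SkippingDeserializerDemo` are hypothetical names, raw `byte[]` stands in for `ConsumerRecord<byte[], byte[]>`, and `java.util.function.Consumer` stands in for `Collector`. The sketch only mirrors the shape of the default method quoted earlier, where a `null` result is never collected.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in mirroring the shape of a deserialization schema. */
interface SkippingDeserializer<T> {
    /** Returns the decoded value, or null if the message cannot be decoded. */
    T deserialize(byte[] message);

    /** Emits the decoded value only when it is non-null. */
    default void deserialize(byte[] message, Consumer<T> out) {
        T deserialized = deserialize(message);
        if (deserialized != null) { // skip null records
            out.accept(deserialized);
        }
    }
}

/** Hypothetical schema: decodes UTF-8 text as a long, null on malformed input. */
final class LongDeserializer implements SkippingDeserializer<Long> {
    @Override
    public Long deserialize(byte[] message) {
        try {
            return Long.valueOf(new String(message, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return null; // signal "skip this message"
        }
    }
}

final class SkippingDeserializerDemo {
    /** Decodes all messages and returns only those that were valid. */
    static List<Long> readAll(List<byte[]> messages) {
        SkippingDeserializer<Long> schema = new LongDeserializer();
        List<Long> out = new ArrayList<>();
        for (byte[] message : messages) {
            schema.deserialize(message, out::add);
        }
        return out;
    }
}
```

Feeding this the UTF-8 bytes of `42`, `not-a-number`, and `7` yields a list containing only 42 and 7; the malformed message is dropped without an exception, which is the flexibility the proposal asks for on the writing side.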
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)