RE: Skip malformed messages with the KafkaSink
I finally created this ticket: https://issues.apache.org/jira/browse/FLINK-29480. Can anyone provide any feedback?

Regards,
Salva

> On 2022/09/08 10:48:07 Salva Alcántara wrote:
> [...]
Re: RE: Skip malformed messages with the KafkaSink
Yeah, that would be an option, but it would be nicer if I could simply skip events that fail to be serialized without prepending any operator to the sink since, conceptually, that is not really part of the pipeline but rather about handling serialization errors. If I'm not mistaken, what I'm asking for is already possible when reading/deserializing messages (without having to append a filter that discards invalid messages before they are sent downstream). E.g., from the generic DeserializationSchema interface:

```
// To be overridden by the user
T deserialize(byte[] message) throws IOException;

@PublicEvolving
default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialize = deserialize(message);
    if (deserialize != null) {
        out.collect(deserialize);
    }
}
```

The more specialized interface KafkaDeserializationSchema is implemented identically:

```
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws Exception {
    T deserialized = deserialize(message);
    if (deserialized != null) {
        out.collect(deserialized);
    }
}
```

So, one can simply return `null` in the overridden `deserialize` method and those messages will be automatically filtered out. If instead one uses the more recent KafkaRecordDeserializationSchema interface, then

```
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
```

it's possible to simply not call `out.collect` on the records that should be skipped.

In general, this gives a flexibility which is lost when writing/serializing messages, resulting in somewhat inconsistent/asymmetric behaviour when one looks at the KafkaWriter used by the KafkaSink:

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

where it's not possible to skip records if desired. Also, it's not currently possible to pass the KafkaSink a custom writer with a different behaviour, e.g.,

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    if (record != null) { // skip null records
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
    }
}
```

Isn't the above implementation more consistent with the deserialization case (and also more powerful) than the current one?

> On Thu, Sep 8, 2022 at 10:56 PM Alexander Fedulov wrote:
> [...]
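A minimal, dependency-free sketch of the proposed null-skipping `write`, just to make the intended behaviour concrete. `RecordSerializer` and `NullSkippingWriter` are hypothetical stand-ins for the sink's record serializer and for `KafkaWriter`; records are plain `String`s instead of `ProducerRecord<byte[], byte[]>`, and an in-memory list replaces `currentProducer.send(...)`. It is not Flink code and cannot be plugged into the KafkaSink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the record serializer: returning null means "skip this element".
interface RecordSerializer<IN> {
    String serialize(IN element);
}

// Hypothetical writer modelled on the proposed KafkaWriter#write with a null check.
class NullSkippingWriter<IN> {
    private final RecordSerializer<IN> recordSerializer;
    // Stand-in for the Kafka producer: collects the records that would have been sent.
    private final List<String> sent = new ArrayList<>();
    // Stand-in for numRecordsSendCounter.
    private long numRecordsSend = 0;

    NullSkippingWriter(RecordSerializer<IN> recordSerializer) {
        this.recordSerializer = recordSerializer;
    }

    void write(IN element) {
        final String record = recordSerializer.serialize(element);
        if (record != null) { // skip null records instead of handing them to the producer
            sent.add(record);
            numRecordsSend++;
        }
    }

    List<String> sent() {
        return sent;
    }

    long numRecordsSend() {
        return numRecordsSend;
    }
}
```

For example, with the serializer `s -> s.startsWith("{") ? s : null`, writing `{"id":1}`, `garbage` and `{"id":2}` leaves only the two JSON records in `sent()` and `numRecordsSend()` at 2; the malformed element is dropped without an exception.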
Re: RE: Skip malformed messages with the KafkaSink
Can't you add a flatMap function just before the Sink that does exactly this verification and filters out everything that is not supposed to be sent downstream?

Best,
Alexander Fedulov

> On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara wrote:
> [...]
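A minimal sketch of that pre-sink filtering, assuming "malformed" means "fails to serialize". In a real job this logic would live in a Flink `FlatMapFunction` placed before `sinkTo(...)` and emit through its `Collector`; here a `java.util.function.Consumer` stands in for the `Collector`, and `PayloadSerializer` with its null/empty check is a hypothetical placeholder for the actual serialization logic, so the example runs without Flink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical placeholder for the real serialization logic; throws on malformed input.
class PayloadSerializer {
    static byte[] serialize(String event) {
        if (event == null || event.isEmpty()) {
            throw new IllegalArgumentException("malformed event");
        }
        return event.getBytes(StandardCharsets.UTF_8);
    }
}

// Mirrors the shape of FlatMapFunction#flatMap(value, out): zero or one output per input.
class ValidatingFlatMap {
    void flatMap(String event, Consumer<byte[]> out) {
        byte[] payload;
        try {
            payload = PayloadSerializer.serialize(event);
        } catch (IllegalArgumentException e) {
            return; // drop the malformed event; the sink never sees it
        }
        out.accept(payload);
    }

    // Applies the function to every event and returns what would reach the sink.
    static List<byte[]> apply(List<String> events) {
        ValidatingFlatMap fn = new ValidatingFlatMap();
        List<byte[]> out = new ArrayList<>();
        for (String event : events) {
            fn.flatMap(event, out::add);
        }
        return out;
    }
}
```

One possible design choice here: serializing inside the flatMap and forwarding the bytes means each event is serialized only once, and the sink's own serializer could then just pass those bytes through.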
RE: RE: Skip malformed messages with the KafkaSink
Sorry, I meant do nothing when the serialize method returns null...

> On 2022/09/08 15:52:48 Salva Alcántara wrote:
> [...]
RE: Skip malformed messages with the KafkaSink
I guess one possibility would be to extend/override the `write` method of the KafkaWriter:

https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

so that it does nothing when the IN element is null. Would this be the only way, really?

> On 2022/09/08 10:48:07 Salva Alcántara wrote:
> [...]
Skip malformed messages with the KafkaSink
Hi! Is there a way to skip/discard messages when using the KafkaSink, so that if for some reason messages are malformed they can simply be discarded? I tried returning null from the record serializer used by the KafkaWriter, but that raises an exception:

```
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
```

What would be the way to handle this?

On the other hand, this seems a bit asymmetric, in the sense that when reading messages, if the deserializer returns null, then that message is simply ignored; see, e.g., the `Deserializer` javadoc at https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html

```
T deserialize(String topic, byte[] data)

Deserialize a record value from a byte array into a value or object.

Parameters:
topic - topic associated with the data
data - serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.

Returns:
deserialized typed data; may be null
```
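For comparison, the read-side skip-on-null behaviour can be sketched without dependencies. `SimpleCollector` and `NullSkippingSchema` are hypothetical stand-ins mirroring the shape of Flink's `Collector<T>` and the default `DeserializationSchema#deserialize(byte[], Collector<T>)` method, and `IntSchema` is an illustrative schema that, following the javadoc's recommendation, returns null for null or malformed input rather than throwing. Null results never reach the collector, which is the behaviour the question asks for on the sink side.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector<T>.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in mirroring the shape of Flink's DeserializationSchema<T>.
interface NullSkippingSchema<T> {
    // To be overridden by the user; returning null marks the message as skipped.
    T deserialize(byte[] message) throws IOException;

    // Same pattern as DeserializationSchema's default method: null results are dropped.
    default void deserialize(byte[] message, SimpleCollector<T> out) throws IOException {
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }
}

// Illustrative schema: parses UTF-8 decimal integers, returns null for null or malformed input.
class IntSchema implements NullSkippingSchema<Integer> {
    @Override
    public Integer deserialize(byte[] message) {
        if (message == null) {
            return null;
        }
        try {
            return Integer.parseInt(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // malformed: skip the message instead of failing
        }
    }

    // Feeds every raw message through the schema and returns what reached the collector.
    static List<Integer> deserializeAll(List<byte[]> messages) {
        IntSchema schema = new IntSchema();
        List<Integer> collected = new ArrayList<>();
        for (byte[] message : messages) {
            try {
                schema.deserialize(message, collected::add);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return collected;
    }
}
```

Feeding it the payloads `"1"`, `"oops"`, `null` and `" 42 "` collects only `1` and `42`; the other two are silently skipped.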