RE: Skip malformed messages with the KafkaSink

2022-10-24 Thread Salva Alcántara
I finally created this ticket:
https://issues.apache.org/jira/browse/FLINK-29480.

Can anyone provide any feedback?

Regards,

Salva

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
>
> ```
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored; see, e.g.,
> https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
>
> ```
> T deserialize(String topic, byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an exception.
> Returns:
> deserialized typed data; may be null
> ```
>


Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Yeah, that would be an option, but it would be nicer if I could simply
skip events that fail to be serialized without prepending any operator to
the sink, since, conceptually, that filtering is not really part of the
pipeline but rather part of handling serialization errors.

If I'm not mistaken, what I'm asking for is entirely possible when
reading/deserializing messages (without having to append a filter that
discards invalid messages). E.g., from the generic
DeserializationSchema interface:


```
// To be overridden by the user
T deserialize(byte[] message) throws IOException;

@PublicEvolving
default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialized = deserialize(message);
    if (deserialized != null) {
        out.collect(deserialized);
    }
}
```

The more specialized KafkaDeserializationSchema interface follows the same
pattern:

```
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws Exception {
    T deserialized = deserialize(message);
    if (deserialized != null) {
        out.collect(deserialized);
    }
}
```

So, one can simply return `null` from the overridden `deserialize` method
and those messages will be automatically filtered out.
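For illustration, a minimal sketch of such a tolerant schema (the `MyEvent`
type and its `parse` method are hypothetical placeholders):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Returning null makes the default deserialize(record, out) above
// drop the record silently instead of failing the job.
public class TolerantEventSchema implements KafkaDeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
        try {
            return MyEvent.parse(record.value()); // hypothetical parser
        } catch (Exception e) {
            return null; // malformed record: skipped by the default method
        }
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```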

If instead one uses the more recent KafkaRecordDeserializationSchema
interface, whose deserialize method is

```
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
        throws IOException;
```

then it's possible to skip records simply by not calling `out.collect` on
them.
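For example, a minimal sketch along these lines (again with a hypothetical
`MyEvent` type and `parse` method):

```
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Malformed records are skipped by simply not emitting anything.
public class TolerantRecordSchema implements KafkaRecordDeserializationSchema<MyEvent> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MyEvent> out)
            throws IOException {
        try {
            out.collect(MyEvent.parse(record.value())); // hypothetical parser
        } catch (Exception e) {
            // swallow the error; the record is dropped
        }
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```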

In general, this gives a flexibility that is lost when writing/serializing
messages, resulting in somewhat inconsistent/asymmetric behaviour, as one
can see in the KafkaWriter used by the KafkaSink:

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```
```

where it's not possible to skip records even if desired. Moreover, it's
not currently possible to pass the KafkaSink a custom writer with a
different behaviour, e.g.,

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    if (record != null) { // skip null records
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
    }
}
```

Isn't the above implementation more consistent with the deserialization
case (and also more powerful) than the current one?


On Thu, Sep 8, 2022 at 10:56 PM Alexander Fedulov wrote:

> Can't you add a flatMap function just before the Sink that does exactly
> this verification and filters out everything that is not supposed to be
> sent downstream?
>
> Best,
> Alexander Fedulov
>
> On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara wrote:
>
>> Sorry I meant do nothing when the serialize method returns null...
>>
>> On 2022/09/08 15:52:48 Salva Alcántara wrote:
>> > I guess one possibility would be to extend/override the `write` method
>> > of the KafkaWriter:
>> >
>> > https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>> >
>> > ```
>> > @Override
>> > public void write(IN element, Context context) throws IOException {
>> >     final ProducerRecord<byte[], byte[]> record =
>> >             recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
>> >     currentProducer.send(record, deliveryCallback);
>> >     numRecordsSendCounter.inc();
>> > }
>> > ```
>> >
>> > so that it does nothing when the IN element is null. Would this be the
>> > only way, really?
>> >
>> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
>> > > Hi! Is there a way to skip/discard messages when using the KafkaSink,
>> > > so that if for some reason messages are malformed they can simply be
>> > > discarded? I tried by returning null in the corresponding KafkaWriter
>> > > but that raises an exception:
>> > >
>> > > ```
>> > > java.lang.NullPointerException
>> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>> > >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>> > >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>> > > ```
>> > >
>> > > What would be the way to handle this?
>> > >
>> > > On the other

Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Alexander Fedulov
Can't you add a flatMap function just before the Sink that does exactly
this verification and filters out everything that is not supposed to be
sent downstream?
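
For instance, a minimal sketch of such a pre-sink step (here the `MyEvent`
type, its `isValid()` check, and the `stream`/`kafkaSink` variables are all
hypothetical placeholders):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Drop events that would not serialize cleanly before they reach the sink.
DataStream<MyEvent> validated =
        stream.flatMap(
                        (FlatMapFunction<MyEvent, MyEvent>)
                                (event, out) -> {
                                    if (event != null && event.isValid()) {
                                        out.collect(event); // forward good events
                                    }
                                    // malformed events are silently skipped
                                })
                .returns(MyEvent.class); // lambdas need an explicit return type
validated.sinkTo(kafkaSink);
```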

Best,
Alexander Fedulov

On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara wrote:

> Sorry I meant do nothing when the serialize method returns null...
>
> On 2022/09/08 15:52:48 Salva Alcántara wrote:
> > I guess one possibility would be to extend/override the `write` method
> > of the KafkaWriter:
> >
> > https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
> >
> > ```
> > @Override
> > public void write(IN element, Context context) throws IOException {
> >     final ProducerRecord<byte[], byte[]> record =
> >             recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
> >     currentProducer.send(record, deliveryCallback);
> >     numRecordsSendCounter.inc();
> > }
> > ```
> >
> > so that it does nothing when the IN element is null. Would this be the
> > only way, really?
> >
> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > > Hi! Is there a way to skip/discard messages when using the KafkaSink,
> > > so that if for some reason messages are malformed they can simply be
> > > discarded? I tried by returning null in the corresponding KafkaWriter
> > > but that raises an exception:
> > >
> > > ```
> > > java.lang.NullPointerException
> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> > >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > > ```
> > >
> > > What would be the way to handle this?
> > >
> > > On the other hand, that seems a bit asymmetric in the sense that when
> > > reading messages, if the deserializer returns null, then that message
> > > is simply ignored; see, e.g.,
> > > https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
> > >
> > > ```
> > > T deserialize(String topic, byte[] data)
> > > Deserialize a record value from a byte array into a value or object.
> > > Parameters:
> > > topic - topic associated with the data
> > > data - serialized bytes; may be null; implementations are recommended to
> > > handle null by returning a value or null rather than throwing an exception.
> > > Returns:
> > > deserialized typed data; may be null
> > > ```


RE: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Sorry I meant do nothing when the serialize method returns null...

On 2022/09/08 15:52:48 Salva Alcántara wrote:
> I guess one possibility would be to extend/override the `write` method
> of the KafkaWriter:
>
> https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>
> ```
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
>     currentProducer.send(record, deliveryCallback);
>     numRecordsSendCounter.inc();
> }
> ```
>
> so that it does nothing when the IN element is null. Would this be the
> only way, really?
>
> On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > Hi! Is there a way to skip/discard messages when using the KafkaSink,
> > so that if for some reason messages are malformed they can simply be
> > discarded? I tried by returning null in the corresponding KafkaWriter
> > but that raises an exception:
> >
> > ```
> > java.lang.NullPointerException
> >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > ```
> >
> > What would be the way to handle this?
> >
> > On the other hand, that seems a bit asymmetric in the sense that when
> > reading messages, if the deserializer returns null, then that message
> > is simply ignored; see, e.g.,
> > https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
> >
> > ```
> > T deserialize(String topic, byte[] data)
> > Deserialize a record value from a byte array into a value or object.
> > Parameters:
> > topic - topic associated with the data
> > data - serialized bytes; may be null; implementations are recommended to
> > handle null by returning a value or null rather than throwing an exception.
> > Returns:
> > deserialized typed data; may be null
> > ```


RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
I guess one possibility would be to extend/override the `write` method of
the KafkaWriter:

https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

so that it does nothing when the IN element is null. Would this be the only
way, really?

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
>
> ```
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored; see, e.g.,
> https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
>
> ```
> T deserialize(String topic, byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an exception.
> Returns:
> deserialized typed data; may be null
> ```
>