Hi all,

I had a requirement to handle Kafka producer exceptions so that they don’t 
bring down the job. I extended FlinkKafkaProducer010 and handled the exceptions 
there.

       public void invoke(T value, Context context) throws Exception {
              try {
                     this.checkErroneous();
                     ...
                     this.producer.send(record, this.callback);
              } catch (Exception exception) {
                     // Handle exception
              }
       }

The problem is that, because checkErroneous() is called at the beginning of 
invoke(), the catch block is triggered for the next message, not for the 
message that caused the exception. So I moved checkErroneous() below 
producer.send(), like so:

       public void invoke(T value, Context context) throws Exception {
              try {
                     ...
                     this.producer.send(record, this.callback);
                     this.checkErroneous();
              } catch (Exception exception) {
                     // Handle exception
              }
       }

This solved the issue: exceptions are now thrown for the message that causes 
the error instead of for the next message.
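
To make the ordering effect concrete, here is a minimal, Kafka-free sketch of the pattern. Everything in it is a hypothetical stand-in (SketchSink, its send() and onCompletion() methods, and the "bad" record prefix); it is not the connector's code. It assumes the send callback stores the first failure in a field, that checkErroneous() rethrows and clears that field, and that the callback has already run by the time send() returns. That last assumption is the important one: with a real producer the callback may run later on another thread, in which case checking right after send() would not be guaranteed to see the failure for the same record.

```java
import java.util.ArrayList;
import java.util.List;

public class AsyncErrorOrdering {

    /** Hypothetical stand-in for the sink; not the real FlinkKafkaProducer010. */
    static class SketchSink {
        private final boolean checkFirst;
        private Exception asyncException;            // first failure reported by the callback
        final List<String> handled = new ArrayList<>();

        SketchSink(boolean checkFirst) {
            this.checkFirst = checkFirst;
        }

        // Stand-in for producer.send(record, callback).
        // Assumption: the callback runs before send() returns.
        private void send(String record) {
            if (record.startsWith("bad")) {
                onCompletion(new IllegalStateException("send failed for " + record));
            }
        }

        // Stand-in for the send callback: remember only the first failure.
        private void onCompletion(Exception e) {
            if (e != null && asyncException == null) {
                asyncException = e;
            }
        }

        // Rethrow a stored failure once, then clear it.
        private void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                asyncException = null;
                throw e;
            }
        }

        void invoke(String record) {
            try {
                if (checkFirst) {
                    checkErroneous();
                }
                send(record);
                if (!checkFirst) {
                    checkErroneous();
                }
            } catch (Exception e) {
                // Record which message was being processed when the catch block ran.
                handled.add(record);
            }
        }
    }

    /** Feeds three records through the sink and returns those that hit the catch block. */
    static List<String> run(boolean checkFirst) {
        SketchSink sink = new SketchSink(checkFirst);
        for (String record : new String[] {"ok-1", "bad-2", "ok-3"}) {
            sink.invoke(record);
        }
        return sink.handled;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // prints [ok-3]
        System.out.println(run(false));  // prints [bad-2]
    }
}
```

With the check first, the failure of bad-2 only surfaces while ok-3 is being processed, and in this sketch ok-3 is never passed to send() because the exception is thrown before that call. With the check after send(), the catch block runs for bad-2 itself.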

Is there a specific reason why checkErroneous() is on top? Or am I doing 
something wrong?
Class: 
https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java

Regards,
Harshith
