This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e6b34e [FLINK-38947] Handle errors in onCompletion callback only once (#214)
f2e6b34e is described below

commit f2e6b34ebfd7f6d91874cd8ee9b9563b22fa398d
Author: Roman <[email protected]>
AuthorDate: Tue Jan 20 09:23:38 2026 +0100

    [FLINK-38947] Handle errors in onCompletion callback only once (#214)
    
    * [hotfix] Exit onCompletion callback if closed
    
    * [FLINK-38947] Handle errors in onCompletion callback only once
    
    * [hotfix] Log Kafka request failure + numRecordsSent
---
 .../flink/connector/kafka/sink/KafkaWriter.java    | 44 +++++++++++++++-------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6f5324c5..c2692766 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -82,6 +82,9 @@ class KafkaWriter<IN>
     private final Map<String, KafkaMetricMutableWrapper> 
previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
     private final boolean disabledMetrics;
+    // num records actually sent and acked by kafka; not volatile to prevent 
performance
+    // degradation
+    private long numRecordsSent;
     private final Counter numRecordsOutCounter;
     private final Counter numBytesOutCounter;
     private final Counter numRecordsOutErrorsCounter;
@@ -92,7 +95,7 @@ class KafkaWriter<IN>
     private Metric byteOutMetric;
     protected FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
 
-    private boolean closed = false;
+    private volatile boolean closed = false;
     private long lastSync = System.currentTimeMillis();
 
     /**
@@ -306,8 +309,12 @@ class KafkaWriter<IN>
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) 
{
             if (exception != null) {
-                FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                        KafkaWriter.this.currentProducer;
+                if (closed) {
+                    LOG.debug(
+                            "Completed exceptionally, but shutdown was already 
initiated",
+                            exception);
+                    return;
+                }
 
                 // Propagate the first exception since amount of exceptions 
could be large. Need to
                 // do this in Producer IO thread since flush() guarantees that 
the future will
@@ -315,18 +322,29 @@ class KafkaWriter<IN>
                 // executor e.g. mailbox executor. flush() needs to have the 
exception immediately
                 // available to fail the checkpoint.
                 if (asyncProducerException == null) {
+
+                    LOG.warn(
+                            "Kafka request failed, sent records: {}, out 
records: {}",
+                            numRecordsSent,
+                            numRecordsOutCounter.getCount(),
+                            exception);
+
+                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                            KafkaWriter.this.currentProducer;
                     asyncProducerException = decorateException(metadata, 
exception, producer);
-                }
 
-                // Checking for exceptions from previous writes
-                // Notice: throwing exception in mailboxExecutor thread is not 
safe enough for
-                // triggering global fail over, which has been fixed in 
[FLINK-31305].
-                mailboxExecutor.execute(
-                        () -> {
-                            // Checking for exceptions from previous writes
-                            checkAsyncException();
-                        },
-                        "Update error metric");
+                    // Checking for exceptions from previous writes
+                    // Notice: throwing exception in mailboxExecutor thread is 
not safe enough for
+                    // triggering global fail over, which has been fixed in 
[FLINK-31305].
+                    mailboxExecutor.execute(
+                            () -> {
+                                // Checking for exceptions from previous writes
+                                checkAsyncException();
+                            },
+                            "Update error metric");
+                }
+            } else {
+                numRecordsSent++;
             }
 
             if (metadataConsumer != null) {

Reply via email to