This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new f2e6b34e [FLINK-38947] Handle errors in onCompletion callback only once (#214)
f2e6b34e is described below
commit f2e6b34ebfd7f6d91874cd8ee9b9563b22fa398d
Author: Roman <[email protected]>
AuthorDate: Tue Jan 20 09:23:38 2026 +0100
[FLINK-38947] Handle errors in onCompletion callback only once (#214)
* [hotfix] Exit onCompletion callback if closed
* [FLINK-38947] Handle errors in onCompletion callback only once
* [hotfix] Log Kafka request failure + numRecordsSent
---
.../flink/connector/kafka/sink/KafkaWriter.java | 44 +++++++++++++++-------
1 file changed, 31 insertions(+), 13 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6f5324c5..c2692766 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -82,6 +82,9 @@ class KafkaWriter<IN>
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private final SinkWriterMetricGroup metricGroup;
private final boolean disabledMetrics;
+    // num records actually sent and acked by kafka; not volatile to prevent performance
+    // degradation
+    private long numRecordsSent;
private final Counter numRecordsOutCounter;
private final Counter numBytesOutCounter;
private final Counter numRecordsOutErrorsCounter;
@@ -92,7 +95,7 @@ class KafkaWriter<IN>
private Metric byteOutMetric;
protected FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
- private boolean closed = false;
+ private volatile boolean closed = false;
private long lastSync = System.currentTimeMillis();
/**
@@ -306,8 +309,12 @@ class KafkaWriter<IN>
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
-                FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                        KafkaWriter.this.currentProducer;
+                if (closed) {
+                    LOG.debug(
+                            "Completed exceptionally, but shutdown was already initiated",
+                            exception);
+                    return;
+                }
                 // Propagate the first exception since amount of exceptions could be large. Need to
                 // do this in Producer IO thread since flush() guarantees that the future will
@@ -315,18 +322,29 @@ class KafkaWriter<IN>
                 // executor e.g. mailbox executor. flush() needs to have the exception immediately
                 // available to fail the checkpoint.
                 if (asyncProducerException == null) {
+
+                    LOG.warn(
+                            "Kafka request failed, sent records: {}, out records: {}",
+                            numRecordsSent,
+                            numRecordsOutCounter.getCount(),
+                            exception);
+
+                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                            KafkaWriter.this.currentProducer;
                     asyncProducerException = decorateException(metadata, exception, producer);
-                }
-                // Checking for exceptions from previous writes
-                // Notice: throwing exception in mailboxExecutor thread is not safe enough for
-                // triggering global fail over, which has been fixed in [FLINK-31305].
-                mailboxExecutor.execute(
-                        () -> {
-                            // Checking for exceptions from previous writes
-                            checkAsyncException();
-                        },
-                        "Update error metric");
+                    // Checking for exceptions from previous writes
+                    // Notice: throwing exception in mailboxExecutor thread is not safe enough for
+                    // triggering global fail over, which has been fixed in [FLINK-31305].
+                    mailboxExecutor.execute(
+                            () -> {
+                                // Checking for exceptions from previous writes
+                                checkAsyncException();
+                            },
+                            "Update error metric");
+                }
+            } else {
+                numRecordsSent++;
}
if (metadataConsumer != null) {