This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new a84c2b8a [FLINK-37613] Fix resource leak during abortion
a84c2b8a is described below

commit a84c2b8a768392d7974523ae875497e83e6fdfdc
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 15:45:23 2025 +0200

    [FLINK-37613] Fix resource leak during abortion
    
    Aborting transactions can take a while. Apparently, task cleanup may not
    trigger correctly if the abort is interrupted (for some unrelated reason),
    which leaks producers. As a workaround, the EOS writer calls close()
    manually when initialization fails, until the underlying Flink bug
    (FLINK-37612) is fixed.
---
 .../kafka/sink/ExactlyOnceKafkaWriter.java         | 16 ++++++--
 .../flink/connector/kafka/sink/KafkaCommitter.java |  5 +++
 .../flink/connector/kafka/sink/KafkaWriter.java    | 14 ++++++-
 .../sink/FlinkKafkaInternalProducerITCase.java     | 48 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 5 deletions(-)
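
The change applies the same close-on-failure guard in both writers: wrap the
fallible part of initialize() in try/catch, close() whatever was already
acquired, attach any secondary failure as a suppressed exception, and rethrow.
A minimal standalone sketch of that guard follows; GuardedWriter and
PseudoProducer are hypothetical stand-ins, not the actual Flink classes.

import java.io.Closeable;

// Sketch of the FLINK-37612 workaround: if initialize() fails part-way,
// close() releases whatever was already acquired so no producer leaks.
class GuardedWriter implements Closeable {
    private PseudoProducer producer;

    public void initialize() {
        try {
            producer = new PseudoProducer(); // may throw, e.g. on bad config
            producer.beginTransaction();     // or while aborting lingering txns
        } catch (Throwable t) {
            try {
                close(); // release partially acquired resources
            } catch (Exception e) {
                t.addSuppressed(e); // keep the original failure primary
            }
            throw t; // precise rethrow; only unchecked exceptions reach here
        }
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    // Hypothetical stand-in for FlinkKafkaInternalProducer.
    static final class PseudoProducer implements Closeable {
        void beginTransaction() { /* may throw a RuntimeException */ }

        @Override
        public void close() {}
    }
}

The diff below wires this guard into KafkaWriter.initialize() and
ExactlyOnceKafkaWriter.initialize().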

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index ed5b95e9..741d00f7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -131,9 +131,19 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
 
     @Override
     public void initialize() {
-        abortLingeringTransactions(
-                checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
-        this.currentProducer = startTransaction(restoredCheckpointId + 1);
+        // Workaround for FLINK-37612: ensure that we are not leaking producers
+        try {
+            abortLingeringTransactions(
+                    checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
+            this.currentProducer = startTransaction(restoredCheckpointId + 1);
+        } catch (Throwable t) {
+            try {
+                close();
+            } catch (Exception e) {
+                t.addSuppressed(e);
+            }
+            throw t;
+        }
     }
 
     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 9ba6de1f..70d67150 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -139,6 +139,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                         "Transaction ({}) encountered error and data has been 
potentially lost.",
                         request,
                         e);
+                closeCommitterProducer(producer);
                 // cause failover
                 request.signalFailedWithUnknownReason(e);
             }
@@ -150,6 +151,10 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
             return;
         }
         backchannel.send(TransactionFinished.erroneously(producer.getTransactionalId()));
+        closeCommitterProducer(producer);
+    }
+
+    private void closeCommitterProducer(FlinkKafkaInternalProducer<?, ?> producer) {
         if (producer == this.committingProducer) {
             this.committingProducer.close();
             this.committingProducer = null;
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 83a400d7..8e0bf3ba 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -148,8 +148,18 @@ class KafkaWriter<IN>
     }
 
     public void initialize() {
-        this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
-        initKafkaMetrics(this.currentProducer);
+        // Workaround for FLINK-37612: ensure that we are not leaking producers
+        try {
+            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
+            initKafkaMetrics(this.currentProducer);
+        } catch (Throwable t) {
+            try {
+                close();
+            } catch (Exception e) {
+                t.addSuppressed(e);
+            }
+            throw t;
+        }
     }
 
     @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index efd92dc9..6bd6d880 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -34,11 +34,13 @@ import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -54,6 +56,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 @Testcontainers
@@ -174,6 +177,51 @@ class FlinkKafkaInternalProducerITCase {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+    void testDoubleCommitAndAbort(boolean firstCommit, boolean secondCommit) {
+        final String topic = "test-double-commit-transaction-" + firstCommit + secondCommit;
+        final String transactionIdPrefix = "testDoubleCommitTransaction-";
+        final String transactionalId = transactionIdPrefix + "id";
+
+        KafkaCommittable committable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(topic, "test-value"));
+            producer.flush();
+            committable = KafkaCommittable.of(producer);
+            if (firstCommit) {
+                producer.commitTransaction();
+            } else {
+                producer.abortTransaction();
+            }
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            resumedProducer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
+            AbstractThrowableAssert<?, ? extends Throwable> secondOp =
+                    assertThatCode(
+                            () -> {
+                                if (secondCommit) {
+                                    resumedProducer.commitTransaction();
+                                } else {
+                                    resumedProducer.abortTransaction();
+                                }
+                            });
+            if (firstCommit == secondCommit) {
+                secondOp.doesNotThrowAnyException();
+            } else {
+                secondOp.isInstanceOf(InvalidTxnStateException.class);
+            }
+        }
+
+        assertNumTransactions(1, transactionIdPrefix);
+        assertThat(readRecords(topic).count()).isEqualTo(firstCommit ? 1 : 0);
+    }
+
     private static Properties getProperties() {
         Properties properties = new Properties();
         properties.put(
