This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new a84c2b8a [FLINK-37613] Fix resource leak during abortion
a84c2b8a is described below
commit a84c2b8a768392d7974523ae875497e83e6fdfdc
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 15:45:23 2025 +0200
[FLINK-37613] Fix resource leak during abortion
Aborting transactions can take a while. Apparently, task cleanup may not
trigger correctly if the abort is interrupted (for some unrelated reason),
which leaks producers. The fix is to call close() manually during
initialization of the EOS writer until the underlying Flink bug (FLINK-37612) is fixed.
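
In essence, the workaround wraps the resource acquisition in initialize()
in a try/catch that closes the writer on any failure before rethrowing.
A minimal sketch of the pattern (class and helper names are illustrative,
not the actual Flink types):

    public class LeakFreeWriter implements AutoCloseable {

        public void initialize() {
            try {
                // Acquire resources that must not leak, e.g. create the
                // internal producer and abort lingering transactions.
                acquireProducerResources();
            } catch (Throwable t) {
                try {
                    // Release anything partially acquired; task cleanup may
                    // not run if the abort was interrupted (FLINK-37612).
                    close();
                } catch (Exception e) {
                    // Keep the original failure as the primary error.
                    t.addSuppressed(e);
                }
                // Precise rethrow: legal without "throws Throwable" because
                // the try block only throws unchecked exceptions.
                throw t;
            }
        }

        private void acquireProducerResources() {
            // Placeholder for producer creation / transaction abort.
        }

        @Override
        public void close() {
            // Placeholder for releasing the producer.
        }
    }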
---
.../kafka/sink/ExactlyOnceKafkaWriter.java | 16 ++++++--
.../flink/connector/kafka/sink/KafkaCommitter.java | 5 +++
.../flink/connector/kafka/sink/KafkaWriter.java | 14 ++++++-
.../sink/FlinkKafkaInternalProducerITCase.java | 48 ++++++++++++++++++++++
4 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index ed5b95e9..741d00f7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -131,9 +131,19 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
 
     @Override
     public void initialize() {
-        abortLingeringTransactions(
-                checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
-        this.currentProducer = startTransaction(restoredCheckpointId + 1);
+        // Workaround for FLINK-37612: ensure that we are not leaking producers
+        try {
+            abortLingeringTransactions(
+                    checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
+            this.currentProducer = startTransaction(restoredCheckpointId + 1);
+        } catch (Throwable t) {
+            try {
+                close();
+            } catch (Exception e) {
+                t.addSuppressed(e);
+            }
+            throw t;
+        }
     }
 
     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 9ba6de1f..70d67150 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -139,6 +139,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                     "Transaction ({}) encountered error and data has been potentially lost.",
                     request,
                     e);
+            closeCommitterProducer(producer);
             // cause failover
             request.signalFailedWithUnknownReason(e);
         }
@@ -150,6 +151,10 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
             return;
         }
         backchannel.send(TransactionFinished.erroneously(producer.getTransactionalId()));
+        closeCommitterProducer(producer);
+    }
+
+    private void closeCommitterProducer(FlinkKafkaInternalProducer<?, ?> producer) {
         if (producer == this.committingProducer) {
             this.committingProducer.close();
             this.committingProducer = null;
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 83a400d7..8e0bf3ba 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -148,8 +148,18 @@ class KafkaWriter<IN>
     }
 
     public void initialize() {
-        this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
-        initKafkaMetrics(this.currentProducer);
+        // Workaround for FLINK-37612: ensure that we are not leaking producers
+        try {
+            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
+            initKafkaMetrics(this.currentProducer);
+        } catch (Throwable t) {
+            try {
+                close();
+            } catch (Exception e) {
+                t.addSuppressed(e);
+            }
+            throw t;
+        }
     }
 
     @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index efd92dc9..6bd6d880 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -34,11 +34,13 @@ import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -54,6 +56,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 @Testcontainers
@@ -174,6 +177,51 @@ class FlinkKafkaInternalProducerITCase {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+    void testDoubleCommitAndAbort(boolean firstCommit, boolean secondCommit) {
+        final String topic = "test-double-commit-transaction-" + firstCommit + secondCommit;
+        final String transactionIdPrefix = "testDoubleCommitTransaction-";
+        final String transactionalId = transactionIdPrefix + "id";
+
+        KafkaCommittable committable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(topic, "test-value"));
+            producer.flush();
+            committable = KafkaCommittable.of(producer);
+            if (firstCommit) {
+                producer.commitTransaction();
+            } else {
+                producer.abortTransaction();
+            }
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            resumedProducer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
+            AbstractThrowableAssert<?, ? extends Throwable> secondOp =
+                    assertThatCode(
+                            () -> {
+                                if (secondCommit) {
+                                    resumedProducer.commitTransaction();
+                                } else {
+                                    resumedProducer.abortTransaction();
+                                }
+                            });
+            if (firstCommit == secondCommit) {
+                secondOp.doesNotThrowAnyException();
+            } else {
+                secondOp.isInstanceOf(InvalidTxnStateException.class);
+            }
+        }
+
+        assertNumTransactions(1, transactionIdPrefix);
+        assertThat(readRecords(topic).count()).isEqualTo(firstCommit ? 1 : 0);
+    }
+
     private static Properties getProperties() {
         Properties properties = new Properties();
         properties.put(