This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 4606010e [FLINK-37622] Fix KafkaSink in BATCH
4606010e is described below

commit 4606010e2dbe81815275e86a20f4b0c58f510e79
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Apr 8 09:47:54 2025 +0200

    [FLINK-37622] Fix KafkaSink in BATCH
    
    Since BATCH doesn't call snapshotState(), we never rotate the producer in
    batch mode. Hence, on closing the producer in the write failover region, we
    also abort the ongoing transaction. The committer in the next region then
    fails.
    
    This commit generalizes the client-side transaction state machine and adds
    a new PRECOMMITTED state. The writer then aborts only those transactions
    that have not been precommitted.
---
 .../kafka/sink/ExactlyOnceKafkaWriter.java         | 10 +++--
 .../sink/internal/FlinkKafkaInternalProducer.java  | 52 +++++++++++++---------
 .../connector/kafka/sink/KafkaSinkITCase.java      | 37 ++++++++++++++-
 3 files changed, 74 insertions(+), 25 deletions(-)
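
As a rough sketch of the idea (not taken from the commit itself): the enum
values and the "abort only transactions with un-precommitted records" rule
below mirror the code added to FlinkKafkaInternalProducer and
ExactlyOnceKafkaWriter in this diff; the TransactionTracker wrapper class and
its main method are hypothetical and exist only for illustration.

    // Minimal sketch of the generalized client-side transaction state machine.
    // Enum names mirror the diff below; the wrapper class is hypothetical.
    public class TransactionTracker {

        enum TransactionState {
            NOT_IN_TRANSACTION, // no open transaction
            IN_TRANSACTION, // beginTransaction() called, no records sent yet
            DATA_IN_TRANSACTION, // at least one record sent in the transaction
            PRECOMMITTED // prepareCommit() handed the transaction to the committer
        }

        private TransactionState state = TransactionState.NOT_IN_TRANSACTION;

        void beginTransaction() {
            state = TransactionState.IN_TRANSACTION;
        }

        void send() {
            // sending a record inside a transaction upgrades the state
            if (state != TransactionState.NOT_IN_TRANSACTION) {
                state = TransactionState.DATA_IN_TRANSACTION;
            }
        }

        void precommitTransaction() {
            // only a transaction with records can be handed to the committer
            if (state != TransactionState.DATA_IN_TRANSACTION) {
                throw new IllegalStateException("Transaction has no records");
            }
            state = TransactionState.PRECOMMITTED;
        }

        boolean shouldAbortOnClose() {
            // abort only transactions that still hold un-precommitted records;
            // a PRECOMMITTED transaction belongs to the committer, and aborting
            // it in batch mode would lose data
            return state == TransactionState.DATA_IN_TRANSACTION;
        }

        public static void main(String[] args) {
            TransactionTracker t = new TransactionTracker();
            t.beginTransaction();
            t.send();
            t.precommitTransaction(); // what prepareCommit() now does
            System.out.println(t.shouldAbortOnClose()); // false: the committer may commit
        }
    }

In the actual change, prepareCommit() marks the transaction as precommitted
right after building the KafkaCommittable, so abortCurrentProducer() no longer
aborts it when the writer is closed in batch mode.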

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index c7aee3c4..eb84977d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -217,6 +217,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
         if (currentProducer.hasRecordsInTransaction()) {
             KafkaCommittable committable = KafkaCommittable.of(currentProducer);
             LOG.debug("Prepare {}.", committable);
+            currentProducer.precommitTransaction();
             return Collections.singletonList(committable);
         }
 
@@ -269,9 +270,12 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     }
 
     private void abortCurrentProducer() {
-        // only abort if the transaction is known to the broker (needs to have at least one record
-        // sent)
-        if (currentProducer.isInTransaction() && currentProducer.hasRecordsInTransaction()) {
+        // Abort only if the transaction is known to the broker (at least one record sent).
+        // Producer may be in precommitted state if we run in batch; aborting would mean data loss.
+        // Note that this may leave the transaction open if an error happens in streaming between
+        // #prepareCommit and #snapshotState. However, aborting here is best effort anyways and
+        // recovery will cleanup the transaction.
+        if (currentProducer.hasRecordsInTransaction()) {
             try {
                 currentProducer.abortTransaction();
             } catch (ProducerFencedException e) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
index ee5b823d..2934a194 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
@@ -55,8 +55,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
     @Nullable private String transactionalId;
-    private volatile boolean inTransaction;
-    private volatile boolean hasRecordsInTransaction;
+    private volatile TransactionState transactionState = TransactionState.NOT_IN_TRANSACTION;
     private volatile boolean closed;
 
     public FlinkKafkaInternalProducer(Properties properties) {
@@ -79,8 +78,8 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-        if (inTransaction) {
-            hasRecordsInTransaction = true;
+        if (isInTransaction()) {
+            transactionState = TransactionState.DATA_IN_TRANSACTION;
         }
         return super.send(record, callback);
     }
@@ -88,7 +87,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Override
     public void flush() {
         super.flush();
-        if (inTransaction) {
+        if (isInTransaction()) {
             flushNewPartitions();
         }
     }
@@ -97,33 +96,40 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     public void beginTransaction() throws ProducerFencedException {
         super.beginTransaction();
         LOG.debug("beginTransaction {}", transactionalId);
-        inTransaction = true;
+        transactionState = TransactionState.IN_TRANSACTION;
     }
 
     @Override
     public void abortTransaction() throws ProducerFencedException {
         LOG.debug("abortTransaction {}", transactionalId);
-        checkState(inTransaction, "Transaction was not started");
-        inTransaction = false;
-        hasRecordsInTransaction = false;
+        checkState(isInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.NOT_IN_TRANSACTION;
         super.abortTransaction();
     }
 
     @Override
     public void commitTransaction() throws ProducerFencedException {
         LOG.debug("commitTransaction {}", transactionalId);
-        checkState(inTransaction, "Transaction was not started");
-        inTransaction = false;
-        hasRecordsInTransaction = false;
+        checkState(isInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.NOT_IN_TRANSACTION;
         super.commitTransaction();
     }
 
     public boolean isInTransaction() {
-        return inTransaction;
+        return transactionState != TransactionState.NOT_IN_TRANSACTION;
     }
 
     public boolean hasRecordsInTransaction() {
-        return hasRecordsInTransaction;
+        return transactionState == TransactionState.DATA_IN_TRANSACTION;
+    }
+
+    public boolean isPrecommitted() {
+        return transactionState == TransactionState.PRECOMMITTED;
+    }
+
+    public void precommitTransaction() {
+        checkState(hasRecordsInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.PRECOMMITTED;
     }
 
     @Override
@@ -172,7 +178,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      */
     public void setTransactionId(String transactionalId) {
         checkState(
-                !inTransaction,
+                !isInTransaction(),
                 String.format("Another transaction %s is still open.", transactionalId));
         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
         this.transactionalId = transactionalId;
@@ -292,7 +298,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
      */
     public void resumeTransaction(long producerId, short epoch) {
-        checkState(!inTransaction, "Already in transaction %s", transactionalId);
+        checkState(!isInTransaction(), "Already in transaction %s", transactionalId);
         checkState(
                 producerId >= 0 && epoch >= 0,
                 "Incorrect values for producerId %s and epoch %s",
@@ -329,9 +335,8 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             // when we create recovery producers to resume transactions and commit
             // them, we should always set this flag.
             setField(transactionManager, "transactionStarted", true);
-            this.inTransaction = true;
-            this.hasRecordsInTransaction = true;
         }
+        this.transactionState = TransactionState.PRECOMMITTED;
     }
 
     private static Object createProducerIdAndEpoch(long producerId, short epoch) {
@@ -391,7 +396,14 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Override
     public String toString() {
         return String.format(
-                "FlinkKafkaInternalProducer@%d{transactionalId='%s', inTransaction=%s, closed=%s}",
-                System.identityHashCode(this), transactionalId, inTransaction, closed);
+                "FlinkKafkaInternalProducer@%d{transactionalId='%s', transactionState=%s, closed=%s}",
+                System.identityHashCode(this), transactionalId, transactionState, closed);
+    }
+
+    enum TransactionState {
+        NOT_IN_TRANSACTION,
+        IN_TRANSACTION,
+        DATA_IN_TRANSACTION,
+        PRECOMMITTED,
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index f94a9354..fc9ad3d9 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -239,6 +240,31 @@ public class KafkaSinkITCase extends TestLogger {
         writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, namingStrategy, chained);
     }
 
+    @Test
+    public void testWriteRecordsToKafkaWithExactlyOnceGuaranteeBatch() throws Exception {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1));
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        int count = 1000;
+        final DataStream<Long> source = createSource(env, false, count);
+        source.sinkTo(
+                new KafkaSinkBuilder<Long>()
+                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
+                        .setTransactionalIdPrefix("kafka-sink")
+                        .build());
+        env.execute();
+
+        final List<Long> collectedRecords =
+                deserializeValues(drainAllRecordsFromTopic(topic, true));
+        assertThat(collectedRecords).hasSize(count);
+    }
+
     static Stream<Arguments> getEOSParameters() {
         return Arrays.stream(TransactionNamingStrategy.values())
                 .flatMap(
@@ -659,11 +685,18 @@ public class KafkaSinkITCase extends TestLogger {
     }
 
     private DataStream<Long> createThrottlingSource(StreamExecutionEnvironment env) {
+        return createSource(env, true, 1000);
+    }
+
+    private DataStream<Long> createSource(
+            StreamExecutionEnvironment env, boolean throttled, int count) {
         return env.fromSource(
                 new DataGeneratorSource<>(
                         value -> value,
-                        1000,
-                        new ThrottleUntilFirstCheckpointStrategy(),
+                        count,
+                        throttled
+                                ? new ThrottleUntilFirstCheckpointStrategy()
+                                : RateLimiterStrategy.noOp(),
                         BasicTypeInfo.LONG_TYPE_INFO),
                 WatermarkStrategy.noWatermarks(),
                 "Generator Source");
