This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 4606010e [FLINK-37622] Fix KafkaSink in BATCH
4606010e is described below
commit 4606010e2dbe81815275e86a20f4b0c58f510e79
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Apr 8 09:47:54 2025 +0200
[FLINK-37622] Fix KafkaSink in BATCH
Since BATCH doesn't call snapshotState(), we never rotate the producer in
batch mode. Hence, when the producer is closed at the end of the write
failover region, the ongoing transaction is also aborted, and the committer
in the next region then fails.

This commit generalizes the client-side transaction state machine and adds a
new PRECOMMITTED state. The writer then aborts only those transactions that
are not precommitted.
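
For illustration only, a minimal standalone sketch of the state machine
described above (not the actual FlinkKafkaInternalProducer; the tracker class
and method names are made up, only the states mirror the enum added in the
diff below):

    // Sketch of the generalized client-side transaction state machine.
    enum TransactionState { NOT_IN_TRANSACTION, IN_TRANSACTION, DATA_IN_TRANSACTION, PRECOMMITTED }

    final class TransactionStateTracker {
        private volatile TransactionState state = TransactionState.NOT_IN_TRANSACTION;

        // beginTransaction(): the transaction is open but empty.
        void onBegin() { state = TransactionState.IN_TRANSACTION; }

        // send(): at least one record was written, so the broker knows the transaction.
        void onSend() {
            if (state == TransactionState.IN_TRANSACTION) {
                state = TransactionState.DATA_IN_TRANSACTION;
            }
        }

        // prepareCommit(): the transaction was handed to a committer and must not be aborted.
        void onPrecommit() { state = TransactionState.PRECOMMITTED; }

        // Closing the writer aborts only transactions with data that no committer owns yet.
        boolean shouldAbortOnClose() { return state == TransactionState.DATA_IN_TRANSACTION; }
    }

In batch mode the writer reaches PRECOMMITTED without a checkpoint ever
happening, so closing it must leave the transaction open for the downstream
committer.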
---
.../kafka/sink/ExactlyOnceKafkaWriter.java | 10 +++--
.../sink/internal/FlinkKafkaInternalProducer.java | 52 +++++++++++++---------
.../connector/kafka/sink/KafkaSinkITCase.java | 37 ++++++++++++++-
3 files changed, 74 insertions(+), 25 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index c7aee3c4..eb84977d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -217,6 +217,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
if (currentProducer.hasRecordsInTransaction()) {
KafkaCommittable committable = KafkaCommittable.of(currentProducer);
LOG.debug("Prepare {}.", committable);
+ currentProducer.precommitTransaction();
return Collections.singletonList(committable);
}
@@ -269,9 +270,12 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
}
private void abortCurrentProducer() {
- // only abort if the transaction is known to the broker (needs to have at least one record
- // sent)
- if (currentProducer.isInTransaction() && currentProducer.hasRecordsInTransaction()) {
+ // Abort only if the transaction is known to the broker (at least one record sent).
+ // Producer may be in precommitted state if we run in batch; aborting would mean data loss.
+ // Note that this may leave the transaction open if an error happens in streaming between
+ // #prepareCommit and #snapshotState. However, aborting here is best effort anyways and
+ // recovery will cleanup the transaction.
+ if (currentProducer.hasRecordsInTransaction()) {
try {
currentProducer.abortTransaction();
} catch (ProducerFencedException e) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
index ee5b823d..2934a194 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
@@ -55,8 +55,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
@Nullable private String transactionalId;
- private volatile boolean inTransaction;
- private volatile boolean hasRecordsInTransaction;
+ private volatile TransactionState transactionState = TransactionState.NOT_IN_TRANSACTION;
private volatile boolean closed;
public FlinkKafkaInternalProducer(Properties properties) {
@@ -79,8 +78,8 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- if (inTransaction) {
- hasRecordsInTransaction = true;
+ if (isInTransaction()) {
+ transactionState = TransactionState.DATA_IN_TRANSACTION;
}
return super.send(record, callback);
}
@@ -88,7 +87,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
@Override
public void flush() {
super.flush();
- if (inTransaction) {
+ if (isInTransaction()) {
flushNewPartitions();
}
}
@@ -97,33 +96,40 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
public void beginTransaction() throws ProducerFencedException {
super.beginTransaction();
LOG.debug("beginTransaction {}", transactionalId);
- inTransaction = true;
+ transactionState = TransactionState.IN_TRANSACTION;
}
@Override
public void abortTransaction() throws ProducerFencedException {
LOG.debug("abortTransaction {}", transactionalId);
- checkState(inTransaction, "Transaction was not started");
- inTransaction = false;
- hasRecordsInTransaction = false;
+ checkState(isInTransaction(), "Transaction was not started");
+ transactionState = TransactionState.NOT_IN_TRANSACTION;
super.abortTransaction();
}
@Override
public void commitTransaction() throws ProducerFencedException {
LOG.debug("commitTransaction {}", transactionalId);
- checkState(inTransaction, "Transaction was not started");
- inTransaction = false;
- hasRecordsInTransaction = false;
+ checkState(isInTransaction(), "Transaction was not started");
+ transactionState = TransactionState.NOT_IN_TRANSACTION;
super.commitTransaction();
}
public boolean isInTransaction() {
- return inTransaction;
+ return transactionState != TransactionState.NOT_IN_TRANSACTION;
}
public boolean hasRecordsInTransaction() {
- return hasRecordsInTransaction;
+ return transactionState == TransactionState.DATA_IN_TRANSACTION;
+ }
+
+ public boolean isPrecommitted() {
+ return transactionState == TransactionState.PRECOMMITTED;
+ }
+
+ public void precommitTransaction() {
+ checkState(hasRecordsInTransaction(), "Transaction was not started");
+ transactionState = TransactionState.PRECOMMITTED;
}
@Override
@@ -172,7 +178,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
*/
public void setTransactionId(String transactionalId) {
checkState(
- !inTransaction,
+ !isInTransaction(),
String.format("Another transaction %s is still open.",
transactionalId));
LOG.debug("Change transaction id from {} to {}", this.transactionalId,
transactionalId);
this.transactionalId = transactionalId;
@@ -292,7 +298,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
* https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
*/
public void resumeTransaction(long producerId, short epoch) {
- checkState(!inTransaction, "Already in transaction %s", transactionalId);
+ checkState(!isInTransaction(), "Already in transaction %s", transactionalId);
checkState(
producerId >= 0 && epoch >= 0,
"Incorrect values for producerId %s and epoch %s",
@@ -329,9 +335,8 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
// when we create recovery producers to resume transactions and commit
// them, we should always set this flag.
setField(transactionManager, "transactionStarted", true);
- this.inTransaction = true;
- this.hasRecordsInTransaction = true;
}
+ this.transactionState = TransactionState.PRECOMMITTED;
}
private static Object createProducerIdAndEpoch(long producerId, short epoch) {
@@ -391,7 +396,14 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
@Override
public String toString() {
return String.format(
- "FlinkKafkaInternalProducer@%d{transactionalId='%s',
inTransaction=%s, closed=%s}",
- System.identityHashCode(this), transactionalId, inTransaction,
closed);
+ "FlinkKafkaInternalProducer@%d{transactionalId='%s',
transactionState=%s, closed=%s}",
+ System.identityHashCode(this), transactionalId,
transactionState, closed);
+ }
+
+ enum TransactionState {
+ NOT_IN_TRANSACTION,
+ IN_TRANSACTION,
+ DATA_IN_TRANSACTION,
+ PRECOMMITTED,
}
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index f94a9354..fc9ad3d9 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.sink;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -239,6 +240,31 @@ public class KafkaSinkITCase extends TestLogger {
writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, namingStrategy, chained);
}
+ @Test
+ public void testWriteRecordsToKafkaWithExactlyOnceGuaranteeBatch() throws Exception {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1));
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ int count = 1000;
+ final DataStream<Long> source = createSource(env, false, count);
+ source.sinkTo(
+ new KafkaSinkBuilder<Long>()
+ .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+ .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topic)
+ .setValueSerializationSchema(new RecordSerializer())
+ .build())
+ .setTransactionalIdPrefix("kafka-sink")
+ .build());
+ env.execute();
+
+ final List<Long> collectedRecords =
+ deserializeValues(drainAllRecordsFromTopic(topic, true));
+ assertThat(collectedRecords).hasSize(count);
+ }
+
static Stream<Arguments> getEOSParameters() {
return Arrays.stream(TransactionNamingStrategy.values())
.flatMap(
@@ -659,11 +685,18 @@ public class KafkaSinkITCase extends TestLogger {
}
private DataStream<Long> createThrottlingSource(StreamExecutionEnvironment env) {
+ return createSource(env, true, 1000);
+ }
+
+ private DataStream<Long> createSource(
+ StreamExecutionEnvironment env, boolean throttled, int count) {
return env.fromSource(
new DataGeneratorSource<>(
value -> value,
- 1000,
- new ThrottleUntilFirstCheckpointStrategy(),
+ count,
+ throttled
+ ? new ThrottleUntilFirstCheckpointStrategy()
+ : RateLimiterStrategy.noOp(),
BasicTypeInfo.LONG_TYPE_INFO),
WatermarkStrategy.noWatermarks(),
"Generator Source");