This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit fb28015127b5d16f3eb3f60d5684d8bb476779f9
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Mar 31 11:10:50 2025 +0200

    [FLINK-34554] Adding listing abort strategy
    
    Use the ListTransactions API to query exactly the lingering transactions
    and abort them. The writer needs to keep the transactions that are about
    to be committed in its state, so that they are not accidentally aborted
    as well.
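    
    For illustration only (not part of this commit): the sketch below shows
    roughly how open transactions for a set of topics can be listed via the
    Kafka Admin API, which is what the new LISTING strategy relies on through
    AdminUtils.getOpenTransactionsForTopics. The helper name and the exact
    call sequence here are assumptions; see the diff for the actual code.
    
        import org.apache.kafka.clients.admin.*;        // Admin, ProducerState, TransactionListing, ...
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.TopicPartitionInfo;
        import java.util.*;
        import java.util.stream.Collectors;
    
        // Sketch: return the transactional ids that still have an open transaction
        // on the given topics (hypothetical helper, not the committed implementation).
        Collection<String> openTransactionsForTopics(Admin admin, Collection<String> topics)
                throws Exception {
            // Resolve all partitions of the target topics.
            List<TopicPartition> partitions = new ArrayList<>();
            for (TopicDescription topic : admin.describeTopics(topics).allTopicNames().get().values()) {
                for (TopicPartitionInfo info : topic.partitions()) {
                    partitions.add(new TopicPartition(topic.name(), info.partition()));
                }
            }
            // Collect producer ids that currently have an open transaction on those partitions.
            Set<Long> producerIds = new HashSet<>();
            for (DescribeProducersResult.PartitionProducerState state :
                    admin.describeProducers(partitions).all().get().values()) {
                for (ProducerState producer : state.activeProducers()) {
                    if (producer.currentTransactionStartOffset().isPresent()) {
                        producerIds.add(producer.producerId());
                    }
                }
            }
            if (producerIds.isEmpty()) {
                return List.of();
            }
            // Map the producer ids back to transactional ids via the ListTransactions API.
            return admin.listTransactions(new ListTransactionsOptions().filterProducerIds(producerIds))
                    .all().get().stream()
                    .map(TransactionListing::transactionalId)
                    .collect(Collectors.toList());
        }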
---
 .../c0d94764-76a0-4c50-b617-70b1754c4612           |  19 +-
 .../kafka/sink/ExactlyOnceKafkaWriter.java         | 113 ++++++++++--
 .../connector/kafka/sink/KafkaWriterState.java     |  75 +++++++-
 .../kafka/sink/KafkaWriterStateSerializer.java     |  44 ++++-
 .../kafka/sink/internal/ProducerPool.java          |   5 +
 .../kafka/sink/internal/ProducerPoolImpl.java      | 104 +++++++----
 .../TransactionAbortStrategyContextImpl.java       |  50 +++++-
 .../internal/TransactionAbortStrategyImpl.java     |  73 ++++++++
 .../internal/TransactionNamingStrategyImpl.java    |  12 +-
 .../kafka/sink/internal/TransactionOwnership.java  | 192 +++++++++++++++++++++
 .../sink/internal/TransactionalIdFactory.java      |  13 ++
 .../subscriber/KafkaSubscriberUtils.java           |  59 -------
 .../subscriber/PartitionSetSubscriber.java         |   2 +-
 .../enumerator/subscriber/TopicListSubscriber.java |   2 +-
 .../subscriber/TopicPatternSubscriber.java         |   2 +-
 .../flink/connector/kafka/util/AdminUtils.java     | 137 +++++++++++++++
 .../kafka/sink/ExactlyOnceKafkaWriterITCase.java   |   9 +-
 .../kafka/sink/KafkaWriterStateSerializerTest.java |  15 +-
 .../sink/internal/ProducerPoolImplITCase.java      |  51 +++++-
 .../sink/internal/TransactionIdFactoryTest.java    |  48 ++++++
 20 files changed, 892 insertions(+), 133 deletions(-)

diff --git 
a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
 
b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
index b153c29d..20e55f4c 100644
--- 
a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
+++ 
b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
@@ -3,6 +3,7 @@ Class 
<org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$
 Class 
<org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaSourceEnumerator.java:0)
 Constructor 
<org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator.<init>(org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber,
 org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService, 
org.apache.flink.api.connector.source.SplitEnumeratorContext, 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer,
 org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInit 
[...]
 Constructor 
<org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.<init>(org.apache.flink.api.connector.source.SourceReaderContext,
 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema,
 java.util.Properties)> calls constructor 
<org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)>
 in (DynamicKafkaSourceReader.java:114)
+Constructor 
<org.apache.flink.connector.kafka.sink.KafkaWriterState.<init>(java.lang.String,
 int, int, org.apache.flink.connector.kafka.sink.internal.TransactionOwnership, 
java.util.Collection)> is annotated with 
<org.apache.flink.annotation.VisibleForTesting> in (KafkaWriterState.java:0)
 Constructor 
<org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.<init>(java.util.List,
 java.util.regex.Pattern, 
org.apache.flink.connector.kafka.sink.KafkaPartitioner, 
org.apache.flink.api.common.serialization.SerializationSchema, 
org.apache.flink.api.common.serialization.SerializationSchema, 
[Lorg.apache.flink.table.data.RowData$FieldGetter;, 
[Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> has 
parameter of type <[Lorg.apach [...]
 Field 
<org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.metricGroups>
 has generic type <java.util.Map<java.lang.String, 
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup>> with type 
argument depending on 
<org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in 
(KafkaClusterMetricGroupManager.java:0)
 Field 
<org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.availabilityHelper>
 has type 
<org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in 
(DynamicKafkaSourceReader.java:0)
@@ -25,14 +26,14 @@ Method 
<org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducer
 Method 
<org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> 
is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> 
is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> 
in (KafkaSink.java:170)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getInputs()> in 
(KafkaSink.java:173)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in 
(KafkaSink.java:169)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method 
<org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)>
 in (KafkaSink.java:172)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 checks instanceof 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in 
(KafkaSink.java:169)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> 
in (KafkaSink.java:177)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getInputs()> in 
(KafkaSink.java:180)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in 
(KafkaSink.java:176)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method 
<org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)>
 in (KafkaSink.java:179)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 checks instanceof 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in 
(KafkaSink.java:176)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 has generic parameter type 
<org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(KafkaSink.java:0)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is 
annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaSink.java:0)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)>
 calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(KafkaSinkBuilder.java:152)
+Method 
<org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)>
 calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(KafkaSinkBuilder.java:153)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is 
annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaWriter.java:0)
 Method 
<org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(ProducerPoolImpl.java:0)
 Method 
<org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext,
 java.util.function.Consumer)> is annotated with 
<org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -48,8 +49,8 @@ Method 
<org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
 Method 
<org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaSourceReader.java:0)
 Method 
<org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaSourceReader.java:0)
 Method 
<org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData,
 org.apache.flink.types.RowKind, 
[Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type 
<[Lorg.apache.flink.table.data.RowData$FieldGetter;> in 
(DynamicKafkaRecordSerializationSchema.java:0)
-Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig,
 org.apache.flink.table.types.DataType)> calls method 
<org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)>
 in (KafkaConnectorOptionsUtil.java:525)
-Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig,
 org.apache.flink.table.types.DataType)> calls method 
<org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)>
 in (KafkaConnectorOptionsUtil.java:569)
+Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig,
 org.apache.flink.table.types.DataType)> calls method 
<org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)>
 in (KafkaConnectorOptionsUtil.java:520)
+Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig,
 org.apache.flink.table.types.DataType)> calls method 
<org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)>
 in (KafkaConnectorOptionsUtil.java:564)
 Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context,
 org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> 
calls method 
<org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType,
 java.lang.String)> in (KafkaDynamicSink.java:401)
 Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List,
 [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in 
(KafkaDynamicSink.java:0)
-Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context,
 org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> 
calls method 
<org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType,
 java.lang.String)> in (KafkaDynamicSource.java:580)
+Method 
<org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context,
 org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> 
calls method 
<org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType,
 java.lang.String)> in (KafkaDynamicSource.java:574)
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index 32131056..e75436d8 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -18,10 +18,15 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
+import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
@@ -31,9 +36,13 @@ import 
org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyIm
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import 
org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl;
 import 
org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
+import org.apache.flink.connector.kafka.util.AdminUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.slf4j.Logger;
@@ -46,6 +55,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -69,7 +80,6 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     /** Strategy to name transactions. */
     private final TransactionNamingStrategyImpl transactionNamingStrategy;
 
-    private final KafkaWriterState kafkaWriterState;
     private final Collection<KafkaWriterState> recoveredStates;
     private final long restoredCheckpointId;
 
@@ -88,6 +98,11 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     /** The context used to name transactions. */
     private final TransactionNamingStrategyContextImpl namingContext;
 
+    private final int totalNumberOfOwnedSubtasks;
+    private final int[] ownedSubtaskIds;
+    /** Lazily created admin client for {@link TransactionAbortStrategyImpl}. 
*/
+    private AdminClient adminClient;
+
     /**
      * Constructor creating a kafka writer.
      *
@@ -133,25 +148,37 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
 
-        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
-
         this.recoveredStates = checkNotNull(recoveredStates, 
"recoveredStates");
+        TaskInfo taskInfo = sinkInitContext.getTaskInfo();
+        TransactionOwnership ownership = 
transactionNamingStrategy.getOwnership();
+        int subtaskId = taskInfo.getIndexOfThisSubtask();
+        int parallelism = taskInfo.getNumberOfParallelSubtasks();
+        this.ownedSubtaskIds =
+                ownership.getOwnedSubtaskIds(subtaskId, parallelism, 
recoveredStates);
+        this.totalNumberOfOwnedSubtasks =
+                ownership.getTotalNumberOfOwnedSubtasks(subtaskId, 
parallelism, recoveredStates);
         initFlinkMetrics();
         restoredCheckpointId =
                 sinkInitContext
                         .getRestoredCheckpointId()
                         .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
-        int subtaskId = sinkInitContext.getTaskInfo().getIndexOfThisSubtask();
-        this.producerPool = new ProducerPoolImpl(kafkaProducerConfig, 
this::initKafkaMetrics);
+        this.producerPool =
+                new ProducerPoolImpl(
+                        kafkaProducerConfig,
+                        this::initKafkaMetrics,
+                        recoveredStates.stream()
+                                .flatMap(r -> 
r.getPrecommittedTransactionalIds().stream())
+                                .collect(Collectors.toList()));
         this.backchannel =
                 BackchannelFactory.getInstance()
                         .getReadableBackchannel(
-                                subtaskId,
-                                
sinkInitContext.getTaskInfo().getAttemptNumber(),
-                                transactionalIdPrefix);
+                                subtaskId, taskInfo.getAttemptNumber(), 
transactionalIdPrefix);
         this.namingContext =
                 new TransactionNamingStrategyContextImpl(
-                        transactionalIdPrefix, subtaskId, 
restoredCheckpointId, producerPool);
+                        transactionalIdPrefix,
+                        this.ownedSubtaskIds[0],
+                        restoredCheckpointId,
+                        producerPool);
     }
 
     @Override
@@ -202,8 +229,30 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
             producerPool.recycleByTransactionId(
                     finishedTransaction.getTransactionId(), 
finishedTransaction.isSuccess());
         }
+        // persist the ongoing transactions into the state; these will not be 
aborted on restart
+        Collection<CheckpointTransaction> ongoingTransactions =
+                producerPool.getOngoingTransactions();
         currentProducer = startTransaction(checkpointId + 1);
-        return Collections.singletonList(kafkaWriterState);
+        return createSnapshots(ongoingTransactions);
+    }
+
+    private List<KafkaWriterState> createSnapshots(
+            Collection<CheckpointTransaction> ongoingTransactions) {
+        List<KafkaWriterState> states = new ArrayList<>();
+        int[] subtaskIds = this.ownedSubtaskIds;
+        for (int index = 0; index < subtaskIds.length; index++) {
+            int ownedSubtask = subtaskIds[index];
+            states.add(
+                    new KafkaWriterState(
+                            transactionalIdPrefix,
+                            ownedSubtask,
+                            totalNumberOfOwnedSubtasks,
+                            transactionNamingStrategy.getOwnership(),
+                            // new transactions are only created with the 
first owned subtask id
+                            index == 0 ? ongoingTransactions : List.of()));
+        }
+        LOG.debug("Snapshotting state {}", states);
+        return states;
     }
 
     @Override
@@ -281,11 +330,53 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
                     producerPool.recycle(producer);
                     return epoch;
                 };
+        Set<String> precommittedTransactionalIds =
+                recoveredStates.stream()
+                        .flatMap(
+                                s ->
+                                        
s.getPrecommittedTransactionalIds().stream()
+                                                
.map(CheckpointTransaction::getTransactionalId))
+                        .collect(Collectors.toSet());
         return new TransactionAbortStrategyContextImpl(
+                this::getTopicNames,
                 kafkaSinkContext.getParallelInstanceId(),
                 kafkaSinkContext.getNumberOfParallelInstances(),
+                ownedSubtaskIds,
+                totalNumberOfOwnedSubtasks,
                 prefixesToAbort,
                 startCheckpointId,
-                aborter);
+                aborter,
+                this::getAdminClient,
+                precommittedTransactionalIds);
+    }
+
+    private Collection<String> getTopicNames() {
+        KafkaDatasetIdentifier identifier =
+                getDatasetIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The record serializer does 
not expose a static list of target topics."));
+        if (identifier.getTopics() != null) {
+            return identifier.getTopics();
+        }
+        return AdminUtils.getTopicsByPattern(getAdminClient(), 
identifier.getTopicPattern());
+    }
+
+    private Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
+        if (recordSerializer instanceof KafkaDatasetFacetProvider) {
+            Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+                    ((KafkaDatasetFacetProvider) 
recordSerializer).getKafkaDatasetFacet();
+
+            return 
kafkaDatasetFacet.map(KafkaDatasetFacet::getTopicIdentifier);
+        }
+        return Optional.empty();
+    }
+
+    private Admin getAdminClient() {
+        if (adminClient == null) {
+            adminClient = AdminClient.create(kafkaProducerConfig);
+        }
+        return adminClient;
     }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java
index b4482c69..5c2e742e 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java
@@ -17,44 +17,101 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
+import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
+
+import java.util.Collection;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class KafkaWriterState {
+/** The state of the Kafka writer. Used to capture information regarding 
transactions. */
+@Internal
+public class KafkaWriterState {
+    public static final int UNKNOWN = -1;
+
     private final String transactionalIdPrefix;
+    private final int ownedSubtaskId;
+    private final int totalNumberOfOwnedSubtasks;
+    private final TransactionOwnership transactionOwnership;
+    private final Collection<CheckpointTransaction> 
precommittedTransactionalIds;
 
-    KafkaWriterState(String transactionalIdPrefix) {
+    @VisibleForTesting
+    public KafkaWriterState(
+            String transactionalIdPrefix,
+            int ownedSubtaskId,
+            int totalNumberOfOwnedSubtasks,
+            TransactionOwnership transactionOwnership,
+            Collection<CheckpointTransaction> precommittedTransactionalIds) {
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, 
"transactionalIdPrefix");
+        this.ownedSubtaskId = ownedSubtaskId;
+        this.totalNumberOfOwnedSubtasks = totalNumberOfOwnedSubtasks;
+        this.transactionOwnership = transactionOwnership;
+        this.precommittedTransactionalIds = precommittedTransactionalIds;
     }
 
     public String getTransactionalIdPrefix() {
         return transactionalIdPrefix;
     }
 
+    public int getOwnedSubtaskId() {
+        return ownedSubtaskId;
+    }
+
+    public int getTotalNumberOfOwnedSubtasks() {
+        return totalNumberOfOwnedSubtasks;
+    }
+
+    public Collection<CheckpointTransaction> getPrecommittedTransactionalIds() 
{
+        return precommittedTransactionalIds;
+    }
+
+    public TransactionOwnership getTransactionOwnership() {
+        return transactionOwnership;
+    }
+
     @Override
-    public boolean equals(Object o) {
-        if (this == o) {
+    public boolean equals(Object object) {
+        if (this == object) {
             return true;
         }
-        if (o == null || getClass() != o.getClass()) {
+        if (object == null || getClass() != object.getClass()) {
             return false;
         }
-        KafkaWriterState that = (KafkaWriterState) o;
-        return transactionalIdPrefix.equals(that.transactionalIdPrefix);
+        KafkaWriterState that = (KafkaWriterState) object;
+        return ownedSubtaskId == that.ownedSubtaskId
+                && totalNumberOfOwnedSubtasks == 
that.totalNumberOfOwnedSubtasks
+                && Objects.equals(transactionalIdPrefix, 
that.transactionalIdPrefix)
+                && transactionOwnership == that.transactionOwnership
+                && Objects.equals(precommittedTransactionalIds, 
that.precommittedTransactionalIds);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(transactionalIdPrefix);
+        return Objects.hash(
+                transactionalIdPrefix,
+                ownedSubtaskId,
+                totalNumberOfOwnedSubtasks,
+                transactionOwnership,
+                precommittedTransactionalIds);
     }
 
     @Override
     public String toString() {
         return "KafkaWriterState{"
-                + ", transactionalIdPrefix='"
+                + "transactionalIdPrefix='"
                 + transactionalIdPrefix
                 + '\''
+                + ", ownedSubtaskId="
+                + ownedSubtaskId
+                + ", totalNumberOfOwnedSubtasks="
+                + totalNumberOfOwnedSubtasks
+                + ", transactionOwnership="
+                + transactionOwnership
+                + ", precommittedTransactionalIds="
+                + precommittedTransactionalIds
                 + '}';
     }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java
index 5c91967c..529d80b0 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
+import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.ByteArrayInputStream;
@@ -24,13 +26,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
 
 /** A serializer used to serialize {@link KafkaWriterState}. */
 class KafkaWriterStateSerializer implements 
SimpleVersionedSerializer<KafkaWriterState> {
-
     @Override
     public int getVersion() {
-        return 1;
+        return 2;
     }
 
     @Override
@@ -38,6 +43,14 @@ class KafkaWriterStateSerializer implements 
SimpleVersionedSerializer<KafkaWrite
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 final DataOutputStream out = new DataOutputStream(baos)) {
             out.writeUTF(state.getTransactionalIdPrefix());
+            out.writeInt(state.getOwnedSubtaskId());
+            out.writeInt(state.getTotalNumberOfOwnedSubtasks());
+            out.writeInt(state.getTransactionOwnership().ordinal());
+            out.writeInt(state.getPrecommittedTransactionalIds().size());
+            for (CheckpointTransaction transaction : 
state.getPrecommittedTransactionalIds()) {
+                out.writeUTF(transaction.getTransactionalId());
+                out.writeLong(transaction.getCheckpointId());
+            }
             out.flush();
             return baos.toByteArray();
         }
@@ -45,10 +58,33 @@ class KafkaWriterStateSerializer implements 
SimpleVersionedSerializer<KafkaWrite
 
     @Override
     public KafkaWriterState deserialize(int version, byte[] serialized) throws 
IOException {
+        if (version > 2) {
+            throw new IOException("Unknown version: " + version);
+        }
+
         try (final ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
                 final DataInputStream in = new DataInputStream(bais)) {
-            final String transactionalIdPrefx = in.readUTF();
-            return new KafkaWriterState(transactionalIdPrefx);
+            final String transactionalIdPrefix = in.readUTF();
+            int ownedSubtaskId = UNKNOWN;
+            int totalNumberOfOwnedSubtasks = UNKNOWN;
+            TransactionOwnership transactionOwnership = 
TransactionOwnership.IMPLICIT_BY_SUBTASK_ID;
+            final Collection<CheckpointTransaction> precommitted = new 
ArrayList<>();
+            if (version == 2) {
+                ownedSubtaskId = in.readInt();
+                totalNumberOfOwnedSubtasks = in.readInt();
+                transactionOwnership = 
TransactionOwnership.values()[in.readInt()];
+
+                final int usedTransactionIdsSize = in.readInt();
+                for (int i = 0; i < usedTransactionIdsSize; i++) {
+                    precommitted.add(new CheckpointTransaction(in.readUTF(), 
in.readLong()));
+                }
+            }
+            return new KafkaWriterState(
+                    transactionalIdPrefix,
+                    ownedSubtaskId,
+                    totalNumberOfOwnedSubtasks,
+                    transactionOwnership,
+                    precommitted);
         }
     }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
index d47cca28..77a95aa4 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.kafka.sink.internal;
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.Collection;
+
 /** A pool of producers that can be recycled. */
 @Internal
 public interface ProducerPool extends AutoCloseable {
@@ -38,6 +40,9 @@ public interface ProducerPool extends AutoCloseable {
     FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             String transactionalId, long checkpointId);
 
+    /** Returns a snapshot of all ongoing transactions. */
+    Collection<CheckpointTransaction> getOngoingTransactions();
+
     /**
      * Explicitly recycle a producer. This is useful when the producer has not 
been passed to the
      * committer.
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
index 38a159ed..3789ea09 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.kafka.sink.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 
+import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,9 +29,11 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
@@ -96,28 +99,39 @@ public class ProducerPoolImpl implements ProducerPool {
      * checkpoints.
      */
     private final NavigableMap<CheckpointTransaction, String> 
transactionalIdsByCheckpoint =
-            new 
TreeMap<>(Comparator.comparing(CheckpointTransaction::getCheckpointId));
+            new TreeMap<>(
+                    
Comparator.comparing(CheckpointTransaction::getCheckpointId)
+                            
.thenComparing(CheckpointTransaction::getTransactionalId));
 
     /** Creates a new {@link ProducerPoolImpl}. */
     public ProducerPoolImpl(
             Properties kafkaProducerConfig,
-            Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit) 
{
+            Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit,
+            Collection<CheckpointTransaction> precommittedTransactions) {
         this.kafkaProducerConfig =
                 checkNotNull(kafkaProducerConfig, "kafkaProducerConfig must 
not be null");
         this.producerInit = checkNotNull(producerInit, "producerInit must not 
be null");
+
+        initPrecommittedTransactions(precommittedTransactions);
     }
 
     @Override
     public void recycleByTransactionId(String transactionalId, boolean 
success) {
         ProducerEntry producerEntry = 
producerByTransactionalId.remove(transactionalId);
         LOG.debug("Transaction {} finished, producer {}", transactionalId, 
producerEntry);
+
         if (producerEntry == null) {
-            // during recovery, the committer may finish transactions that are 
not yet ongoing from
-            // the writer's perspective
-            // these transaction will be closed by the second half of this 
method eventually
+            LOG.info(
+                    "Received unmatched producer for transaction {}. This is 
expected during rescale.",
+                    transactionalId);
+            // recycle of unmatched entries happens on next checkpoint at the 
second half of this
+            // method
             return;
         }
 
+        long finishedChkId = 
producerEntry.getCheckpointedTransaction().getCheckpointId();
+        boolean hasTransactionsFromPreviousCheckpoint =
+                transactionalIdsByCheckpoint.firstKey().getCheckpointId() != 
finishedChkId;
         
transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
         if (success) {
             recycleProducer(producerEntry.getProducer());
@@ -125,23 +139,26 @@ public class ProducerPoolImpl implements ProducerPool {
             closeProducer(producerEntry.getProducer());
         }
 
-        // In rare cases (only for non-chained committer), some transactions 
may not be detected to
-        // be finished.
+        // In rare cases (non-chained committer or recovery), some 
transactions may not be detected
+        // to be finished.
         // For example, a transaction may be committed at the same time the 
writer state is
         // snapshot. The writer contains the transaction as ongoing but the 
committer state will
         // later not contain it.
         // In these cases, we make use of the fact that committables are 
processed in order of the
         // checkpoint id.
         // That means a transaction state with checkpoint id C implies that 
all C' < C are finished.
-        NavigableMap<CheckpointTransaction, String> earlierTransactions =
-                transactionalIdsByCheckpoint.headMap(
-                        producerEntry.getCheckpointedTransaction(), false);
-        if (!earlierTransactions.isEmpty()) {
-            for (String id : earlierTransactions.values()) {
-                ProducerEntry entry = producerByTransactionalId.remove(id);
-                closeProducer(entry.getProducer());
+        if (hasTransactionsFromPreviousCheckpoint) {
+            // We can safely remove all transactions with checkpoint id < 
finishedChkId.
+            // Entries are primarily sorted by checkpoint id
+            Iterator<Map.Entry<CheckpointTransaction, String>> iterator =
+                    transactionalIdsByCheckpoint.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<CheckpointTransaction, String> entry = 
iterator.next();
+                if (entry.getKey().getCheckpointId() < finishedChkId) {
+                    iterator.remove();
+                    
closeProducer(producerByTransactionalId.remove(entry.getValue()).getProducer());
+                }
             }
-            earlierTransactions.clear();
         }
     }
 
@@ -166,18 +183,38 @@ public class ProducerPoolImpl implements ProducerPool {
             return;
         }
 
-        // For non-chained committer, we have a split brain scenario:
-        // Both the writer and the committer have a producer representing the 
same transaction.
-        // The committer producer has finished the transaction while the 
writer producer is still in
-        // transaction.
-        if (producer.isInTransaction()) {
-            // Here we just double-commit the same transaction which succeeds 
in all cases
-            // because the producer shares the same epoch as the committer's 
producer
-            producer.commitTransaction();
+        try {
+            // For non-chained committer, we have a split brain scenario:
+            // Both the writer and the committer have a producer representing 
the same transaction.
+            // The committer producer has finished the transaction while the 
writer producer is
+            // still in transaction.
+            if (producer.isInTransaction()) {
+                // Here we just double-commit the same transaction which 
succeeds in all cases
+                // because the producer shares the same epoch as the 
committer's producer
+                producer.commitTransaction();
+            }
+
+            producerPool.add(producer);
+
+            LOG.debug("Recycling {}, new pool size {}", producer, 
producerPool.size());
+        } catch (KafkaException e) {
+            closeProducer(producer);
+
+            LOG.debug(
+                    "Encountered exception while double-committing, discarding 
producer {}: {}",
+                    producer,
+                    e);
         }
-        producerPool.add(producer);
+    }
 
-        LOG.debug("Recycling {}, new pool size {}", producer, 
producerPool.size());
+    private void initPrecommittedTransactions(
+            Collection<CheckpointTransaction> precommittedTransactions) {
+        for (CheckpointTransaction transaction : precommittedTransactions) {
+            this.transactionalIdsByCheckpoint.put(transaction, 
transaction.getTransactionalId());
+            this.producerByTransactionalId.put(
+                    transaction.getTransactionalId(), new ProducerEntry(null, 
transaction));
+        }
+        LOG.debug("Initialized ongoing transactions from state {}", 
precommittedTransactions);
     }
 
     @Override
@@ -211,6 +248,11 @@ public class ProducerPoolImpl implements ProducerPool {
         return producer;
     }
 
+    @Override
+    public Collection<CheckpointTransaction> getOngoingTransactions() {
+        return new ArrayList<>(transactionalIdsByCheckpoint.keySet());
+    }
+
     @VisibleForTesting
     public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> 
getProducers() {
         return producerPool;
@@ -235,13 +277,13 @@ public class ProducerPoolImpl implements ProducerPool {
     }
 
     private static class ProducerEntry {
-        private final FlinkKafkaInternalProducer<byte[], byte[]> producer;
+        @Nullable private final FlinkKafkaInternalProducer<byte[], byte[]> 
producer;
         private final CheckpointTransaction checkpointedTransaction;
 
         private ProducerEntry(
-                FlinkKafkaInternalProducer<byte[], byte[]> producer,
+                @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer,
                 CheckpointTransaction checkpointedTransaction) {
-            this.producer = checkNotNull(producer, "producer must not be 
null");
+            this.producer = producer;
             this.checkpointedTransaction =
                     checkNotNull(
                             checkpointedTransaction, "checkpointedTransaction 
must not be null");
@@ -251,13 +293,17 @@ public class ProducerPoolImpl implements ProducerPool {
             return checkpointedTransaction;
         }
 
+        @Nullable
         public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
             return producer;
         }
 
         @Override
         public String toString() {
-            return producer.toString();
+            if (producer != null) {
+                return producer.toString();
+            }
+            return checkpointedTransaction.toString();
         }
     }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
index 80f6386e..c88485ce 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
@@ -20,34 +20,77 @@ package org.apache.flink.connector.kafka.sink.internal;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.TransactionAborter;
+import org.apache.flink.connector.kafka.util.AdminUtils;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TransactionListing;
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
+import static 
org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.extractSubtaskId;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Implementation of {@link TransactionAbortStrategyImpl.Context}. */
 @Internal
 public class TransactionAbortStrategyContextImpl implements 
TransactionAbortStrategyImpl.Context {
+    private final Supplier<Admin> adminSupplier;
+    private final Supplier<Collection<String>> topicNames;
     private final int currentSubtaskId;
     private final int currentParallelism;
+    private final Set<Integer> ownedSubtaskIds;
+    private final int totalNumberOfOwnedSubtasks;
     private final Set<String> prefixesToAbort;
     private final long startCheckpointId;
     private final TransactionAborter transactionAborter;
+    /** Transactional ids that mustn't be aborted. */
+    private final Set<String> precommittedTransactionIds;
 
     /** Creates a new {@link TransactionAbortStrategyContextImpl}. */
     public TransactionAbortStrategyContextImpl(
+            Supplier<Collection<String>> topicNames,
             int currentSubtaskId,
             int currentParallelism,
+            int[] ownedSubtaskIds,
+            int totalNumberOfOwnedSubtasks,
             List<String> prefixesToAbort,
             long startCheckpointId,
-            TransactionAborter transactionAborter) {
+            TransactionAborter transactionAborter,
+            Supplier<Admin> adminSupplier,
+            Set<String> precommittedTransactionIds) {
+        this.topicNames = checkNotNull(topicNames, "topicNames must not be 
null");
         this.currentSubtaskId = currentSubtaskId;
         this.currentParallelism = currentParallelism;
+        this.ownedSubtaskIds = 
Arrays.stream(ownedSubtaskIds).boxed().collect(Collectors.toSet());
+        this.totalNumberOfOwnedSubtasks = totalNumberOfOwnedSubtasks;
         this.prefixesToAbort = Set.copyOf(prefixesToAbort);
         this.startCheckpointId = startCheckpointId;
         this.transactionAborter =
                 checkNotNull(transactionAborter, "transactionAborter must not 
be null");
+        this.adminSupplier = checkNotNull(adminSupplier, "adminSupplier must 
not be null");
+        this.precommittedTransactionIds =
+                checkNotNull(
+                        precommittedTransactionIds, "transactionsToBeCommitted 
must not be null");
+    }
+
+    @Override
+    public Collection<String> getOpenTransactionsForTopics() {
+        return AdminUtils.getOpenTransactionsForTopics(adminSupplier.get(), 
topicNames.get())
+                .stream()
+                .map(TransactionListing::transactionalId)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean ownsTransactionalId(String transactionalId) {
+        // Only the subtask that owns a respective state with oldSubtaskId can 
abort it
+        // For upscaling: use the modulo operator to extrapolate ownership
+        return ownedSubtaskIds.contains(
+                extractSubtaskId(transactionalId) % 
totalNumberOfOwnedSubtasks);
     }
 
     @Override
@@ -65,6 +108,11 @@ public class TransactionAbortStrategyContextImpl implements 
TransactionAbortStra
         return prefixesToAbort;
     }
 
+    @Override
+    public Set<String> getPrecommittedTransactionalIds() {
+        return precommittedTransactionIds;
+    }
+
     @Override
     public long getStartCheckpointId() {
         return startCheckpointId;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
index 6ea69586..3b8e3d0b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
@@ -23,7 +23,10 @@ import org.apache.flink.annotation.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Implementations of an abort strategy for transactions left over from 
previous runs. */
 @Internal
@@ -112,6 +115,46 @@ public enum TransactionAbortStrategyImpl {
             }
             return numTransactionAborted;
         }
+    },
+    LISTING {
+        @Override
+        public void abortTransactions(Context context) {
+            Collection<String> openTransactionsForTopics = 
context.getOpenTransactionsForTopics();
+
+            if (openTransactionsForTopics.isEmpty()) {
+                return;
+            }
+
+            List<String> openTransactionsForSubtask =
+                    openTransactionsForTopics.stream()
+                            // ignore transactions from other applications
+                            .filter(name -> hasKnownPrefix(name, context))
+                            //  look only at transactions owned by this subtask
+                            .filter(context::ownsTransactionalId)
+                            .collect(Collectors.toList());
+
+            LOG.warn(
+                    "Found {} open transactions for subtask {}: {}",
+                    openTransactionsForSubtask.size(),
+                    context.getCurrentSubtaskId(),
+                    openTransactionsForSubtask);
+            // look only at transactions coming from this application
+            // remove transactions that are owned by the committer
+            TransactionAborter transactionAborter = 
context.getTransactionAborter();
+            for (String name : openTransactionsForSubtask) {
+                if (context.getPrecommittedTransactionalIds().contains(name)) {
+                    LOG.debug(
+                            "Skipping transaction {} because it's in the list 
of transactions to be committed",
+                            name);
+                    continue;
+                }
+                transactionAborter.abortTransaction(name);
+            }
+        }
+
+        private boolean hasKnownPrefix(String name, Context context) {
+            return 
context.getPrefixesToAbort().stream().anyMatch(name::startsWith);
+        }
     };
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TransactionAbortStrategyImpl.class);
@@ -126,12 +169,42 @@ public enum TransactionAbortStrategyImpl {
 
     /** Context for the {@link TransactionAbortStrategyImpl}. */
     public interface Context {
+        /**
+         * Returns the list of all open transactions for the topics retrieved 
through introspection.
+         */
+        Collection<String> getOpenTransactionsForTopics();
+
         int getCurrentSubtaskId();
 
         int getCurrentParallelism();
 
+        /**
+         * Subtasks must abort transactions that they own and must not abort any transaction that
+         * they don't own. Ownership is defined as follows:
+         *
+         * <ul>
+         *   <li>The writer state contains the old subtask id and the max parallelism when it was
+         *       snapshotted.
+         *   <li>On recovery, the state is reassigned somewhere. The new 
assignee takes the
+         *       ownership.
+         *   <li>If there was a downscale, one subtask may own several old 
subtask ids.
+         *   <li>If there was an upscale, the ids usually remain stable but 
that isn't necessary for
+         *       the abort to work.
+         *   <li>Any number of intermediate attempts between the recovered checkpoint and the
+         *       current attempt may produce transactions with subtask ids exceeding the current
+         *       parallelism. In that case, subtask ids are distributed in a round-robin fashion
+         *       using modulo.
+         * </ul>
+         */
+        boolean ownsTransactionalId(String transactionalId);
+
         Set<String> getPrefixesToAbort();
 
+        /**
+         * Returns a list of transactional ids that shouldn't be aborted 
because they are part of
+         * the committer state.
+         */
+        Set<String> getPrecommittedTransactionalIds();
+
         long getStartCheckpointId();
 
         TransactionAborter getTransactionAborter();
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
index f1be95a0..278ee9d9 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
@@ -26,7 +26,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /** Implementation of {@link TransactionNamingStrategy}. */
 @Internal
 public enum TransactionNamingStrategyImpl {
-    INCREMENTING {
+    INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID) {
         /**
          * For each checkpoint we create new {@link 
FlinkKafkaInternalProducer} so that new
          * transactions will not clash with transactions created during 
previous checkpoints ({@code
@@ -57,6 +57,12 @@ public enum TransactionNamingStrategyImpl {
         }
     };
 
+    private final TransactionOwnership ownership;
+
+    TransactionNamingStrategyImpl(TransactionOwnership ownership) {
+        this.ownership = ownership;
+    }
+
     /**
      * Returns a {@link FlinkKafkaInternalProducer} that will not clash with 
any ongoing
      * transactions.
@@ -64,6 +70,10 @@ public enum TransactionNamingStrategyImpl {
     public abstract FlinkKafkaInternalProducer<byte[], byte[]> 
getTransactionalProducer(
             Context context);
 
+    public TransactionOwnership getOwnership() {
+        return ownership;
+    }
+
     /** Context for the transaction naming strategy. */
     public interface Context {
         String buildTransactionalId(long offset);
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java
new file mode 100644
index 00000000..8799ba09
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Describes the ownership model of transactional ids and, with that, the ownership of the
+ * transactions.
+ *
+ * <p>A subtask that owns a transactional id is responsible for committing and aborting the
+ * transactions having that id. Only that subtask may create new ids.
+ *
+ * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + subtaskId + "-" + counter
+ * </code>. The prefix is given by the user, the subtask id is defined through the ownership model,
+ * and the counter through the {@link
+ * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}.
+ *
+ * <p>For all strategies, ownership is extrapolated for subtask ids beyond the currently known
+ * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has
+ * been taken. Consider an application that runs and checkpoints with 3 subtasks. Later, it is
+ * upscaled to 5, but then a failure happens, so at least 5 open transactions may exist. If the
+ * application is finally resumed from the checkpoint with 3 subtasks again, these 3 subtasks need
+ * to assume ownership of the transactions of the remaining 2.
+ */
+@Internal
+public enum TransactionOwnership {
+    /**
+     * The ownership is determined by the current subtask ID. Ownership is extrapolated by
+     * extracting the original subtask id of the ongoing transactions and applying modulo on the
+     * current parallelism.
+     */
+    IMPLICIT_BY_SUBTASK_ID {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (!recoveredStates.isEmpty()) {
+                checkForMigration(recoveredStates);
+            }
+
+            return new int[] {currentSubtaskId};
+        }
+
+        private void checkForMigration(Collection<KafkaWriterState> recoveredStates) {
+            TransactionOwnership oldOwnership =
+                    recoveredStates.stream()
+                            .map(KafkaWriterState::getTransactionOwnership)
+                            .findFirst()
+                            .orElseThrow();
+            if (oldOwnership != this) {
+                throw new IllegalStateException(
+                        "Attempted to switch the transaction naming strategy back to INCREMENTING which may result in data loss.");
+            }
+        }
+
+        @Override
+        public int getTotalNumberOfOwnedSubtasks(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            return currentParallelism;
+        }
+    },
+    /**
+     * The ownership is determined by the writer state that is recovered. Each writer may have
+     * multiple states, each with a different subtask id (e.g. when downscaling).
+     *
+     * <p>Additionally, the maximum parallelism that has been observed is stored in the state and
+     * used to extrapolate ownership.
+     *
+     * <p>This ownership model makes two assumptions about the state assignment:
+     *
+     * <ul>
+     *   <li>State is assigned first to lower subtask ids and then to higher ones. In the upscaling
+     *       case, from oldP to newP, only tasks [0; oldP) are assigned state; [oldP; newP) are not
+     *       assigned any state.
+     *   <li>State is uniformly assigned. In the upscaling case, none of the tasks have more than 1
+     *       state assigned.
+     * </ul>
+     *
+     * <p>Hence, the state is consecutively assigned to the subtasks from low to high.
+     *
+     * <p>With these assumptions, this ownership model is able to recover from writer states with
+     * subtask id + max parallelism:
+     *
+     * <ul>
+     *   <li>If there is state, we can extract the owned subtask ids and the max parallelism from
+     *       the state.
+     *   <li>If there is no state, we can use the current subtask id and derive the max parallelism
+     *       from the current parallelism. The current subtask id cannot possibly be owned already,
+     *       and the max parallelism in any state must be lower than the current max parallelism.
+     *   <li>Hence, no subtask id is owned by more than one task and all tasks have the same max
+     *       parallelism.
+     *   <li>Since all tasks have shared knowledge, we can exclusively assign all transactional ids.
+     *   <li>Since each subtask owns at least one transactional id, we can safely create new
+     *       transactional ids while other subtasks are still aborting their transactions.
+     * </ul>
+     */
+    EXPLICIT_BY_WRITER_STATE {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (recoveredStates.isEmpty()) {
+                return new int[] {currentSubtaskId};
+            } else {
+                int[] ownedSubtaskIds =
+                        recoveredStates.stream()
+                                .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                                .sorted()
+                                .toArray();
+                assertKnown(ownedSubtaskIds[0]);
+
+                int maxParallelism =
+                        recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks();
+                // Assumption of the ownership model: state is distributed consecutively across the
+                // subtasks starting with subtask 0
+                checkState(currentSubtaskId < maxParallelism, "State not consecutively assigned");
+
+                return ownedSubtaskIds;
+            }
+        }
+
+        @Override
+        public int getTotalNumberOfOwnedSubtasks(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (recoveredStates.isEmpty()) {
+                return currentParallelism;
+            }
+            Set<Integer> numSubtasks =
+                    recoveredStates.stream()
+                            .map(KafkaWriterState::getTotalNumberOfOwnedSubtasks)
+                            .collect(Collectors.toSet());
+            checkState(numSubtasks.size() == 1, "Writer states not in sync %s", recoveredStates);
+            int totalNumberOfOwnedSubtasks = numSubtasks.iterator().next();
+            assertKnown(totalNumberOfOwnedSubtasks);
+
+            if (currentParallelism >= totalNumberOfOwnedSubtasks) {
+                // Assumption of the ownership model: state is distributed consecutively across the
+                // subtasks starting with subtask 0
+                checkState(recoveredStates.size() == 1, "Not uniformly assigned");
+            }
+            return Math.max(totalNumberOfOwnedSubtasks, currentParallelism);
+        }
+
+        private void assertKnown(int ownershipValue) {
+            checkState(
+                    ownershipValue != UNKNOWN,
+                    "Attempted to migrate from flink-connector-kafka 3.X directly to a naming strategy that uses the new writer state. Please first migrate to a flink-connector-kafka 4.X with INCREMENTING.");
+        }
+    };
+
+    /** Returns the owned subtask ids for this subtask. */
+    public abstract int[] getOwnedSubtaskIds(
+            int currentSubtaskId,
+            int currentParallelism,
+            Collection<KafkaWriterState> recoveredStates);
+
+    /** Returns the total number of owned subtasks across all subtasks. */
+    public abstract int getTotalNumberOfOwnedSubtasks(
+            int currentSubtaskId,
+            int currentParallelism,
+            Collection<KafkaWriterState> recoveredStates);
+}
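
To make the IMPLICIT_BY_SUBTASK_ID extrapolation above concrete, here is a minimal sketch (not part
of this diff; the helper names OwnershipSketch and ownsLingeringTransaction are made up for
illustration, and the class is assumed to sit next to TransactionalIdFactory): the original subtask
id is parsed out of a lingering transactional id and mapped onto the current parallelism with a
modulo to decide which subtask has to abort it.

    import java.util.List;
    import java.util.stream.Collectors;

    /** Illustrative helper: modulo-based ownership extrapolation for lingering transactional ids. */
    class OwnershipSketch {

        /** True if the id's original subtask maps to this subtask under the current parallelism. */
        static boolean ownsLingeringTransaction(
                String transactionalId, int currentSubtaskId, int currentParallelism) {
            long originalSubtaskId = TransactionalIdFactory.extractSubtaskId(transactionalId);
            return originalSubtaskId % currentParallelism == currentSubtaskId;
        }

        /** Filters the lingering ids that this subtask is responsible for aborting. */
        static List<String> idsToAbort(
                List<String> lingeringIds, int currentSubtaskId, int currentParallelism) {
            return lingeringIds.stream()
                    .filter(id -> ownsLingeringTransaction(id, currentSubtaskId, currentParallelism))
                    .collect(Collectors.toList());
        }
    }

For example, with 3 subtasks after a downscale from 5, ids originally created by subtask 4 would be
aborted by subtask 1 (4 % 3 == 1).
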
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
index 1c0c6263..3208483f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
@@ -43,4 +43,17 @@ public class TransactionalIdFactory {
                 + TRANSACTIONAL_ID_DELIMITER
                 + checkpointOffset;
     }
+
+    public static long extractSubtaskId(String name) {
+        int lastSep = name.lastIndexOf("-");
+        int secondLastSep = name.lastIndexOf("-", lastSep - 1);
+        String subtaskString = name.substring(secondLastSep + 1, lastSep);
+        return Long.parseLong(subtaskString);
+    }
+
+    public static String extractPrefix(String name) {
+        int lastSep = name.lastIndexOf("-");
+        int secondLastSep = name.lastIndexOf("-", lastSep - 1);
+        return name.substring(0, secondLastSep);
+    }
 }
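
A quick round-trip example (illustrative only; it mirrors the new TransactionIdFactoryTest further
below): because both extractors search for the delimiter from the right, a user-chosen prefix that
itself contains dashes still parses cleanly.

    // Sketch: id layout is prefix + "-" + subtaskId + "-" + checkpointOffset.
    String id = TransactionalIdFactory.buildTransactionalId("my-app-sink", 3, 42L); // "my-app-sink-3-42"
    assert TransactionalIdFactory.extractSubtaskId(id) == 3L;
    assert TransactionalIdFactory.extractPrefix(id).equals("my-app-sink");
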
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
deleted file mode 100644
index 478e3a35..00000000
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.kafka.source.enumerator.subscriber;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.TopicDescription;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-/** The base implementations of {@link KafkaSubscriber}. */
-class KafkaSubscriberUtils {
-
-    private KafkaSubscriberUtils() {}
-
-    static Map<String, TopicDescription> getTopicMetadata(
-            AdminClient adminClient, Pattern topicPattern) {
-        try {
-            Set<String> allTopicNames = adminClient.listTopics().names().get();
-            Set<String> matchedTopicNames =
-                    allTopicNames.stream()
-                            .filter(name -> topicPattern.matcher(name).matches())
-                            .collect(Collectors.toSet());
-            return getTopicMetadata(adminClient, matchedTopicNames);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
-                    e);
-        }
-    }
-
-    static Map<String, TopicDescription> getTopicMetadata(
-            AdminClient adminClient, Set<String> topicNames) {
-        try {
-            return adminClient.describeTopics(topicNames).allTopicNames().get();
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Failed to get metadata for topics %s.", topicNames), e);
-        }
-    }
-}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
index 9cd50fb2..6ddf7c57 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
+import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata;
 
 /** A subscriber for a partition set. */
 class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
index e86ade0f..20234bc1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
+import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata;
 
 /**
  * A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
index 208959e2..e525cc59 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
@@ -34,7 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
+import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata;
 
 /** A subscriber to a topic pattern. */
 class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java
new file mode 100644
index 00000000..977c6520
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.util;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Utility methods for Kafka admin operations. */
+@Internal
+public class AdminUtils {
+
+    private AdminUtils() {}
+
+    public static Map<String, TopicDescription> getTopicMetadata(
+            Admin admin, Pattern topicPattern) {
+        try {
+            Set<String> matchedTopicNames = getTopicsByPattern(admin, topicPattern);
+            return getTopicMetadata(admin, matchedTopicNames);
+        } catch (Exception e) {
+            checkIfInterrupted(e);
+            throw new RuntimeException(
+                    String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
+                    e);
+        }
+    }
+
+    public static Set<String> getTopicsByPattern(Admin admin, Pattern topicPattern) {
+        try {
+            Set<String> allTopicNames = admin.listTopics().names().get();
+            return allTopicNames.stream()
+                    .filter(name -> topicPattern.matcher(name).matches())
+                    .collect(Collectors.toSet());
+        } catch (Exception e) {
+            checkIfInterrupted(e);
+            throw new RuntimeException(
+                    String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
+                    e);
+        }
+    }
+
+    public static Map<String, TopicDescription> getTopicMetadata(
+            Admin admin, Collection<String> topicNames) {
+        try {
+            return admin.describeTopics(topicNames).allTopicNames().get();
+        } catch (Exception e) {
+            checkIfInterrupted(e);
+            throw new RuntimeException(
+                    String.format("Failed to get metadata for topics %s.", topicNames), e);
+        }
+    }
+
+    public static Map<TopicPartition, DescribeProducersResult.PartitionProducerState>
+            getProducerStates(Admin admin, Collection<String> topicNames) {
+        try {
+            return admin.describeProducers(getTopicPartitions(admin, topicNames)).all().get();
+        } catch (Exception e) {
+            checkIfInterrupted(e);
+            throw new RuntimeException(
+                    String.format("Failed to get producers for topics %s.", topicNames), e);
+        }
+    }
+
+    public static Collection<Long> getProducerIds(Admin admin, Collection<String> topicNames) {
+        return getProducerStates(admin, topicNames).values().stream()
+                .flatMap(
+                        producerState ->
+                                producerState.activeProducers().stream()
+                                        .map(ProducerState::producerId))
+                .collect(Collectors.toList());
+    }
+
+    public static Collection<TransactionListing> getOpenTransactionsForTopics(
+            Admin admin, Collection<String> topicNames) {
+        try {
+            return admin.listTransactions(
+                            new ListTransactionsOptions()
+                                    .filterProducerIds(getProducerIds(admin, topicNames))
+                                    .filterStates(List.of(TransactionState.ONGOING)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            checkIfInterrupted(e);
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to get open transactions for topics %s. Make sure that the Kafka broker has at least version 3.0 and the application has read permissions on the target topics.",
+                            topicNames),
+                    e);
+        }
+    }
+
+    private static void checkIfInterrupted(Exception e) {
+        if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static List<TopicPartition> getTopicPartitions(
+            Admin admin, Collection<String> topicNames) {
+        return getTopicMetadata(admin, topicNames).values().stream()
+                .flatMap(
+                        t ->
+                                t.partitions().stream()
+                                        .map(p -> new TopicPartition(t.name(), p.partition())))
+                .collect(Collectors.toList());
+    }
+}
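
To show how these helpers back the listing abort strategy, here is a condensed sketch (under
assumptions; the class ListingAbortSketch, the abortLingering helper, and its configuration are
illustrative and not the actual TransactionAbortStrategyImpl code): open transactions on the sink
topics are listed via the admin client, narrowed to ids carrying this sink's prefix, and each
remaining id is fenced by initializing a short-lived transactional producer, which aborts its open
transaction.

    import java.util.Collection;
    import java.util.Properties;

    import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
    import org.apache.flink.connector.kafka.util.AdminUtils;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.TransactionListing;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    class ListingAbortSketch {
        /** Aborts all ongoing transactions on the given topics whose id uses the given prefix. */
        static void abortLingering(
                Admin admin, Collection<String> topics, String prefix, Properties baseConfig) {
            Collection<TransactionListing> open =
                    AdminUtils.getOpenTransactionsForTopics(admin, topics);
            for (TransactionListing listing : open) {
                String id = listing.transactionalId();
                if (!prefix.equals(TransactionalIdFactory.extractPrefix(id))) {
                    continue; // transaction of another application sharing the topics
                }
                Properties props = new Properties();
                props.putAll(baseConfig);
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, id);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
                try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                    // initTransactions() fences the previous producer and aborts its open transaction.
                    producer.initTransactions();
                }
            }
        }
    }

Transactions that belong to checkpoints still awaiting commit (the CheckpointTransaction entries
now kept in KafkaWriterState) must of course be excluded from such an abort.
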
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
index e04613c1..83c5b560 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
+import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -211,7 +212,13 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
             onCheckpointBarrier(failedWriter, 2);
 
             // use state to ensure that the new writer knows about the old prefix
-            KafkaWriterState state = new KafkaWriterState(failedWriter.getTransactionalIdPrefix());
+            KafkaWriterState state =
+                    new KafkaWriterState(
+                            failedWriter.getTransactionalIdPrefix(),
+                            0,
+                            1,
+                            TransactionOwnership.IMPLICIT_BY_SUBTASK_ID,
+                            List.of());
 
             try (final KafkaWriter<Integer> recoveredWriter =
                     restoreWriter(EXACTLY_ONCE, List.of(state), createInitContext())) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java
index 3df0ea88..f3aaaa8f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java
@@ -17,11 +17,14 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
+import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -35,8 +38,16 @@ public class KafkaWriterStateSerializerTest extends TestLogger {
 
     @Test
     public void testStateSerDe() throws IOException {
-        final KafkaWriterState state = new KafkaWriterState("idPrefix");
+        final KafkaWriterState state =
+                new KafkaWriterState(
+                        "idPrefix",
+                        0,
+                        1,
+                        TransactionOwnership.IMPLICIT_BY_SUBTASK_ID,
+                        Arrays.asList(
+                                new CheckpointTransaction("id1", 5L),
+                                new CheckpointTransaction("id2", 6L)));
         final byte[] serialized = SERIALIZER.serialize(state);
-        assertThat(SERIALIZER.deserialize(1, serialized)).isEqualTo(state);
+        assertThat(SERIALIZER.deserialize(2, serialized)).isEqualTo(state);
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
index 4848f586..ddc0592f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
@@ -27,10 +27,15 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.function.Consumer;
 
@@ -64,7 +69,8 @@ class ProducerPoolImplITCase {
 
     @Test
     void testGetTransactionalProducer() throws Exception {
-        try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) {
+        try (ProducerPoolImpl producerPool =
+                new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) {
 
             FlinkKafkaInternalProducer<byte[], byte[]> producer =
                     producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L);
@@ -80,7 +86,8 @@ class ProducerPoolImplITCase {
     /** Tests direct recycling as used during abortion of transactions. */
     @Test
     void testRecycleProducer() throws Exception {
-        try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) {
+        try (ProducerPoolImpl producerPool =
+                new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) {
             FlinkKafkaInternalProducer<byte[], byte[]> producer =
                     producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L);
 
@@ -97,7 +104,8 @@ class ProducerPoolImplITCase {
     /** Tests indirect recycling triggered through the backchannel. */
     @Test
     void testRecycleByTransactionId() throws Exception {
-        try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) {
+        try (ProducerPoolImpl producerPool =
+                new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) {
             FlinkKafkaInternalProducer<byte[], byte[]> producer =
                     producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L);
 
@@ -114,10 +122,45 @@ class ProducerPoolImplITCase {
         }
     }
 
+    /** Tests the edge case where some transaction ids are implicitly closed. */
+    @ParameterizedTest
+    @ValueSource(longs = {2, 3})
+    void testEarlierTransactionRecycleByTransactionId(long finishedCheckpoint) throws Exception {
+        CheckpointTransaction oldTransaction1 =
+                new CheckpointTransaction(TRANSACTIONAL_ID + "-0", 1L);
+        CheckpointTransaction oldTransaction2 =
+                new CheckpointTransaction(TRANSACTIONAL_ID + "-1", 2L);
+
+        try (ProducerPoolImpl producerPool =
+                new ProducerPoolImpl(
+                        getProducerConfig(),
+                        INIT,
+                        Arrays.asList(oldTransaction1, oldTransaction2))) {
+            FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                    producerPool.getTransactionalProducer(
+                            TRANSACTIONAL_ID + "-2", finishedCheckpoint);
+
+            assertThat(producerPool.getOngoingTransactions()).hasSize(3);
+
+            assertThat(producerPool.getProducers()).isEmpty();
+            producer.beginTransaction();
+            producerPool.recycleByTransactionId(TRANSACTIONAL_ID + "-2", true);
+            assertThat(producerPool.getProducers()).contains(producer);
+
+            // expect that old transactions have been removed where checkpoint id is smaller
+            if (finishedCheckpoint == 2) {
+                assertThat(producerPool.getOngoingTransactions()).hasSize(1);
+            } else {
+                assertThat(producerPool.getOngoingTransactions()).hasSize(0);
+            }
+        }
+    }
+
     /** Tests indirect recycling triggered through the backchannel. */
     @Test
     void testCloseByTransactionId() throws Exception {
-        try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) {
+        try (ProducerPoolImpl producerPool =
+                new ProducerPoolImpl(getProducerConfig(), INIT, List.of())) {
             FlinkKafkaInternalProducer<byte[], byte[]> producer =
                     producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L);
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java
new file mode 100644
index 00000000..d274496a
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TransactionalIdFactory}. */
+public class TransactionIdFactoryTest extends TestLogger {
+
+    @Test
+    public void testBuildTransactionalId() {
+        final String expected = "prefix-1-2";
+        assertThat(TransactionalIdFactory.buildTransactionalId("prefix", 1, 2L))
+                .isEqualTo(expected);
+    }
+
+    @Test
+    public void testExtractSubtaskId() {
+        final String transactionalId = "prefix-1-2";
+        assertThat(TransactionalIdFactory.extractSubtaskId(transactionalId)).isEqualTo(1);
+    }
+
+    @Test
+    public void testExtractPrefix() {
+        final String transactionalId = "prefix-1-2";
+        assertThat(TransactionalIdFactory.extractPrefix(transactionalId)).isEqualTo("prefix");
+    }
+}
