This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit fb28015127b5d16f3eb3f60d5684d8bb476779f9 Author: Arvid Heise <[email protected]> AuthorDate: Mon Mar 31 11:10:50 2025 +0200 [FLINK-34554] Adding listing abort strategy Use ListTransactionAPI to exactly query the lingering transactions and abort them. The writer needs to keep transactions that are to be committed in state to not accidentally also cancel them. --- .../c0d94764-76a0-4c50-b617-70b1754c4612 | 19 +- .../kafka/sink/ExactlyOnceKafkaWriter.java | 113 ++++++++++-- .../connector/kafka/sink/KafkaWriterState.java | 75 +++++++- .../kafka/sink/KafkaWriterStateSerializer.java | 44 ++++- .../kafka/sink/internal/ProducerPool.java | 5 + .../kafka/sink/internal/ProducerPoolImpl.java | 104 +++++++---- .../TransactionAbortStrategyContextImpl.java | 50 +++++- .../internal/TransactionAbortStrategyImpl.java | 73 ++++++++ .../internal/TransactionNamingStrategyImpl.java | 12 +- .../kafka/sink/internal/TransactionOwnership.java | 192 +++++++++++++++++++++ .../sink/internal/TransactionalIdFactory.java | 13 ++ .../subscriber/KafkaSubscriberUtils.java | 59 ------- .../subscriber/PartitionSetSubscriber.java | 2 +- .../enumerator/subscriber/TopicListSubscriber.java | 2 +- .../subscriber/TopicPatternSubscriber.java | 2 +- .../flink/connector/kafka/util/AdminUtils.java | 137 +++++++++++++++ .../kafka/sink/ExactlyOnceKafkaWriterITCase.java | 9 +- .../kafka/sink/KafkaWriterStateSerializerTest.java | 15 +- .../sink/internal/ProducerPoolImplITCase.java | 51 +++++- .../sink/internal/TransactionIdFactoryTest.java | 48 ++++++ 20 files changed, 892 insertions(+), 133 deletions(-) diff --git a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 index b153c29d..20e55f4c 100644 --- a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 +++ b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 @@ -3,6 +3,7 @@ Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$ Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0) Constructor <org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator.<init>(org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber, org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService, org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInit [...] 
Constructor <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.<init>(org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema, java.util.Properties)> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:114) +Constructor <org.apache.flink.connector.kafka.sink.KafkaWriterState.<init>(java.lang.String, int, int, org.apache.flink.connector.kafka.sink.internal.TransactionOwnership, java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriterState.java:0) Constructor <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.<init>(java.util.List, java.util.regex.Pattern, org.apache.flink.connector.kafka.sink.KafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> has parameter of type <[Lorg.apach [...] Field <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.metricGroups> has generic type <java.util.Map<java.lang.String, org.apache.flink.runtime.metrics.groups.AbstractMetricGroup>> with type argument depending on <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in (KafkaClusterMetricGroupManager.java:0) Field <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.availabilityHelper> has type <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in (DynamicKafkaSourceReader.java:0) @@ -25,14 +26,14 @@ Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducer Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:170) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:173) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:169) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:172) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:169) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:177) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:180) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:176) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:179) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:176) Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0) -Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152) +Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:153) Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0) Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0) @@ -48,8 +49,8 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0) Method 
<org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0) Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0) -Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:525) -Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:569) +Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520) +Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0) -Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:580) +Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574) diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java index 32131056..e75436d8 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java @@ -18,10 +18,15 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifier; import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory; +import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; import org.apache.flink.connector.kafka.sink.internal.ProducerPool; import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl; @@ -31,9 +36,13 @@ import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyIm import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl; import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl; +import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; +import org.apache.flink.connector.kafka.util.AdminUtils; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.ProducerFencedException; import org.slf4j.Logger; @@ -46,6 +55,8 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -69,7 +80,6 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { /** Strategy to name transactions. */ private final TransactionNamingStrategyImpl transactionNamingStrategy; - private final KafkaWriterState kafkaWriterState; private final Collection<KafkaWriterState> recoveredStates; private final long restoredCheckpointId; @@ -88,6 +98,11 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { /** The context used to name transactions. */ private final TransactionNamingStrategyContextImpl namingContext; + private final int totalNumberOfOwnedSubtasks; + private final int[] ownedSubtaskIds; + /** Lazily created admin client for {@link TransactionAbortStrategyImpl}. */ + private AdminClient adminClient; + /** * Constructor creating a kafka writer. 
* @@ -133,25 +148,37 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { throw new FlinkRuntimeException("Cannot initialize schema.", e); } - this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix); - this.recoveredStates = checkNotNull(recoveredStates, "recoveredStates"); + TaskInfo taskInfo = sinkInitContext.getTaskInfo(); + TransactionOwnership ownership = transactionNamingStrategy.getOwnership(); + int subtaskId = taskInfo.getIndexOfThisSubtask(); + int parallelism = taskInfo.getNumberOfParallelSubtasks(); + this.ownedSubtaskIds = + ownership.getOwnedSubtaskIds(subtaskId, parallelism, recoveredStates); + this.totalNumberOfOwnedSubtasks = + ownership.getTotalNumberOfOwnedSubtasks(subtaskId, parallelism, recoveredStates); initFlinkMetrics(); restoredCheckpointId = sinkInitContext .getRestoredCheckpointId() .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); - int subtaskId = sinkInitContext.getTaskInfo().getIndexOfThisSubtask(); - this.producerPool = new ProducerPoolImpl(kafkaProducerConfig, this::initKafkaMetrics); + this.producerPool = + new ProducerPoolImpl( + kafkaProducerConfig, + this::initKafkaMetrics, + recoveredStates.stream() + .flatMap(r -> r.getPrecommittedTransactionalIds().stream()) + .collect(Collectors.toList())); this.backchannel = BackchannelFactory.getInstance() .getReadableBackchannel( - subtaskId, - sinkInitContext.getTaskInfo().getAttemptNumber(), - transactionalIdPrefix); + subtaskId, taskInfo.getAttemptNumber(), transactionalIdPrefix); this.namingContext = new TransactionNamingStrategyContextImpl( - transactionalIdPrefix, subtaskId, restoredCheckpointId, producerPool); + transactionalIdPrefix, + this.ownedSubtaskIds[0], + restoredCheckpointId, + producerPool); } @Override @@ -202,8 +229,30 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { producerPool.recycleByTransactionId( finishedTransaction.getTransactionId(), finishedTransaction.isSuccess()); } + // persist the ongoing transactions into the state; these will not be aborted on restart + Collection<CheckpointTransaction> ongoingTransactions = + producerPool.getOngoingTransactions(); currentProducer = startTransaction(checkpointId + 1); - return Collections.singletonList(kafkaWriterState); + return createSnapshots(ongoingTransactions); + } + + private List<KafkaWriterState> createSnapshots( + Collection<CheckpointTransaction> ongoingTransactions) { + List<KafkaWriterState> states = new ArrayList<>(); + int[] subtaskIds = this.ownedSubtaskIds; + for (int index = 0; index < subtaskIds.length; index++) { + int ownedSubtask = subtaskIds[index]; + states.add( + new KafkaWriterState( + transactionalIdPrefix, + ownedSubtask, + totalNumberOfOwnedSubtasks, + transactionNamingStrategy.getOwnership(), + // new transactions are only created with the first owned subtask id + index == 0 ? 
ongoingTransactions : List.of())); + } + LOG.debug("Snapshotting state {}", states); + return states; } @Override @@ -281,11 +330,53 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { producerPool.recycle(producer); return epoch; }; + Set<String> precommittedTransactionalIds = + recoveredStates.stream() + .flatMap( + s -> + s.getPrecommittedTransactionalIds().stream() + .map(CheckpointTransaction::getTransactionalId)) + .collect(Collectors.toSet()); return new TransactionAbortStrategyContextImpl( + this::getTopicNames, kafkaSinkContext.getParallelInstanceId(), kafkaSinkContext.getNumberOfParallelInstances(), + ownedSubtaskIds, + totalNumberOfOwnedSubtasks, prefixesToAbort, startCheckpointId, - aborter); + aborter, + this::getAdminClient, + precommittedTransactionalIds); + } + + private Collection<String> getTopicNames() { + KafkaDatasetIdentifier identifier = + getDatasetIdentifier() + .orElseThrow( + () -> + new IllegalStateException( + "The record serializer does not expose a static list of target topics.")); + if (identifier.getTopics() != null) { + return identifier.getTopics(); + } + return AdminUtils.getTopicsByPattern(getAdminClient(), identifier.getTopicPattern()); + } + + private Optional<KafkaDatasetIdentifier> getDatasetIdentifier() { + if (recordSerializer instanceof KafkaDatasetFacetProvider) { + Optional<KafkaDatasetFacet> kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet(); + + return kafkaDatasetFacet.map(KafkaDatasetFacet::getTopicIdentifier); + } + return Optional.empty(); + } + + private Admin getAdminClient() { + if (adminClient == null) { + adminClient = AdminClient.create(kafkaProducerConfig); + } + return adminClient; } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java index b4482c69..5c2e742e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterState.java @@ -17,44 +17,101 @@ package org.apache.flink.connector.kafka.sink; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction; +import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; + +import java.util.Collection; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; -class KafkaWriterState { +/** The state of the Kafka writer. Used to capture information regarding transactions. 
*/ +@Internal +public class KafkaWriterState { + public static final int UNKNOWN = -1; + private final String transactionalIdPrefix; + private final int ownedSubtaskId; + private final int totalNumberOfOwnedSubtasks; + private final TransactionOwnership transactionOwnership; + private final Collection<CheckpointTransaction> precommittedTransactionalIds; - KafkaWriterState(String transactionalIdPrefix) { + @VisibleForTesting + public KafkaWriterState( + String transactionalIdPrefix, + int ownedSubtaskId, + int totalNumberOfOwnedSubtasks, + TransactionOwnership transactionOwnership, + Collection<CheckpointTransaction> precommittedTransactionalIds) { this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix"); + this.ownedSubtaskId = ownedSubtaskId; + this.totalNumberOfOwnedSubtasks = totalNumberOfOwnedSubtasks; + this.transactionOwnership = transactionOwnership; + this.precommittedTransactionalIds = precommittedTransactionalIds; } public String getTransactionalIdPrefix() { return transactionalIdPrefix; } + public int getOwnedSubtaskId() { + return ownedSubtaskId; + } + + public int getTotalNumberOfOwnedSubtasks() { + return totalNumberOfOwnedSubtasks; + } + + public Collection<CheckpointTransaction> getPrecommittedTransactionalIds() { + return precommittedTransactionalIds; + } + + public TransactionOwnership getTransactionOwnership() { + return transactionOwnership; + } + @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object object) { + if (this == object) { return true; } - if (o == null || getClass() != o.getClass()) { + if (object == null || getClass() != object.getClass()) { return false; } - KafkaWriterState that = (KafkaWriterState) o; - return transactionalIdPrefix.equals(that.transactionalIdPrefix); + KafkaWriterState that = (KafkaWriterState) object; + return ownedSubtaskId == that.ownedSubtaskId + && totalNumberOfOwnedSubtasks == that.totalNumberOfOwnedSubtasks + && Objects.equals(transactionalIdPrefix, that.transactionalIdPrefix) + && transactionOwnership == that.transactionOwnership + && Objects.equals(precommittedTransactionalIds, that.precommittedTransactionalIds); } @Override public int hashCode() { - return Objects.hash(transactionalIdPrefix); + return Objects.hash( + transactionalIdPrefix, + ownedSubtaskId, + totalNumberOfOwnedSubtasks, + transactionOwnership, + precommittedTransactionalIds); } @Override public String toString() { return "KafkaWriterState{" - + ", transactionalIdPrefix='" + + "transactionalIdPrefix='" + transactionalIdPrefix + '\'' + + ", ownedSubtaskId=" + + ownedSubtaskId + + ", totalNumberOfOwnedSubtasks=" + + totalNumberOfOwnedSubtasks + + ", transactionOwnership=" + + transactionOwnership + + ", precommittedTransactionalIds=" + + precommittedTransactionalIds + '}'; } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java index 5c91967c..529d80b0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java @@ -17,6 +17,8 @@ package org.apache.flink.connector.kafka.sink; +import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction; +import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; import 
org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.ByteArrayInputStream; @@ -24,13 +26,16 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; /** A serializer used to serialize {@link KafkaWriterState}. */ class KafkaWriterStateSerializer implements SimpleVersionedSerializer<KafkaWriterState> { - @Override public int getVersion() { - return 1; + return 2; } @Override @@ -38,6 +43,14 @@ class KafkaWriterStateSerializer implements SimpleVersionedSerializer<KafkaWrite try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { out.writeUTF(state.getTransactionalIdPrefix()); + out.writeInt(state.getOwnedSubtaskId()); + out.writeInt(state.getTotalNumberOfOwnedSubtasks()); + out.writeInt(state.getTransactionOwnership().ordinal()); + out.writeInt(state.getPrecommittedTransactionalIds().size()); + for (CheckpointTransaction transaction : state.getPrecommittedTransactionalIds()) { + out.writeUTF(transaction.getTransactionalId()); + out.writeLong(transaction.getCheckpointId()); + } out.flush(); return baos.toByteArray(); } @@ -45,10 +58,33 @@ class KafkaWriterStateSerializer implements SimpleVersionedSerializer<KafkaWrite @Override public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException { + if (version > 2) { + throw new IOException("Unknown version: " + version); + } + try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); final DataInputStream in = new DataInputStream(bais)) { - final String transactionalIdPrefx = in.readUTF(); - return new KafkaWriterState(transactionalIdPrefx); + final String transactionalIdPrefix = in.readUTF(); + int ownedSubtaskId = UNKNOWN; + int totalNumberOfOwnedSubtasks = UNKNOWN; + TransactionOwnership transactionOwnership = TransactionOwnership.IMPLICIT_BY_SUBTASK_ID; + final Collection<CheckpointTransaction> precommitted = new ArrayList<>(); + if (version == 2) { + ownedSubtaskId = in.readInt(); + totalNumberOfOwnedSubtasks = in.readInt(); + transactionOwnership = TransactionOwnership.values()[in.readInt()]; + + final int usedTransactionIdsSize = in.readInt(); + for (int i = 0; i < usedTransactionIdsSize; i++) { + precommitted.add(new CheckpointTransaction(in.readUTF(), in.readLong())); + } + } + return new KafkaWriterState( + transactionalIdPrefix, + ownedSubtaskId, + totalNumberOfOwnedSubtasks, + transactionOwnership, + precommitted); } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java index d47cca28..77a95aa4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java @@ -20,6 +20,8 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.annotation.Internal; +import java.util.Collection; + /** A pool of producers that can be recycled. 
*/ @Internal public interface ProducerPool extends AutoCloseable { @@ -38,6 +40,9 @@ public interface ProducerPool extends AutoCloseable { FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( String transactionalId, long checkpointId); + /** Returns a snapshot of all ongoing transactions. */ + Collection<CheckpointTransaction> getOngoingTransactions(); + /** * Explicitly recycle a producer. This is useful when the producer has not been passed to the * committer. diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java index 38a159ed..3789ea09 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java @@ -21,6 +21,7 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +29,11 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Deque; +import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; @@ -96,28 +99,39 @@ public class ProducerPoolImpl implements ProducerPool { * checkpoints. */ private final NavigableMap<CheckpointTransaction, String> transactionalIdsByCheckpoint = - new TreeMap<>(Comparator.comparing(CheckpointTransaction::getCheckpointId)); + new TreeMap<>( + Comparator.comparing(CheckpointTransaction::getCheckpointId) + .thenComparing(CheckpointTransaction::getTransactionalId)); /** Creates a new {@link ProducerPoolImpl}. */ public ProducerPoolImpl( Properties kafkaProducerConfig, - Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit) { + Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit, + Collection<CheckpointTransaction> precommittedTransactions) { this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig must not be null"); this.producerInit = checkNotNull(producerInit, "producerInit must not be null"); + + initPrecommittedTransactions(precommittedTransactions); } @Override public void recycleByTransactionId(String transactionalId, boolean success) { ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId); LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry); + if (producerEntry == null) { - // during recovery, the committer may finish transactions that are not yet ongoing from - // the writer's perspective - // these transaction will be closed by the second half of this method eventually + LOG.info( + "Received unmatched producer for transaction {}. 
This is expected during rescale.", + transactionalId); + // recycle of unmatched entries happens on next checkpoint at the second half of this + // method return; } + long finishedChkId = producerEntry.getCheckpointedTransaction().getCheckpointId(); + boolean hasTransactionsFromPreviousCheckpoint = + transactionalIdsByCheckpoint.firstKey().getCheckpointId() != finishedChkId; transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction()); if (success) { recycleProducer(producerEntry.getProducer()); @@ -125,23 +139,26 @@ public class ProducerPoolImpl implements ProducerPool { closeProducer(producerEntry.getProducer()); } - // In rare cases (only for non-chained committer), some transactions may not be detected to - // be finished. + // In rare cases (non-chained committer or recovery), some transactions may not be detected + // to be finished. // For example, a transaction may be committed at the same time the writer state is // snapshot. The writer contains the transaction as ongoing but the committer state will // later not contain it. // In these cases, we make use of the fact that committables are processed in order of the // checkpoint id. // That means a transaction state with checkpoint id C implies that all C' < C are finished. - NavigableMap<CheckpointTransaction, String> earlierTransactions = - transactionalIdsByCheckpoint.headMap( - producerEntry.getCheckpointedTransaction(), false); - if (!earlierTransactions.isEmpty()) { - for (String id : earlierTransactions.values()) { - ProducerEntry entry = producerByTransactionalId.remove(id); - closeProducer(entry.getProducer()); + if (hasTransactionsFromPreviousCheckpoint) { + // We can safely remove all transactions with checkpoint id < finishedChkId. + // Entries are primarily sorted by checkpoint id + Iterator<Map.Entry<CheckpointTransaction, String>> iterator = + transactionalIdsByCheckpoint.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<CheckpointTransaction, String> entry = iterator.next(); + if (entry.getKey().getCheckpointId() < finishedChkId) { + iterator.remove(); + closeProducer(producerByTransactionalId.remove(entry.getValue()).getProducer()); + } } - earlierTransactions.clear(); } } @@ -166,18 +183,38 @@ public class ProducerPoolImpl implements ProducerPool { return; } - // For non-chained committer, we have a split brain scenario: - // Both the writer and the committer have a producer representing the same transaction. - // The committer producer has finished the transaction while the writer producer is still in - // transaction. - if (producer.isInTransaction()) { - // Here we just double-commit the same transaction which succeeds in all cases - // because the producer shares the same epoch as the committer's producer - producer.commitTransaction(); + try { + // For non-chained committer, we have a split brain scenario: + // Both the writer and the committer have a producer representing the same transaction. + // The committer producer has finished the transaction while the writer producer is + // still in transaction. 
+ if (producer.isInTransaction()) { + // Here we just double-commit the same transaction which succeeds in all cases + // because the producer shares the same epoch as the committer's producer + producer.commitTransaction(); + } + + producerPool.add(producer); + + LOG.debug("Recycling {}, new pool size {}", producer, producerPool.size()); + } catch (KafkaException e) { + closeProducer(producer); + + LOG.debug( + "Encountered exception while double-committing, discarding producer {}: {}", + producer, + e); } - producerPool.add(producer); + } - LOG.debug("Recycling {}, new pool size {}", producer, producerPool.size()); + private void initPrecommittedTransactions( + Collection<CheckpointTransaction> precommittedTransactions) { + for (CheckpointTransaction transaction : precommittedTransactions) { + this.transactionalIdsByCheckpoint.put(transaction, transaction.getTransactionalId()); + this.producerByTransactionalId.put( + transaction.getTransactionalId(), new ProducerEntry(null, transaction)); + } + LOG.debug("Initialized ongoing transactions from state {}", precommittedTransactions); } @Override @@ -211,6 +248,11 @@ public class ProducerPoolImpl implements ProducerPool { return producer; } + @Override + public Collection<CheckpointTransaction> getOngoingTransactions() { + return new ArrayList<>(transactionalIdsByCheckpoint.keySet()); + } + @VisibleForTesting public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() { return producerPool; @@ -235,13 +277,13 @@ public class ProducerPoolImpl implements ProducerPool { } private static class ProducerEntry { - private final FlinkKafkaInternalProducer<byte[], byte[]> producer; + @Nullable private final FlinkKafkaInternalProducer<byte[], byte[]> producer; private final CheckpointTransaction checkpointedTransaction; private ProducerEntry( - FlinkKafkaInternalProducer<byte[], byte[]> producer, + @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer, CheckpointTransaction checkpointedTransaction) { - this.producer = checkNotNull(producer, "producer must not be null"); + this.producer = producer; this.checkpointedTransaction = checkNotNull( checkpointedTransaction, "checkpointedTransaction must not be null"); @@ -251,13 +293,17 @@ public class ProducerPoolImpl implements ProducerPool { return checkpointedTransaction; } + @Nullable public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() { return producer; } @Override public String toString() { - return producer.toString(); + if (producer != null) { + return producer.toString(); + } + return checkpointedTransaction.toString(); } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java index 80f6386e..c88485ce 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java @@ -20,34 +20,77 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.TransactionAborter; +import org.apache.flink.connector.kafka.util.AdminUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.TransactionListing; + +import 
java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import static org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.extractSubtaskId; import static org.apache.flink.util.Preconditions.checkNotNull; /** Implementation of {@link TransactionAbortStrategyImpl.Context}. */ @Internal public class TransactionAbortStrategyContextImpl implements TransactionAbortStrategyImpl.Context { + private final Supplier<Admin> adminSupplier; + private final Supplier<Collection<String>> topicNames; private final int currentSubtaskId; private final int currentParallelism; + private final Set<Integer> ownedSubtaskIds; + private final int totalNumberOfOwnedSubtasks; private final Set<String> prefixesToAbort; private final long startCheckpointId; private final TransactionAborter transactionAborter; + /** Transactional ids that mustn't be aborted. */ + private final Set<String> precommittedTransactionIds; /** Creates a new {@link TransactionAbortStrategyContextImpl}. */ public TransactionAbortStrategyContextImpl( + Supplier<Collection<String>> topicNames, int currentSubtaskId, int currentParallelism, + int[] ownedSubtaskIds, + int totalNumberOfOwnedSubtasks, List<String> prefixesToAbort, long startCheckpointId, - TransactionAborter transactionAborter) { + TransactionAborter transactionAborter, + Supplier<Admin> adminSupplier, + Set<String> precommittedTransactionIds) { + this.topicNames = checkNotNull(topicNames, "topicNames must not be null"); this.currentSubtaskId = currentSubtaskId; this.currentParallelism = currentParallelism; + this.ownedSubtaskIds = Arrays.stream(ownedSubtaskIds).boxed().collect(Collectors.toSet()); + this.totalNumberOfOwnedSubtasks = totalNumberOfOwnedSubtasks; this.prefixesToAbort = Set.copyOf(prefixesToAbort); this.startCheckpointId = startCheckpointId; this.transactionAborter = checkNotNull(transactionAborter, "transactionAborter must not be null"); + this.adminSupplier = checkNotNull(adminSupplier, "adminSupplier must not be null"); + this.precommittedTransactionIds = + checkNotNull( + precommittedTransactionIds, "transactionsToBeCommitted must not be null"); + } + + @Override + public Collection<String> getOpenTransactionsForTopics() { + return AdminUtils.getOpenTransactionsForTopics(adminSupplier.get(), topicNames.get()) + .stream() + .map(TransactionListing::transactionalId) + .collect(Collectors.toList()); + } + + @Override + public boolean ownsTransactionalId(String transactionalId) { + // Only the subtask that owns a respective state with oldSubtaskId can abort it + // For upscaling: use the modulo operator to extrapolate ownership + return ownedSubtaskIds.contains( + extractSubtaskId(transactionalId) % totalNumberOfOwnedSubtasks); } @Override @@ -65,6 +108,11 @@ public class TransactionAbortStrategyContextImpl implements TransactionAbortStra return prefixesToAbort; } + @Override + public Set<String> getPrecommittedTransactionalIds() { + return precommittedTransactionIds; + } + @Override public long getStartCheckpointId() { return startCheckpointId; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java index 6ea69586..3b8e3d0b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java +++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java @@ -23,7 +23,10 @@ import org.apache.flink.annotation.Internal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** Implementations of an abort strategy for transactions left over from previous runs. */ @Internal @@ -112,6 +115,46 @@ public enum TransactionAbortStrategyImpl { } return numTransactionAborted; } + }, + LISTING { + @Override + public void abortTransactions(Context context) { + Collection<String> openTransactionsForTopics = context.getOpenTransactionsForTopics(); + + if (openTransactionsForTopics.isEmpty()) { + return; + } + + List<String> openTransactionsForSubtask = + openTransactionsForTopics.stream() + // ignore transactions from other applications + .filter(name -> hasKnownPrefix(name, context)) + // look only at transactions owned by this subtask + .filter(context::ownsTransactionalId) + .collect(Collectors.toList()); + + LOG.warn( + "Found {} open transactions for subtask {}: {}", + openTransactionsForSubtask.size(), + context.getCurrentSubtaskId(), + openTransactionsForSubtask); + // look only at transactions coming from this application + // remove transactions that are owned by the committer + TransactionAborter transactionAborter = context.getTransactionAborter(); + for (String name : openTransactionsForSubtask) { + if (context.getPrecommittedTransactionalIds().contains(name)) { + LOG.debug( + "Skipping transaction {} because it's in the list of transactions to be committed", + name); + continue; + } + transactionAborter.abortTransaction(name); + } + } + + private boolean hasKnownPrefix(String name, Context context) { + return context.getPrefixesToAbort().stream().anyMatch(name::startsWith); + } }; private static final Logger LOG = LoggerFactory.getLogger(TransactionAbortStrategyImpl.class); @@ -126,12 +169,42 @@ public enum TransactionAbortStrategyImpl { /** Context for the {@link TransactionAbortStrategyImpl}. */ public interface Context { + /** + * Returns the list of all open transactions for the topics retrieved through introspection. + */ + Collection<String> getOpenTransactionsForTopics(); + int getCurrentSubtaskId(); int getCurrentParallelism(); + /** + * Subtask must abort transactions that they own and must not abort any transaction that + * they don't own. Ownership is defined as follows: + * + * <ul> + * <li>Writer states contains the old subtask id and the max parallelism when it was + * snapshotted. + * <li>On recovery, the state is reassigned somewhere. The new assignee takes the + * ownership. + * <li>If there was a downscale, one subtask may own several old subtask ids. + * <li>If there was an upscale, the ids usually remain stable but that isn't necessary for + * the abort to work. + * <li>Any number of intermediate attempts between the recovered checkpoint and the + * current attempt may produce transactions > then the current parallelism. In that + * case, subtask ids are distributed in a round robin fashion using modulo. + * </ul> + */ + boolean ownsTransactionalId(String transactionalId); + Set<String> getPrefixesToAbort(); + /** + * Returns a list of transactional ids that shouldn't be aborted because they are part of + * the committer state. 
+ */ + Set<String> getPrecommittedTransactionalIds(); + long getStartCheckpointId(); TransactionAborter getTransactionAborter(); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java index f1be95a0..278ee9d9 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java @@ -26,7 +26,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** Implementation of {@link TransactionNamingStrategy}. */ @Internal public enum TransactionNamingStrategyImpl { - INCREMENTING { + INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID) { /** * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new * transactions will not clash with transactions created during previous checkpoints ({@code @@ -57,6 +57,12 @@ public enum TransactionNamingStrategyImpl { } }; + private final TransactionOwnership ownership; + + TransactionNamingStrategyImpl(TransactionOwnership ownership) { + this.ownership = ownership; + } + /** * Returns a {@link FlinkKafkaInternalProducer} that will not clash with any ongoing * transactions. @@ -64,6 +70,10 @@ public enum TransactionNamingStrategyImpl { public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( Context context); + public TransactionOwnership getOwnership() { + return ownership; + } + /** Context for the transaction naming strategy. */ public interface Context { String buildTransactionalId(long offset); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java new file mode 100644 index 00000000..8799ba09 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.KafkaWriterState; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Describes the ownership model of transactional ids and with that ownership of the transactions. 
+ * + * <p>A subtask that owns a transactional id is responsible for committing and aborting the + * transactions having that id. Only that subtask may create new ids. + * + * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + subtaskId + "-" + counter + * </code>. The prefix is given by the user, the subtask id is defined through the ownership model + * and the counter through the {@link + * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}. + * + * <p>For all strategies ownership is extrapolated for subtask ids beyond the currently known + * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has + * been taken. Consider an application that runs with 3 subtasks and checkpointed. Later, its + * upscaled to 5 but then a failure happens. We need to have at least 5 open transactions. If the + * application is finally resumed from the checkpoint with 3 subtasks again. These 3 subtasks need + * to assume ownership of the remaining 2. + */ +@Internal +public enum TransactionOwnership { + /** + * The ownership is determined by the current subtask ID. Ownership is extrapolated by + * extracting the original subtask id of the ongoing transactions and applying modulo on the + * current parallelism. + */ + IMPLICIT_BY_SUBTASK_ID { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (!recoveredStates.isEmpty()) { + checkForMigration(recoveredStates); + } + + return new int[] {currentSubtaskId}; + } + + private void checkForMigration(Collection<KafkaWriterState> recoveredStates) { + TransactionOwnership oldOwnership = + recoveredStates.stream() + .map(KafkaWriterState::getTransactionOwnership) + .findFirst() + .orElseThrow(); + if (oldOwnership != this) { + throw new IllegalStateException( + "Attempted to switch the transaction naming strategy back to INCREMENTING which may result in data loss."); + } + } + + @Override + public int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + return currentParallelism; + } + }, + /** + * The ownership is determined by the writer state that is recovered. Each writer may have + * multiple states each with a different subtask id (e.g. when downscaling). + * + * <p>Additionally, the maximum parallelism that has been observed is stored in the state and + * used to extrapolate ownership. + * + * <p>This ownership model has two assumption of the state assignment: + * + * <ul> + * <li>State is assigned first to lower subtask ids and then to higher ones. In the upscaling + * case, from oldP to newP, only tasks [0; oldP) are assigned. [oldP; newP) are not + * assigned any state. + * <li>State is uniformly assigned. In the upscaling case, none of the tasks have more than 1 + * state assigned. + * </ul> + * + * <p>Hence, the state is consecutively assigned to the subtasks from low to high. + * + * <p>With these assumption, this ownership model is able to recover from writer states with + * subtask id + max parallelism: + * + * <ul> + * <li>If there is state, we can extract the owned subtask ids and the max parallelism from + * the state. + * <li>If there is no state, we can use the current subtask id and the max parallelism from + * the current parallelism. The current subtask id cannot possibly be owned already. The + * max parallelism in any state must be lower than the current max parallelism. 
+ * <li>Hence, no subtask id is owned by more than one task and all tasks have the same max + * parallelism. + * <li>Since all tasks have shared knowledge, we can exclusively assign all transactional ids. + * <li>Since each subtask owns at least one transactional id, we can safely create new + * transactional ids while other subtasks are still aborting their transactions. + * </ul> + */ + EXPLICIT_BY_WRITER_STATE { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (recoveredStates.isEmpty()) { + return new int[] {currentSubtaskId}; + } else { + int[] ownedSubtaskIds = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .sorted() + .toArray(); + assertKnown(ownedSubtaskIds[0]); + + int maxParallelism = + recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks(); + // Assumption of the ownership model: state is distributed consecutively across the + // subtasks starting with subtask 0 + checkState(currentSubtaskId < maxParallelism, "State not consecutively assigned"); + + return ownedSubtaskIds; + } + } + + @Override + public int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (recoveredStates.isEmpty()) { + return currentParallelism; + } + Set<Integer> numSubtasks = + recoveredStates.stream() + .map(KafkaWriterState::getTotalNumberOfOwnedSubtasks) + .collect(Collectors.toSet()); + checkState(numSubtasks.size() == 1, "Writer states not in sync %s", recoveredStates); + int totalNumberOfOwnedSubtasks = numSubtasks.iterator().next(); + assertKnown(totalNumberOfOwnedSubtasks); + + if (currentParallelism >= totalNumberOfOwnedSubtasks) { + // Assumption of the ownership model: state is distributed consecutively across the + // subtasks starting with subtask 0 + checkState(recoveredStates.size() == 1, "Not uniformly assigned"); + } + return Math.max(totalNumberOfOwnedSubtasks, currentParallelism); + } + + private void assertKnown(int ownershipValue) { + checkState( + ownershipValue != UNKNOWN, + "Attempted to migrate from flink-connector-kafka 3.X directly to a naming strategy that uses the new writer state. Please first migrate to a flink-connector-kafka 4.X with INCREMENTING."); + } + }; + + /** Returns the owned subtask ids for this subtask. */ + public abstract int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates); + + /** Returns the total number of owned subtasks across all subtasks. 
*/ + public abstract int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java index 1c0c6263..3208483f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java @@ -43,4 +43,17 @@ public class TransactionalIdFactory { + TRANSACTIONAL_ID_DELIMITER + checkpointOffset; } + + public static long extractSubtaskId(String name) { + int lastSep = name.lastIndexOf("-"); + int secondLastSep = name.lastIndexOf("-", lastSep - 1); + String subtaskString = name.substring(secondLastSep + 1, lastSep); + return Long.parseLong(subtaskString); + } + + public static String extractPrefix(String name) { + int lastSep = name.lastIndexOf("-"); + int secondLastSep = name.lastIndexOf("-", lastSep - 1); + return name.substring(0, secondLastSep); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java deleted file mode 100644 index 478e3a35..00000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kafka.source.enumerator.subscriber; - -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.TopicDescription; - -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -/** The base implementations of {@link KafkaSubscriber}. 
*/ -class KafkaSubscriberUtils { - - private KafkaSubscriberUtils() {} - - static Map<String, TopicDescription> getTopicMetadata( - AdminClient adminClient, Pattern topicPattern) { - try { - Set<String> allTopicNames = adminClient.listTopics().names().get(); - Set<String> matchedTopicNames = - allTopicNames.stream() - .filter(name -> topicPattern.matcher(name).matches()) - .collect(Collectors.toSet()); - return getTopicMetadata(adminClient, matchedTopicNames); - } catch (Exception e) { - throw new RuntimeException( - String.format("Failed to get metadata for %s topics.", topicPattern.pattern()), - e); - } - } - - static Map<String, TopicDescription> getTopicMetadata( - AdminClient adminClient, Set<String> topicNames) { - try { - return adminClient.describeTopics(topicNames).allTopicNames().get(); - } catch (Exception e) { - throw new RuntimeException( - String.format("Failed to get metadata for topics %s.", topicNames), e); - } - } -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java index 9cd50fb2..6ddf7c57 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java @@ -33,7 +33,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; +import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata; /** A subscriber for a partition set. */ class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java index e86ade0f..20234bc1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; +import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata; /** * A subscriber to a fixed list of topics. 
The subscribed topics must have existed in the Kafka diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java index 208959e2..e525cc59 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java @@ -34,7 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; -import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; +import static org.apache.flink.connector.kafka.util.AdminUtils.getTopicMetadata; /** A subscriber to a topic pattern. */ class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java new file mode 100644 index 00000000..977c6520 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/AdminUtils.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.util; + +import org.apache.flink.annotation.Internal; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeProducersResult; +import org.apache.kafka.clients.admin.ListTransactionsOptions; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TransactionListing; +import org.apache.kafka.clients.admin.TransactionState; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** Utility methods for Kafka admin operations. 
*/ +@Internal +public class AdminUtils { + + private AdminUtils() {} + + public static Map<String, TopicDescription> getTopicMetadata( + Admin admin, Pattern topicPattern) { + try { + Set<String> matchedTopicNames = getTopicsByPattern(admin, topicPattern); + return getTopicMetadata(admin, matchedTopicNames); + } catch (Exception e) { + checkIfInterrupted(e); + throw new RuntimeException( + String.format("Failed to get metadata for %s topics.", topicPattern.pattern()), + e); + } + } + + public static Set<String> getTopicsByPattern(Admin admin, Pattern topicPattern) { + try { + Set<String> allTopicNames = admin.listTopics().names().get(); + return allTopicNames.stream() + .filter(name -> topicPattern.matcher(name).matches()) + .collect(Collectors.toSet()); + } catch (Exception e) { + checkIfInterrupted(e); + throw new RuntimeException( + String.format("Failed to get metadata for %s topics.", topicPattern.pattern()), + e); + } + } + + public static Map<String, TopicDescription> getTopicMetadata( + Admin admin, Collection<String> topicNames) { + try { + return admin.describeTopics(topicNames).allTopicNames().get(); + } catch (Exception e) { + checkIfInterrupted(e); + throw new RuntimeException( + String.format("Failed to get metadata for topics %s.", topicNames), e); + } + } + + public static Map<TopicPartition, DescribeProducersResult.PartitionProducerState> + getProducerStates(Admin admin, Collection<String> topicNames) { + try { + return admin.describeProducers(getTopicPartitions(admin, topicNames)).all().get(); + } catch (Exception e) { + checkIfInterrupted(e); + throw new RuntimeException( + String.format("Failed to get producers for topics %s.", topicNames), e); + } + } + + public static Collection<Long> getProducerIds(Admin admin, Collection<String> topicNames) { + return getProducerStates(admin, topicNames).values().stream() + .flatMap( + producerState -> + producerState.activeProducers().stream() + .map(ProducerState::producerId)) + .collect(Collectors.toList()); + } + + public static Collection<TransactionListing> getOpenTransactionsForTopics( + Admin admin, Collection<String> topicNames) { + try { + return admin.listTransactions( + new ListTransactionsOptions() + .filterProducerIds(getProducerIds(admin, topicNames)) + .filterStates(List.of(TransactionState.ONGOING))) + .all() + .get(); + } catch (Exception e) { + checkIfInterrupted(e); + throw new RuntimeException( + String.format( + "Failed to get open transactions for topics %s. 
Make sure that the Kafka broker has at least version 3.0 and the application has read permissions on the target topics.", + topicNames), + e); + } + } + + private static void checkIfInterrupted(Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + + public static List<TopicPartition> getTopicPartitions( + Admin admin, Collection<String> topicNames) { + return getTopicMetadata(admin, topicNames).values().stream() + .flatMap( + t -> + t.partitions().stream() + .map(p -> new TopicPartition(t.name(), p.partition()))) + .collect(Collectors.toList()); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java index e04613c1..83c5b560 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl; import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; +import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -211,7 +212,13 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase { onCheckpointBarrier(failedWriter, 2); // use state to ensure that the new writer knows about the old prefix - KafkaWriterState state = new KafkaWriterState(failedWriter.getTransactionalIdPrefix()); + KafkaWriterState state = + new KafkaWriterState( + failedWriter.getTransactionalIdPrefix(), + 0, + 1, + TransactionOwnership.IMPLICIT_BY_SUBTASK_ID, + List.of()); try (final KafkaWriter<Integer> recoveredWriter = restoreWriter(EXACTLY_ONCE, List.of(state), createInitContext())) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java index 3df0ea88..f3aaaa8f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java @@ -17,11 +17,14 @@ package org.apache.flink.connector.kafka.sink; +import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction; +import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; import org.apache.flink.util.TestLogger; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -35,8 +38,16 @@ public class KafkaWriterStateSerializerTest extends TestLogger { @Test public void testStateSerDe() throws IOException { - final KafkaWriterState state = new KafkaWriterState("idPrefix"); + final KafkaWriterState state = + new KafkaWriterState( + "idPrefix", + 0, + 1, + TransactionOwnership.IMPLICIT_BY_SUBTASK_ID, + Arrays.asList( + new CheckpointTransaction("id1", 5L), + new 
CheckpointTransaction("id2", 6L))); final byte[] serialized = SERIALIZER.serialize(state); - assertThat(SERIALIZER.deserialize(1, serialized)).isEqualTo(state); + assertThat(SERIALIZER.deserialize(2, serialized)).isEqualTo(state); } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java index 4848f586..ddc0592f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java @@ -27,10 +27,15 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.function.Consumer; @@ -64,7 +69,8 @@ class ProducerPoolImplITCase { @Test void testGetTransactionalProducer() throws Exception { - try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) { + try (ProducerPoolImpl producerPool = + new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) { FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L); @@ -80,7 +86,8 @@ class ProducerPoolImplITCase { /** Tests direct recycling as used during abortion of transactions. */ @Test void testRecycleProducer() throws Exception { - try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) { + try (ProducerPoolImpl producerPool = + new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) { FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L); @@ -97,7 +104,8 @@ class ProducerPoolImplITCase { /** Tests indirect recycling triggered through the backchannel. */ @Test void testRecycleByTransactionId() throws Exception { - try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) { + try (ProducerPoolImpl producerPool = + new ProducerPoolImpl(getProducerConfig(), INIT, Collections.emptyList())) { FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L); @@ -114,10 +122,45 @@ class ProducerPoolImplITCase { } } + /** Tests the edge case where some transaction ids are implicitly closed. 
*/ + @ParameterizedTest + @ValueSource(longs = {2, 3}) + void testEarlierTransactionRecycleByTransactionId(long finishedCheckpoint) throws Exception { + CheckpointTransaction oldTransaction1 = + new CheckpointTransaction(TRANSACTIONAL_ID + "-0", 1L); + CheckpointTransaction oldTransaction2 = + new CheckpointTransaction(TRANSACTIONAL_ID + "-1", 2L); + + try (ProducerPoolImpl producerPool = + new ProducerPoolImpl( + getProducerConfig(), + INIT, + Arrays.asList(oldTransaction1, oldTransaction2))) { + FlinkKafkaInternalProducer<byte[], byte[]> producer = + producerPool.getTransactionalProducer( + TRANSACTIONAL_ID + "-2", finishedCheckpoint); + + assertThat(producerPool.getOngoingTransactions()).hasSize(3); + + assertThat(producerPool.getProducers()).isEmpty(); + producer.beginTransaction(); + producerPool.recycleByTransactionId(TRANSACTIONAL_ID + "-2", true); + assertThat(producerPool.getProducers()).contains(producer); + + // expect that old transactions have been removed where checkpoint id is smaller + if (finishedCheckpoint == 2) { + assertThat(producerPool.getOngoingTransactions()).hasSize(1); + } else { + assertThat(producerPool.getOngoingTransactions()).hasSize(0); + } + } + } + /** Tests indirect recycling triggered through the backchannel. */ @Test void testCloseByTransactionId() throws Exception { - try (ProducerPoolImpl producerPool = new ProducerPoolImpl(getProducerConfig(), INIT)) { + try (ProducerPoolImpl producerPool = + new ProducerPoolImpl(getProducerConfig(), INIT, List.of())) { FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.getTransactionalProducer(TRANSACTIONAL_ID, 1L); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java new file mode 100644 index 00000000..d274496a --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionIdFactoryTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TransactionalIdFactory}. 
*/ +public class TransactionIdFactoryTest extends TestLogger { + + @Test + public void testBuildTransactionalId() { + final String expected = "prefix-1-2"; + assertThat(TransactionalIdFactory.buildTransactionalId("prefix", 1, 2L)) + .isEqualTo(expected); + } + + @Test + public void testExtractSubtaskId() { + final String transactionalId = "prefix-1-2"; + assertThat(TransactionalIdFactory.extractSubtaskId(transactionalId)).isEqualTo(1); + } + + @Test + public void testExtractPrefix() { + final String transactionalId = "prefix-1-2"; + assertThat(TransactionalIdFactory.extractPrefix(transactionalId)).isEqualTo("prefix"); + } +}
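
For illustration, the transactional id layout exercised by TransactionIdFactoryTest above is prefix-subtaskId-checkpointOffset, and extractPrefix/extractSubtaskId search for the delimiter from the end of the string so that a user-chosen prefix may itself contain "-". The following is a minimal, self-contained sketch (not part of the commit) of how AdminUtils.getOpenTransactionsForTopics and these helpers could be combined to discover lingering transactions that belong to one sink; the class name, bootstrap address, prefix, and topic are placeholders, and the snippet only lists candidates rather than aborting them.

import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
import org.apache.flink.connector.kafka.util.AdminUtils;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TransactionListing;

import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Sketch: list ONGOING transactions on the sink topics and keep those matching our prefix. */
public class ListLingeringTransactionsSketch {
    public static void main(String[] args) {
        // Placeholder connection and sink settings.
        Map<String, Object> conf =
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        String transactionalIdPrefix = "my-sink";
        List<String> sinkTopics = List.of("output-topic");

        try (Admin admin = Admin.create(conf)) {
            // Requires a broker with at least version 3.0 (ListTransactions API).
            Collection<TransactionListing> open =
                    AdminUtils.getOpenTransactionsForTopics(admin, sinkTopics);
            for (TransactionListing listing : open) {
                String transactionalId = listing.transactionalId();
                // The prefix is everything before the trailing "-subtaskId-checkpointOffset".
                if (!transactionalIdPrefix.equals(
                        TransactionalIdFactory.extractPrefix(transactionalId))) {
                    continue; // opened by a different application
                }
                long subtaskId = TransactionalIdFactory.extractSubtaskId(transactionalId);
                System.out.printf(
                        "lingering transaction %s (subtask %d)%n", transactionalId, subtaskId);
            }
        }
    }
}

The Admin client is only used for discovery here; actually aborting the listed ids still has to go through the writer's abort handling with a transactional producer per id.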

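Similarly, the ownership resolution of TransactionOwnership.EXPLICIT_BY_WRITER_STATE is easiest to follow with a concrete downscaling scenario. The sketch below (again not part of the commit) assumes the argument order of the @VisibleForTesting KafkaWriterState constructor used in ExactlyOnceKafkaWriterITCase (prefix, owned subtask id, total owned subtasks, ownership, ongoing transactions) and lives in the same package so that the constructor is reachable; the prefix and parallelism values are made up for illustration.

package org.apache.flink.connector.kafka.sink;

import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;

import java.util.Arrays;
import java.util.List;

/** Sketch: resolve owned subtask ids after scaling a job down from parallelism 4 to 2. */
public class OwnershipDownscalingSketch {
    public static void main(String[] args) {
        // Subtask 0 of the new run receives the states written by old subtasks 0 and 2
        // (consecutive assignment, as the ownership model assumes).
        List<KafkaWriterState> recovered =
                List.of(
                        new KafkaWriterState(
                                "prefix",
                                0,
                                4,
                                TransactionOwnership.EXPLICIT_BY_WRITER_STATE,
                                List.of()),
                        new KafkaWriterState(
                                "prefix",
                                2,
                                4,
                                TransactionOwnership.EXPLICIT_BY_WRITER_STATE,
                                List.of()));

        TransactionOwnership ownership = TransactionOwnership.EXPLICIT_BY_WRITER_STATE;
        int currentSubtaskId = 0;
        int currentParallelism = 2;

        // Owned ids come straight from the recovered states: [0, 2].
        int[] ownedIds =
                ownership.getOwnedSubtaskIds(currentSubtaskId, currentParallelism, recovered);
        // The total stays at the old value 4, so ids 0..3 remain covered by the new subtasks.
        int total =
                ownership.getTotalNumberOfOwnedSubtasks(
                        currentSubtaskId, currentParallelism, recovered);

        System.out.println(Arrays.toString(ownedIds) + " of " + total);
    }
}

With these values the restarted subtask 0 keeps exclusive ownership of ids 0 and 2 while the other new subtask covers 1 and 3, which is what allows each subtask to create new transactional ids safely while others are still aborting theirs.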