This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fa36a7f2d66 MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591) fa36a7f2d66 is described below commit fa36a7f2d664bac8524f830663023b83f5ee090b Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Sat Oct 28 07:27:20 2023 -0700 MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591) And encapsulate TxnPartitionEntry state. This makes it easier to understand the behavior and the paths through which the state is updated. Reviewers: Justine Olshan <jols...@confluent.io> --- .../clients/producer/internals/ProducerBatch.java | 4 +- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/TransactionManager.java | 105 ++++-------------- .../producer/internals/TxnPartitionEntry.java | 115 ++++++++++++++++++-- .../producer/internals/TxnPartitionMap.java | 95 +++++++++++----- .../kafka/common/utils/ProducerIdAndEpoch.java | 2 +- .../clients/producer/internals/SenderTest.java | 120 ++++++++++----------- .../producer/internals/TransactionManagerTest.java | 86 +++++++-------- 8 files changed, 305 insertions(+), 224 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 408b8316eb8..61432b53ab0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -473,11 +473,11 @@ public final class ProducerBatch { recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional); } - public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) { + public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) { log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", this.baseSequence(), this.topicPartition, baseSequence); reopened = true; - recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional); + recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 5e1795cb2a1..45332dff391 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -818,7 +818,7 @@ public class RecordAccumulator { } private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) { - ProducerIdAndEpoch producerIdAndEpoch = null; + ProducerIdAndEpoch producerIdAndEpoch; if (transactionManager != null) { if (!transactionManager.isSendToPartitionAllowed(tp)) return true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 3eaaca04aa6..1d343484388 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -48,7 +48,6 @@ import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -84,7 +83,6 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.PriorityQueue; import java.util.Set; -import java.util.SortedSet; import java.util.function.Supplier; /** @@ -92,7 +90,6 @@ import java.util.function.Supplier; */ public class TransactionManager { private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1; - static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1; private final Logger log; private final String transactionalId; @@ -271,7 +268,7 @@ public class TransactionManager { this.partitionsWithUnresolvedSequences = new HashMap<>(); this.partitionsToRewriteSequences = new HashSet<>(); this.retryBackoffMs = retryBackoffMs; - this.txnPartitionMap = new TxnPartitionMap(); + this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; } @@ -521,7 +518,7 @@ public class TransactionManager { } private void resetSequenceForPartition(TopicPartition topicPartition) { - txnPartitionMap.topicPartitions.remove(topicPartition); + txnPartitionMap.remove(topicPartition); this.partitionsWithUnresolvedSequences.remove(topicPartition); } @@ -572,28 +569,25 @@ public class TransactionManager { /** * Returns the next sequence number to be written to the given TopicPartition. */ - synchronized Integer sequenceNumber(TopicPartition topicPartition) { - return txnPartitionMap.getOrCreate(topicPartition).nextSequence; + synchronized int sequenceNumber(TopicPartition topicPartition) { + return txnPartitionMap.getOrCreate(topicPartition).nextSequence(); } /** * Returns the current producer id/epoch of the given TopicPartition. */ synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition topicPartition) { - return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch; + return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch(); } synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) { - Integer currentSequence = sequenceNumber(topicPartition); - - currentSequence = DefaultRecordBatch.incrementSequence(currentSequence, increment); - txnPartitionMap.get(topicPartition).nextSequence = currentSequence; + txnPartitionMap.get(topicPartition).incrementSequence(increment); } synchronized void addInFlightBatch(ProducerBatch batch) { if (!batch.hasSequence()) throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set."); - txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch); + txnPartitionMap.get(batch.topicPartition).addInflightBatch(batch); } /** @@ -606,33 +600,21 @@ public class TransactionManager { synchronized int firstInFlightSequence(TopicPartition topicPartition) { if (!hasInflightBatches(topicPartition)) return RecordBatch.NO_SEQUENCE; - - SortedSet<ProducerBatch> inflightBatches = txnPartitionMap.get(topicPartition).inflightBatchesBySequence; - if (inflightBatches.isEmpty()) - return RecordBatch.NO_SEQUENCE; - else - return inflightBatches.first().baseSequence(); + ProducerBatch batch = nextBatchBySequence(topicPartition); + return batch == null ? RecordBatch.NO_SEQUENCE : batch.baseSequence(); } synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) { - SortedSet<ProducerBatch> queue = txnPartitionMap.get(topicPartition).inflightBatchesBySequence; - return queue.isEmpty() ? null : queue.first(); + return txnPartitionMap.nextBatchBySequence(topicPartition); } synchronized void removeInFlightBatch(ProducerBatch batch) { - if (hasInflightBatches(batch.topicPartition)) { - txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch); - } + if (hasInflightBatches(batch.topicPartition)) + txnPartitionMap.removeInFlightBatch(batch); } private int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) { - int lastAckedSequence = lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER); - if (sequence > lastAckedSequence) { - txnPartitionMap.get(topicPartition).lastAckedSequence = sequence; - return sequence; - } - - return lastAckedSequence; + return txnPartitionMap.maybeUpdateLastAckedSequence(topicPartition, sequence); } synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) { @@ -647,18 +629,7 @@ public class TransactionManager { if (response.baseOffset == ProduceResponse.INVALID_OFFSET) return; long lastOffset = response.baseOffset + batch.recordCount - 1; - OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition); - // It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid - // response for this. This can happen only if the producer is only idempotent (not transactional) and in - // this case there will be no tracked bookkeeper entry about it, so we have to insert one. - if (!lastAckedOffset.isPresent() && !isTransactional()) { - txnPartitionMap.getOrCreate(batch.topicPartition); - } - if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) { - txnPartitionMap.get(batch.topicPartition).lastAckedOffset = lastOffset; - } else { - log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset); - } + txnPartitionMap.updateLastAckedOffset(batch.topicPartition, isTransactional(), lastOffset); } public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) { @@ -724,50 +695,18 @@ public class TransactionManager { if (!isTransactional()) { requestEpochBumpForPartition(batch.topicPartition); } else { - adjustSequencesDueToFailedBatch(batch); + txnPartitionMap.adjustSequencesDueToFailedBatch(batch); } } } } - // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted - // so that they don't fail with the OutOfOrderSequenceException. - // - // This method must only be called when we know that the batch is question has been unequivocally failed by the broker, - // ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar. - private void adjustSequencesDueToFailedBatch(ProducerBatch batch) { - if (!txnPartitionMap.contains(batch.topicPartition)) - // Sequence numbers are not being tracked for this partition. This could happen if the producer id was just - // reset due to a previous OutOfOrderSequenceException. - return; - log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}", - batch.producerId(), batch.topicPartition, batch.recordCount); - int currentSequence = sequenceNumber(batch.topicPartition); - currentSequence -= batch.recordCount; - if (currentSequence < 0) - throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative: " + currentSequence); - - setNextSequence(batch.topicPartition, currentSequence); - - txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch -> { - if (inFlightBatch.baseSequence() < batch.baseSequence()) - return; - - int newSequence = inFlightBatch.baseSequence() - batch.recordCount; - if (newSequence < 0) - throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence() - + " for partition " + batch.topicPartition + " is going to become negative: " + newSequence); - - inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence, inFlightBatch.isTransactional()); - }); - } - synchronized boolean hasInflightBatches(TopicPartition topicPartition) { - return !txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty(); + return txnPartitionMap.getOrCreate(topicPartition).hasInflightBatches(); } synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition topicPartition) { - return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch); + return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch()); } synchronized boolean hasUnresolvedSequences() { @@ -817,7 +756,7 @@ public class TransactionManager { // For the idempotent producer, bump the epoch log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " + "Going to bump epoch and reset sequence numbers.", topicPartition, - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition)); + lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition)); requestEpochBumpForPartition(topicPartition); } @@ -828,11 +767,7 @@ public class TransactionManager { } private boolean isNextSequence(TopicPartition topicPartition, int sequence) { - return sequence - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1; - } - - private void setNextSequence(TopicPartition topicPartition, int sequence) { - txnPartitionMap.get(topicPartition).nextSequence = sequence; + return sequence - lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) == 1; } private boolean isNextSequenceForUnresolvedPartition(TopicPartition topicPartition, int sequence) { @@ -983,7 +918,7 @@ public class TransactionManager { // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not // reset the sequence numbers to the beginning. return true; - } else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) { + } else if (lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) { // The head of the log has been removed, probably due to the retention time elapsing. In this case, // we expect to lose the producer state. For the transactional producer, reset the sequences of all // inflight batches to be from the beginning and retry them, so that the transaction does not need to diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java index be79d8ee0f1..35684ea54cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java @@ -18,33 +18,41 @@ package org.apache.kafka.clients.producer.internals; import java.util.Comparator; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.SortedSet; import java.util.TreeSet; import java.util.function.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.ProducerIdAndEpoch; class TxnPartitionEntry { + static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1; + + private final TopicPartition topicPartition; // The producer id/epoch being used for a given partition. - ProducerIdAndEpoch producerIdAndEpoch; + private ProducerIdAndEpoch producerIdAndEpoch; // The base sequence of the next batch bound for a given partition. - int nextSequence; + private int nextSequence; // The sequence number of the last record of the last ack'd batch from the given partition. When there are no // in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1. - int lastAckedSequence; + private int lastAckedSequence; // Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that // we continue to order batches by the sequence numbers even when the responses come back out of order during // leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes // (either successfully or through a fatal failure). - SortedSet<ProducerBatch> inflightBatchesBySequence; + private SortedSet<ProducerBatch> inflightBatchesBySequence; // We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer // responses which are due to the retention period elapsing, and those which are due to actual lost data. - long lastAckedOffset; + private long lastAckedOffset; // `inflightBatchesBySequence` should only have batches with the same producer id and producer // epoch, but there is an edge case where we may remove the wrong batch if the comparator @@ -55,15 +63,94 @@ class TxnPartitionEntry { .thenComparingInt(ProducerBatch::producerEpoch) .thenComparingInt(ProducerBatch::baseSequence); - TxnPartitionEntry() { + TxnPartitionEntry(TopicPartition topicPartition) { + this.topicPartition = topicPartition; this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.nextSequence = 0; - this.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER; + this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER; this.lastAckedOffset = ProduceResponse.INVALID_OFFSET; this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR); } - void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) { + ProducerIdAndEpoch producerIdAndEpoch() { + return producerIdAndEpoch; + } + + int nextSequence() { + return nextSequence; + } + + OptionalLong lastAckedOffset() { + if (lastAckedOffset != ProduceResponse.INVALID_OFFSET) + return OptionalLong.of(lastAckedOffset); + return OptionalLong.empty(); + } + + OptionalInt lastAckedSequence() { + if (lastAckedSequence != TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) + return OptionalInt.of(lastAckedSequence); + return OptionalInt.empty(); + } + + boolean hasInflightBatches() { + return !inflightBatchesBySequence.isEmpty(); + } + + ProducerBatch nextBatchBySequence() { + return inflightBatchesBySequence.isEmpty() ? null : inflightBatchesBySequence.first(); + } + + void incrementSequence(int increment) { + this.nextSequence = DefaultRecordBatch.incrementSequence(this.nextSequence, increment); + } + + void addInflightBatch(ProducerBatch batch) { + inflightBatchesBySequence.add(batch); + } + + void setLastAckedOffset(long lastAckedOffset) { + this.lastAckedOffset = lastAckedOffset; + } + + void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) { + final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0); + resetSequenceNumbers(inFlightBatch -> { + inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value); + sequence.value += inFlightBatch.recordCount; + }); + producerIdAndEpoch = newProducerIdAndEpoch; + nextSequence = sequence.value; + lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER; + } + + int maybeUpdateLastAckedSequence(int sequence) { + if (sequence > lastAckedSequence) { + lastAckedSequence = sequence; + return sequence; + } + return lastAckedSequence; + } + + void removeInFlightBatch(ProducerBatch batch) { + inflightBatchesBySequence.remove(batch); + } + + void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) { + decrementSequence(recordCount); + resetSequenceNumbers(inFlightBatch -> { + if (inFlightBatch.baseSequence() < baseSequence) + return; + + int newSequence = inFlightBatch.baseSequence() - recordCount; + if (newSequence < 0) + throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence() + + " for partition " + topicPartition + " is going to become negative: " + newSequence); + + inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence); + }); + } + + private void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) { TreeSet<ProducerBatch> newInflights = new TreeSet<>(PRODUCER_BATCH_COMPARATOR); for (ProducerBatch inflightBatch : inflightBatchesBySequence) { resetSequence.accept(inflightBatch); @@ -71,4 +158,16 @@ class TxnPartitionEntry { } inflightBatchesBySequence = newInflights; } + + private boolean decrementSequence(int decrement) { + int updatedSequence = nextSequence; + updatedSequence -= decrement; + if (updatedSequence < 0) { + throw new IllegalStateException( + "Sequence number for partition " + topicPartition + " is going to become negative: " + + updatedSequence); + } + this.nextSequence = updatedSequence; + return true; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java index 95553119c5b..ce8aa4a824f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java @@ -21,26 +21,34 @@ import java.util.HashMap; import java.util.Map; import java.util.OptionalInt; import java.util.OptionalLong; + import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.requests.ProduceResponse; -import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.ProducerIdAndEpoch; +import org.slf4j.Logger; class TxnPartitionMap { - final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new HashMap<>(); + private final Logger log; + + private final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new HashMap<>(); + + TxnPartitionMap(LogContext logContext) { + this.log = logContext.logger(TxnPartitionMap.class); + } TxnPartitionEntry get(TopicPartition topicPartition) { TxnPartitionEntry ent = topicPartitions.get(topicPartition); if (ent == null) { - throw new IllegalStateException("Trying to get the sequence number for " + topicPartition + - ", but the sequence number was never set for this partition."); + throw new IllegalStateException("Trying to get txnPartitionEntry for " + topicPartition + + ", but it was never set for this partition."); } return ent; } TxnPartitionEntry getOrCreate(TopicPartition topicPartition) { - return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry()); + return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry(tp)); } boolean contains(TopicPartition topicPartition) { @@ -53,31 +61,70 @@ class TxnPartitionMap { OptionalLong lastAckedOffset(TopicPartition topicPartition) { TxnPartitionEntry entry = topicPartitions.get(topicPartition); - if (entry != null && entry.lastAckedOffset != ProduceResponse.INVALID_OFFSET) { - return OptionalLong.of(entry.lastAckedOffset); - } else { - return OptionalLong.empty(); - } + if (entry != null) + return entry.lastAckedOffset(); + return OptionalLong.empty(); } OptionalInt lastAckedSequence(TopicPartition topicPartition) { TxnPartitionEntry entry = topicPartitions.get(topicPartition); - if (entry != null && entry.lastAckedSequence != TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER) { - return OptionalInt.of(entry.lastAckedSequence); - } else { - return OptionalInt.empty(); - } + if (entry != null) + return entry.lastAckedSequence(); + return OptionalInt.empty(); } void startSequencesAtBeginning(TopicPartition topicPartition, ProducerIdAndEpoch newProducerIdAndEpoch) { - final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0); - TxnPartitionEntry topicPartitionEntry = get(topicPartition); - topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> { - inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value, inFlightBatch.isTransactional()); - sequence.value += inFlightBatch.recordCount; - }); - topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch; - topicPartitionEntry.nextSequence = sequence.value; - topicPartitionEntry.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER; + TxnPartitionEntry entry = get(topicPartition); + if (entry != null) + entry.startSequencesAtBeginning(newProducerIdAndEpoch); + } + + void remove(TopicPartition topicPartition) { + topicPartitions.remove(topicPartition); + } + + + void updateLastAckedOffset(TopicPartition topicPartition, boolean isTransactional, long lastOffset) { + OptionalLong lastAckedOffset = lastAckedOffset(topicPartition); + // It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid + // response for this. This can happen only if the producer is only idempotent (not transactional) and in + // this case there will be no tracked bookkeeper entry about it, so we have to insert one. + if (!lastAckedOffset.isPresent() && !isTransactional) + getOrCreate(topicPartition); + if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) + get(topicPartition).setLastAckedOffset(lastOffset); + else + log.trace("Partition {} keeps lastOffset at {}", topicPartition, lastOffset); + } + + // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted + // so that they don't fail with the OutOfOrderSequenceException. + // + // This method must only be called when we know that the batch is question has been unequivocally failed by the broker, + // ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar. + void adjustSequencesDueToFailedBatch(ProducerBatch batch) { + if (!contains(batch.topicPartition)) + // Sequence numbers are not being tracked for this partition. This could happen if the producer id was just + // reset due to a previous OutOfOrderSequenceException. + return; + log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}", + batch.producerId(), batch.topicPartition, batch.recordCount); + + get(batch.topicPartition).adjustSequencesDueToFailedBatch(batch.baseSequence(), batch.recordCount); + } + + int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) { + TxnPartitionEntry entry = topicPartitions.get(topicPartition); + if (entry != null) + return entry.maybeUpdateLastAckedSequence(sequence); + return TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER; + } + + ProducerBatch nextBatchBySequence(TopicPartition topicPartition) { + return get(topicPartition).nextBatchBySequence(); + } + + void removeInFlightBatch(ProducerBatch batch) { + get(batch.topicPartition).removeInFlightBatch(batch); } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java index 5061da1343c..674b42352b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java @@ -35,7 +35,7 @@ public class ProducerIdAndEpoch { @Override public String toString() { - return "ProducerIdAndEpoch(producerId=" + producerId + ", epoch=" + epoch + ")"; + return "(producerId=" + producerId + ", epoch=" + epoch + ")"; } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index ba625d74408..92ef4d18917 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -778,7 +778,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -786,14 +786,14 @@ public class SenderTest { String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.parseInt(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); @@ -828,7 +828,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -836,7 +836,7 @@ public class SenderTest { String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.parseInt(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest @@ -848,7 +848,7 @@ public class SenderTest { sender.runOnce(); assertEquals(3, client.inFlightRequestCount()); - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); @@ -875,7 +875,7 @@ public class SenderTest { sender.runOnce(); // Do nothing, we are reduced to one in flight request during retries. - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented. + assertEquals(3, transactionManager.sequenceNumber(tp0)); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented. assertEquals(1, client.inFlightRequestCount()); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); @@ -928,7 +928,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -936,14 +936,14 @@ public class SenderTest { String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.parseInt(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); @@ -987,7 +987,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest with multiple messages. Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -998,7 +998,7 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); // make sure the next sequence number accounts for multi-message batches. - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0); @@ -1008,7 +1008,7 @@ public class SenderTest { Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0)); assertTrue(request1.isDone()); assertEquals(0, request1.get().offset()); @@ -1023,7 +1023,7 @@ public class SenderTest { // epoch should be bumped and sequence numbers reset assertEquals(1, transactionManager.producerIdAndEpoch().epoch); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(0, transactionManager.firstInFlightSequence(tp0)); } @@ -1249,7 +1249,7 @@ public class SenderTest { ) { assertEquals(expectedProducerId, transactionManager.producerIdAndEpoch(tp).producerId, "Producer Id:"); assertEquals(expectedProducerEpoch, transactionManager.producerIdAndEpoch(tp).epoch, "Producer Epoch:"); - assertEquals(expectedSequenceValue, transactionManager.sequenceNumber(tp).longValue(), "Seq Number:"); + assertEquals(expectedSequenceValue, transactionManager.sequenceNumber(tp), "Seq Number:"); assertEquals(expectedLastAckedSequence, transactionManager.lastAckedSequence(tp), "Last Acked Seq Number:"); } @@ -1260,7 +1260,7 @@ public class SenderTest { setupWithTransactionState(transactionManager); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -1268,14 +1268,14 @@ public class SenderTest { String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.parseInt(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); @@ -1342,7 +1342,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -1410,7 +1410,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value"); @@ -1432,7 +1432,7 @@ public class SenderTest { setupWithTransactionState(transactionManager, false, null); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -1480,7 +1480,7 @@ public class SenderTest { assertEquals(1, batches.size()); assertFalse(batches.peekFirst().hasSequence()); assertFalse(client.hasInFlightRequests()); - assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2L, transactionManager.sequenceNumber(tp0)); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); sender.runOnce(); // clear the unresolved state, send the pending request. @@ -1499,7 +1499,7 @@ public class SenderTest { setupWithTransactionState(transactionManager); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -1538,7 +1538,7 @@ public class SenderTest { sender.runOnce(); assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertFalse(transactionManager.hasUnresolvedSequence(tp0)); } @@ -1591,7 +1591,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value"); @@ -1599,7 +1599,7 @@ public class SenderTest { sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1); sender.runOnce(); // receive response - assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1L, transactionManager.sequenceNumber(tp0)); Node node = metadata.fetch().nodes().get(0); time.sleep(15000L); @@ -1660,7 +1660,7 @@ public class SenderTest { assertEquals(10, successfulResponse.get().offset()); // The epoch and the sequence are updated when the next batch is sent. - assertEquals(1, transactionManager.sequenceNumber(tp1).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp1)); } @Test @@ -1772,7 +1772,7 @@ public class SenderTest { sender.runOnce(); assertTrue(successfulResponse.isDone()); // epoch of partition is bumped and sequence is reset when the next batch is sent - assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp1)); } @Test @@ -1783,7 +1783,7 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); @@ -1791,14 +1791,14 @@ public class SenderTest { String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.parseInt(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); @@ -1842,14 +1842,14 @@ public class SenderTest { client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); sender.runOnce(); // Receive AddPartitions response - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); @@ -1865,7 +1865,7 @@ public class SenderTest { appendToAccumulator(tp0); Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); assertFalse(request2.isDone()); @@ -1875,7 +1875,7 @@ public class SenderTest { // We should have reset the sequence number state of the partition because the state was lost on the broker. assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(request2.isDone()); assertFalse(client.hasInFlightRequests()); @@ -1885,7 +1885,7 @@ public class SenderTest { sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L); sender.runOnce(); // receive response 1 assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(client.hasInFlightRequests()); assertTrue(request2.isDone()); assertEquals(1012L, request2.get().offset()); @@ -1900,14 +1900,14 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); @@ -1923,7 +1923,7 @@ public class SenderTest { appendToAccumulator(tp0); Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); assertFalse(request2.isDone()); @@ -1934,7 +1934,7 @@ public class SenderTest { // We should have reset the sequence number state of the partition because the state was lost on the broker. assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(request2.isDone()); assertTrue(client.hasInFlightRequests()); assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch); @@ -1943,7 +1943,7 @@ public class SenderTest { sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L); sender.runOnce(); // receive response 1 assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(client.hasInFlightRequests()); assertTrue(request2.isDone()); assertEquals(1012L, request2.get().offset()); @@ -1958,14 +1958,14 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); @@ -1980,7 +1980,7 @@ public class SenderTest { // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); assertFalse(request2.isDone()); @@ -1990,7 +1990,7 @@ public class SenderTest { // We should have reset the sequence number state of the partition because the state was lost on the broker. assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(request2.isDone()); assertFalse(client.hasInFlightRequests()); @@ -2001,7 +2001,7 @@ public class SenderTest { sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L); sender.runOnce(); // receive response 1 assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(client.hasInFlightRequests()); assertTrue(request2.isDone()); assertEquals(1011L, request2.get().offset()); @@ -2016,14 +2016,14 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); @@ -2038,14 +2038,14 @@ public class SenderTest { // Send second ProduceRequest Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); // Send the third ProduceRequest, in parallel with the second. It should be retried even though the // lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back. Future<RecordMetadata> request3 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); assertFalse(request2.isDone()); @@ -2058,7 +2058,7 @@ public class SenderTest { // We should have reset the sequence number state of the partition because the state was lost on the broker. assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertFalse(request2.isDone()); assertFalse(request3.isDone()); assertEquals(2, client.inFlightRequestCount()); @@ -2070,7 +2070,7 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L); sender.runOnce(); // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying @@ -2102,14 +2102,14 @@ public class SenderTest { prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); // Send first ProduceRequest Future<RecordMetadata> request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); - assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); @@ -2124,7 +2124,7 @@ public class SenderTest { // Send second ProduceRequest, Future<RecordMetadata> request2 = appendToAccumulator(tp0); sender.runOnce(); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); assertFalse(request2.isDone()); @@ -2432,7 +2432,7 @@ public class SenderTest { sender.runOnce(); // connect sender.runOnce(); // send produce request - assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence should be 2"); + assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence should be 2"); String id = client.requests().peek().destination(); assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); Node node = new Node(Integer.parseInt(id), "localhost", 0); @@ -2443,12 +2443,12 @@ public class SenderTest { responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE)); client.respond(new ProduceResponse(responseMap)); sender.runOnce(); // split and reenqueue - assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence should be 2"); + assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence should be 2"); // The compression ratio should have been improved once. assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP, CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01); sender.runOnce(); // send the first produce request - assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2"); + assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should be 2"); assertFalse(f1.isDone(), "The future shouldn't have been done."); assertFalse(f2.isDone(), "The future shouldn't have been done."); id = client.requests().peek().destination(); @@ -2463,7 +2463,7 @@ public class SenderTest { sender.runOnce(); // receive assertTrue(f1.isDone(), "The future should have been done."); - assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should still be 2"); + assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should still be 2"); assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 0"); assertFalse(f2.isDone(), "The future shouldn't have been done."); assertEquals(0L, f1.get().offset(), "Offset of the first message should be 0"); @@ -2480,7 +2480,7 @@ public class SenderTest { sender.runOnce(); // receive assertTrue(f2.isDone(), "The future should have been done."); - assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2"); + assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should be 2"); assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 1"); assertEquals(1L, f2.get().offset(), "Offset of the first message should be 1"); assertTrue(accumulator.getDeque(tp).isEmpty(), "There should be no batch in the accumulator"); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index feb764228cb..b615d1c5356 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -564,9 +564,9 @@ public class TransactionManagerTest { @Test public void testDefaultSequenceNumber() { initializeTransactionManager(Optional.empty()); - assertEquals((int) transactionManager.sequenceNumber(tp0), 0); + assertEquals(transactionManager.sequenceNumber(tp0), 0); transactionManager.incrementSequenceNumber(tp0, 3); - assertEquals((int) transactionManager.sequenceNumber(tp0), 3); + assertEquals(transactionManager.sequenceNumber(tp0), 3); } @Test @@ -579,7 +579,7 @@ public class TransactionManagerTest { ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3"); ProducerBatch b4 = writeIdempotentBatchWithValue(transactionManager, tp0, "4"); ProducerBatch b5 = writeIdempotentBatchWithValue(transactionManager, tp0, "5"); - assertEquals(5, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(5, transactionManager.sequenceNumber(tp0)); // First batch succeeds long b1AppendTime = time.milliseconds(); @@ -624,8 +624,8 @@ public class TransactionManagerTest { ProducerBatch tp0b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2"); ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "2"); - assertEquals(2, transactionManager.sequenceNumber(tp0).intValue()); - assertEquals(2, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp1)); ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse( Errors.UNKNOWN_PRODUCER_ID, -1, -1, 400L); @@ -637,9 +637,9 @@ public class TransactionManagerTest { transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(tp0b2, transactionManager.nextBatchBySequence(tp0)); - assertEquals(2, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(2, transactionManager.sequenceNumber(tp1)); assertEquals(tp1b2, transactionManager.nextBatchBySequence(tp1)); } @@ -654,7 +654,7 @@ public class TransactionManagerTest { writeIdempotentBatchWithValue(transactionManager, tp1, "1"); ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2"); - assertEquals(2, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); // The producerId might be reset due to a failure on another partition transactionManager.requestEpochBumpForPartition(tp1); @@ -666,7 +666,7 @@ public class TransactionManagerTest { Errors.NONE, 500L, time.milliseconds(), 0L); transactionManager.handleCompletedBatch(b1, b1Response); - assertEquals(2, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); assertEquals(0, transactionManager.lastAckedSequence(tp0).getAsInt()); assertEquals(b2, transactionManager.nextBatchBySequence(tp0)); assertEquals(epoch, transactionManager.nextBatchBySequence(tp0).producerEpoch()); @@ -676,7 +676,7 @@ public class TransactionManagerTest { transactionManager.handleCompletedBatch(b2, b2Response); transactionManager.maybeUpdateProducerIdAndEpoch(tp0); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); assertFalse(transactionManager.lastAckedSequence(tp0).isPresent()); assertNull(transactionManager.nextBatchBySequence(tp0)); } @@ -698,23 +698,23 @@ public class TransactionManagerTest { MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout, 0, transactionManager, apiVersions); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); Future<RecordMetadata> responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future; sender.runOnce(); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); time.sleep(requestTimeout); sender.runOnce(); assertEquals(0, client.inFlightRequestCount()); assertTrue(transactionManager.hasInflightBatches(tp0)); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); sender.runOnce(); // retry assertEquals(1, client.inFlightRequestCount()); assertTrue(transactionManager.hasInflightBatches(tp0)); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); time.sleep(5000); // delivery time out sender.runOnce(); @@ -729,7 +729,7 @@ public class TransactionManagerTest { sender.runOnce(); // bump the epoch assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); Future<RecordMetadata> responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), @@ -737,7 +737,7 @@ public class TransactionManagerTest { sender.runOnce(); sender.runOnce(); assertEquals(0, transactionManager.firstInFlightSequence(tp0)); - assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp0)); time.sleep(5000); // request time out again sender.runOnce(); @@ -770,30 +770,30 @@ public class TransactionManagerTest { @Test public void testSequenceNumberOverflow() { initializeTransactionManager(Optional.empty()); - assertEquals((int) transactionManager.sequenceNumber(tp0), 0); + assertEquals(transactionManager.sequenceNumber(tp0), 0); transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); + assertEquals(transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); transactionManager.incrementSequenceNumber(tp0, 100); - assertEquals((int) transactionManager.sequenceNumber(tp0), 99); + assertEquals(transactionManager.sequenceNumber(tp0), 99); transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), 98); + assertEquals(transactionManager.sequenceNumber(tp0), 98); } @Test public void testProducerIdReset() { initializeTransactionManager(Optional.empty()); initializeIdempotentProducerId(15L, Short.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), 0); - assertEquals((int) transactionManager.sequenceNumber(tp1), 0); + assertEquals(transactionManager.sequenceNumber(tp0), 0); + assertEquals(transactionManager.sequenceNumber(tp1), 0); transactionManager.incrementSequenceNumber(tp0, 3); - assertEquals((int) transactionManager.sequenceNumber(tp0), 3); + assertEquals(transactionManager.sequenceNumber(tp0), 3); transactionManager.incrementSequenceNumber(tp1, 3); - assertEquals((int) transactionManager.sequenceNumber(tp1), 3); + assertEquals(transactionManager.sequenceNumber(tp1), 3); transactionManager.requestEpochBumpForPartition(tp0); transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); - assertEquals((int) transactionManager.sequenceNumber(tp0), 0); - assertEquals((int) transactionManager.sequenceNumber(tp1), 3); + assertEquals(transactionManager.sequenceNumber(tp0), 0); + assertEquals(transactionManager.sequenceNumber(tp1), 3); } @Test @@ -2835,7 +2835,7 @@ public class TransactionManagerTest { ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1"); ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2"); ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3"); - assertEquals(3, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); // The first batch fails with a timeout transactionManager.markSequenceUnresolved(b1); @@ -2860,7 +2860,7 @@ public class TransactionManagerTest { transactionManager.maybeResolveSequences(); assertEquals(producerIdAndEpoch, transactionManager.producerIdAndEpoch()); assertFalse(transactionManager.hasUnresolvedSequences()); - assertEquals(3, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); } @Test @@ -2893,7 +2893,7 @@ public class TransactionManagerTest { // Run sender loop to trigger epoch bump runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2); assertFalse(transactionManager.hasUnresolvedSequences()); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); } @Test @@ -2966,7 +2966,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); runUntil(() -> transactionManager.isPartitionAdded(tp0)); // Send AddPartitionsRequest - assertEquals(2, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(2, transactionManager.sequenceNumber(tp0)); } @Test @@ -3026,8 +3026,8 @@ public class TransactionManagerTest { prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); runUntil(() -> transactionManager.isPartitionAdded(tp0)); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); - assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); + assertEquals(1, transactionManager.sequenceNumber(tp1)); } @Test @@ -3073,7 +3073,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId); runUntil(() -> transactionManager.isPartitionAdded(tp0)); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); } @Test @@ -3120,7 +3120,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId); runUntil(() -> transactionManager.isPartitionAdded(tp0)); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); } @Test @@ -3179,7 +3179,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId); runUntil(() -> transactionManager.isPartitionAdded(tp0)); - assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp0)); } @Test @@ -3249,8 +3249,8 @@ public class TransactionManagerTest { writeIdempotentBatchWithValue(transactionManager, tp0, "3"); ProducerBatch tp1b1 = writeIdempotentBatchWithValue(transactionManager, tp1, "4"); ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "5"); - assertEquals(3, transactionManager.sequenceNumber(tp0).intValue()); - assertEquals(2, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp1)); // First batch of each partition succeeds long b1AppendTime = time.milliseconds(); @@ -3311,7 +3311,7 @@ public class TransactionManagerTest { transactionManager.maybeUpdateProducerIdAndEpoch(tp1); assertFalse(transactionManager.hasInflightBatches(tp1)); - assertEquals(0, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp1)); // The last batch should now be drained and sent runUntil(() -> transactionManager.hasInflightBatches(tp1)); @@ -3326,7 +3326,7 @@ public class TransactionManagerTest { transactionManager.maybeUpdateProducerIdAndEpoch(tp1); assertFalse(transactionManager.hasInflightBatches(tp1)); - assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp1)); } @Test @@ -3373,8 +3373,8 @@ public class TransactionManagerTest { writeIdempotentBatchWithValue(transactionManager, tp0, "3"); ProducerBatch tp1b1 = writeIdempotentBatchWithValue(transactionManager, tp1, "4"); ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "5"); - assertEquals(3, transactionManager.sequenceNumber(tp0).intValue()); - assertEquals(2, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(3, transactionManager.sequenceNumber(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp1)); // First batch of each partition succeeds long b1AppendTime = time.milliseconds(); @@ -3435,7 +3435,7 @@ public class TransactionManagerTest { transactionManager.maybeUpdateProducerIdAndEpoch(tp1); assertFalse(transactionManager.hasInflightBatches(tp1)); - assertEquals(0, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(0, transactionManager.sequenceNumber(tp1)); // The last batch should now be drained and sent runUntil(() -> transactionManager.hasInflightBatches(tp1)); @@ -3449,7 +3449,7 @@ public class TransactionManagerTest { transactionManager.handleCompletedBatch(tp1b3, t1b3Response); assertFalse(transactionManager.hasInflightBatches(tp1)); - assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); + assertEquals(1, transactionManager.sequenceNumber(tp1)); } @Test