This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa36a7f2d66 MINOR: Push down logic from TransactionManager to 
TxnPartitionEntry (#14591)
fa36a7f2d66 is described below

commit fa36a7f2d664bac8524f830663023b83f5ee090b
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Sat Oct 28 07:27:20 2023 -0700

    MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591)
    
    And encapsulate TxnPartitionEntry state.
    
    This makes it easier to understand the behavior and the paths through
    which the state is updated.
    
    Reviewers: Justine Olshan <jols...@confluent.io>
---
 .../clients/producer/internals/ProducerBatch.java  |   4 +-
 .../producer/internals/RecordAccumulator.java      |   2 +-
 .../producer/internals/TransactionManager.java     | 105 ++++--------------
 .../producer/internals/TxnPartitionEntry.java      | 115 ++++++++++++++++++--
 .../producer/internals/TxnPartitionMap.java        |  95 +++++++++++-----
 .../kafka/common/utils/ProducerIdAndEpoch.java     |   2 +-
 .../clients/producer/internals/SenderTest.java     | 120 ++++++++++-----------
 .../producer/internals/TransactionManagerTest.java |  86 +++++++--------
 8 files changed, 305 insertions(+), 224 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 408b8316eb8..61432b53ab0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -473,11 +473,11 @@ public final class ProducerBatch {
         recordsBuilder.setProducerState(producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch, baseSequence, isTransactional);
     }
 
-    public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int 
baseSequence, boolean isTransactional) {
+    public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int 
baseSequence) {
         log.info("Resetting sequence number of batch with current sequence {} 
for partition {} to {}",
                 this.baseSequence(), this.topicPartition, baseSequence);
         reopened = true;
-        
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch, baseSequence, isTransactional);
+        
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch, baseSequence, isTransactional());
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5e1795cb2a1..45332dff391 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -818,7 +818,7 @@ public class RecordAccumulator {
     }
 
     private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, 
TopicPartition tp) {
-        ProducerIdAndEpoch producerIdAndEpoch = null;
+        ProducerIdAndEpoch producerIdAndEpoch;
         if (transactionManager != null) {
             if (!transactionManager.isSendToPartitionAllowed(tp))
                 return true;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3eaaca04aa6..1d343484388 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -48,7 +48,6 @@ import 
org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -84,7 +83,6 @@ import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.PriorityQueue;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.function.Supplier;
 
 /**
@@ -92,7 +90,6 @@ import java.util.function.Supplier;
  */
 public class TransactionManager {
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
-    static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
 
     private final Logger log;
     private final String transactionalId;
@@ -271,7 +268,7 @@ public class TransactionManager {
         this.partitionsWithUnresolvedSequences = new HashMap<>();
         this.partitionsToRewriteSequences = new HashSet<>();
         this.retryBackoffMs = retryBackoffMs;
-        this.txnPartitionMap = new TxnPartitionMap();
+        this.txnPartitionMap = new TxnPartitionMap(logContext);
         this.apiVersions = apiVersions;
     }
 
@@ -521,7 +518,7 @@ public class TransactionManager {
     }
 
     private void resetSequenceForPartition(TopicPartition topicPartition) {
-        txnPartitionMap.topicPartitions.remove(topicPartition);
+        txnPartitionMap.remove(topicPartition);
         this.partitionsWithUnresolvedSequences.remove(topicPartition);
     }
 
@@ -572,28 +569,25 @@ public class TransactionManager {
     /**
      * Returns the next sequence number to be written to the given 
TopicPartition.
      */
-    synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        return txnPartitionMap.getOrCreate(topicPartition).nextSequence;
+    synchronized int sequenceNumber(TopicPartition topicPartition) {
+        return txnPartitionMap.getOrCreate(topicPartition).nextSequence();
     }
 
     /**
      * Returns the current producer id/epoch of the given TopicPartition.
      */
     synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition 
topicPartition) {
-        return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch;
+        return 
txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch();
     }
 
     synchronized void incrementSequenceNumber(TopicPartition topicPartition, 
int increment) {
-        Integer currentSequence = sequenceNumber(topicPartition);
-
-        currentSequence = 
DefaultRecordBatch.incrementSequence(currentSequence, increment);
-        txnPartitionMap.get(topicPartition).nextSequence = currentSequence;
+        txnPartitionMap.get(topicPartition).incrementSequence(increment);
     }
 
     synchronized void addInFlightBatch(ProducerBatch batch) {
         if (!batch.hasSequence())
             throw new IllegalStateException("Can't track batch for partition " 
+ batch.topicPartition + " when sequence is not set.");
-        
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch);
+        txnPartitionMap.get(batch.topicPartition).addInflightBatch(batch);
     }
 
     /**
@@ -606,33 +600,21 @@ public class TransactionManager {
     synchronized int firstInFlightSequence(TopicPartition topicPartition) {
         if (!hasInflightBatches(topicPartition))
             return RecordBatch.NO_SEQUENCE;
-
-        SortedSet<ProducerBatch> inflightBatches = 
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
-        if (inflightBatches.isEmpty())
-            return RecordBatch.NO_SEQUENCE;
-        else
-            return inflightBatches.first().baseSequence();
+        ProducerBatch batch = nextBatchBySequence(topicPartition);
+        return batch == null ? RecordBatch.NO_SEQUENCE : batch.baseSequence();
     }
 
     synchronized ProducerBatch nextBatchBySequence(TopicPartition 
topicPartition) {
-        SortedSet<ProducerBatch> queue = 
txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
-        return queue.isEmpty() ? null : queue.first();
+        return txnPartitionMap.nextBatchBySequence(topicPartition);
     }
 
     synchronized void removeInFlightBatch(ProducerBatch batch) {
-        if (hasInflightBatches(batch.topicPartition)) {
-            
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch);
-        }
+        if (hasInflightBatches(batch.topicPartition))
+            txnPartitionMap.removeInFlightBatch(batch);
     }
 
     private int maybeUpdateLastAckedSequence(TopicPartition topicPartition, 
int sequence) {
-        int lastAckedSequence = 
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER);
-        if (sequence > lastAckedSequence) {
-            txnPartitionMap.get(topicPartition).lastAckedSequence = sequence;
-            return sequence;
-        }
-
-        return lastAckedSequence;
+        return txnPartitionMap.maybeUpdateLastAckedSequence(topicPartition, 
sequence);
     }
 
     synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
@@ -647,18 +629,7 @@ public class TransactionManager {
         if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
             return;
         long lastOffset = response.baseOffset + batch.recordCount - 1;
-        OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition);
-        // It might happen that the TransactionManager has been reset while a 
request was reenqueued and got a valid
-        // response for this. This can happen only if the producer is only 
idempotent (not transactional) and in
-        // this case there will be no tracked bookkeeper entry about it, so we 
have to insert one.
-        if (!lastAckedOffset.isPresent() && !isTransactional()) {
-            txnPartitionMap.getOrCreate(batch.topicPartition);
-        }
-        if (lastOffset > 
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
-            txnPartitionMap.get(batch.topicPartition).lastAckedOffset = 
lastOffset;
-        } else {
-            log.trace("Partition {} keeps lastOffset at {}", 
batch.topicPartition, lastOffset);
-        }
+        txnPartitionMap.updateLastAckedOffset(batch.topicPartition, 
isTransactional(), lastOffset);
     }
 
     public synchronized void handleCompletedBatch(ProducerBatch batch, 
ProduceResponse.PartitionResponse response) {
@@ -724,50 +695,18 @@ public class TransactionManager {
                 if (!isTransactional()) {
                     requestEpochBumpForPartition(batch.topicPartition);
                 } else {
-                    adjustSequencesDueToFailedBatch(batch);
+                    txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
                 }
             }
         }
     }
 
-    // If a batch is failed fatally, the sequence numbers for future batches 
bound for the partition must be adjusted
-    // so that they don't fail with the OutOfOrderSequenceException.
-    //
-    // This method must only be called when we know that the batch is question 
has been unequivocally failed by the broker,
-    // ie. it has received a confirmed fatal status code like 'Message Too 
Large' or something similar.
-    private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
-        if (!txnPartitionMap.contains(batch.topicPartition))
-            // Sequence numbers are not being tracked for this partition. This 
could happen if the producer id was just
-            // reset due to a previous OutOfOrderSequenceException.
-            return;
-        log.debug("producerId: {}, send to partition {} failed fatally. 
Reducing future sequence numbers by {}",
-                batch.producerId(), batch.topicPartition, batch.recordCount);
-        int currentSequence = sequenceNumber(batch.topicPartition);
-        currentSequence -= batch.recordCount;
-        if (currentSequence < 0)
-            throw new IllegalStateException("Sequence number for partition " + 
batch.topicPartition + " is going to become negative: " + currentSequence);
-
-        setNextSequence(batch.topicPartition, currentSequence);
-
-        
txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch -> 
{
-            if (inFlightBatch.baseSequence() < batch.baseSequence())
-                return;
-
-            int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
-            if (newSequence < 0)
-                throw new IllegalStateException("Sequence number for batch 
with sequence " + inFlightBatch.baseSequence()
-                        + " for partition " + batch.topicPartition + " is 
going to become negative: " + newSequence);
-
-            inFlightBatch.resetProducerState(new 
ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), 
newSequence, inFlightBatch.isTransactional());
-        });
-    }
-
     synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
-        return 
!txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty();
+        return 
txnPartitionMap.getOrCreate(topicPartition).hasInflightBatches();
     }
 
     synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition 
topicPartition) {
-        return 
!producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch);
+        return 
!producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch());
     }
 
     synchronized boolean hasUnresolvedSequences() {
@@ -817,7 +756,7 @@ public class TransactionManager {
                         // For the idempotent producer, bump the epoch
                         log.info("No inflight batches remaining for {}, last 
ack'd sequence for partition is {}, next sequence is {}. " +
                                         "Going to bump epoch and reset 
sequence numbers.", topicPartition,
-                                
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), 
sequenceNumber(topicPartition));
+                                
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER),
 sequenceNumber(topicPartition));
                         requestEpochBumpForPartition(topicPartition);
                     }
 
@@ -828,11 +767,7 @@ public class TransactionManager {
     }
 
     private boolean isNextSequence(TopicPartition topicPartition, int 
sequence) {
-        return sequence - 
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
-    }
-
-    private void setNextSequence(TopicPartition topicPartition, int sequence) {
-        txnPartitionMap.get(topicPartition).nextSequence = sequence;
+        return sequence - 
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
 == 1;
     }
 
     private boolean isNextSequenceForUnresolvedPartition(TopicPartition 
topicPartition, int sequence) {
@@ -983,7 +918,7 @@ public class TransactionManager {
                 // come back from the broker, they would also come with an 
UNKNOWN_PRODUCER_ID error. In this case, we should not
                 // reset the sequence numbers to the beginning.
                 return true;
-            } else if 
(lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < 
response.logStartOffset) {
+            } else if 
(lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
 < response.logStartOffset) {
                 // The head of the log has been removed, probably due to the 
retention time elapsing. In this case,
                 // we expect to lose the producer state. For the transactional 
producer, reset the sequences of all
                 // inflight batches to be from the beginning and retry them, 
so that the transaction does not need to
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
index be79d8ee0f1..35684ea54cc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java
@@ -18,33 +18,41 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.Comparator;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.function.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.PrimitiveRef;
 import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 
 class TxnPartitionEntry {
+    static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
+
+    private final TopicPartition topicPartition;
 
     // The producer id/epoch being used for a given partition.
-    ProducerIdAndEpoch producerIdAndEpoch;
+    private ProducerIdAndEpoch producerIdAndEpoch;
 
     // The base sequence of the next batch bound for a given partition.
-    int nextSequence;
+    private int nextSequence;
 
     // The sequence number of the last record of the last ack'd batch from the 
given partition. When there are no
     // in flight requests for a partition, the 
lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
-    int lastAckedSequence;
+    private int lastAckedSequence;
 
     // Keep track of the in flight batches bound for a partition, ordered by 
sequence. This helps us to ensure that
     // we continue to order batches by the sequence numbers even when the 
responses come back out of order during
     // leader failover. We add a batch to the queue when it is drained, and 
remove it when the batch completes
     // (either successfully or through a fatal failure).
-    SortedSet<ProducerBatch> inflightBatchesBySequence;
+    private SortedSet<ProducerBatch> inflightBatchesBySequence;
 
     // We keep track of the last acknowledged offset on a per partition basis 
in order to disambiguate UnknownProducer
     // responses which are due to the retention period elapsing, and those 
which are due to actual lost data.
-    long lastAckedOffset;
+    private long lastAckedOffset;
 
     // `inflightBatchesBySequence` should only have batches with the same 
producer id and producer
     // epoch, but there is an edge case where we may remove the wrong batch if 
the comparator
@@ -55,15 +63,94 @@ class TxnPartitionEntry {
             .thenComparingInt(ProducerBatch::producerEpoch)
             .thenComparingInt(ProducerBatch::baseSequence);
 
-    TxnPartitionEntry() {
+    TxnPartitionEntry(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
         this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
         this.nextSequence = 0;
-        this.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+        this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
         this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
         this.inflightBatchesBySequence = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
     }
 
-    void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+    ProducerIdAndEpoch producerIdAndEpoch() {
+        return producerIdAndEpoch;
+    }
+
+    int nextSequence() {
+        return nextSequence;
+    }
+
+    OptionalLong lastAckedOffset() {
+        if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+            return OptionalLong.of(lastAckedOffset);
+        return OptionalLong.empty();
+    }
+
+    OptionalInt lastAckedSequence() {
+        if (lastAckedSequence != 
TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
+            return OptionalInt.of(lastAckedSequence);
+        return OptionalInt.empty();
+    }
+
+    boolean hasInflightBatches() {
+        return !inflightBatchesBySequence.isEmpty();
+    }
+
+    ProducerBatch nextBatchBySequence() {
+        return inflightBatchesBySequence.isEmpty() ? null : 
inflightBatchesBySequence.first();
+    }
+
+    void incrementSequence(int increment) {
+        this.nextSequence = 
DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
+    }
+
+    void addInflightBatch(ProducerBatch batch) {
+        inflightBatchesBySequence.add(batch);
+    }
+
+    void setLastAckedOffset(long lastAckedOffset) {
+        this.lastAckedOffset = lastAckedOffset;
+    }
+
+    void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
+        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+        resetSequenceNumbers(inFlightBatch -> {
+            inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value);
+            sequence.value += inFlightBatch.recordCount;
+        });
+        producerIdAndEpoch = newProducerIdAndEpoch;
+        nextSequence = sequence.value;
+        lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+    }
+
+    int maybeUpdateLastAckedSequence(int sequence) {
+        if (sequence > lastAckedSequence) {
+            lastAckedSequence = sequence;
+            return sequence;
+        }
+        return lastAckedSequence;
+    }
+
+    void removeInFlightBatch(ProducerBatch batch) {
+        inflightBatchesBySequence.remove(batch);
+    }
+
+    void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
+        decrementSequence(recordCount);
+        resetSequenceNumbers(inFlightBatch -> {
+            if (inFlightBatch.baseSequence() < baseSequence)
+                return;
+
+            int newSequence = inFlightBatch.baseSequence() - recordCount;
+            if (newSequence < 0)
+                throw new IllegalStateException("Sequence number for batch 
with sequence " + inFlightBatch.baseSequence()
+                        + " for partition " + topicPartition + " is going to 
become negative: " + newSequence);
+
+            inFlightBatch.resetProducerState(new 
ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), 
newSequence);
+        });
+    }
+
+    private void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
         TreeSet<ProducerBatch> newInflights = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
         for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
             resetSequence.accept(inflightBatch);
@@ -71,4 +158,16 @@ class TxnPartitionEntry {
         }
         inflightBatchesBySequence = newInflights;
     }
+
+    private boolean decrementSequence(int decrement) {
+        int updatedSequence = nextSequence;
+        updatedSequence -= decrement;
+        if (updatedSequence < 0) {
+            throw new IllegalStateException(
+                    "Sequence number for partition " + topicPartition + " is 
going to become negative: "
+                            + updatedSequence);
+        }
+        this.nextSequence = updatedSequence;
+        return true;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
index 95553119c5b..ce8aa4a824f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
@@ -21,26 +21,34 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+import org.slf4j.Logger;
 
 class TxnPartitionMap {
 
-    final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new 
HashMap<>();
+    private final Logger log;
+
+    private final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new 
HashMap<>();
+
+    TxnPartitionMap(LogContext logContext) {
+        this.log = logContext.logger(TxnPartitionMap.class);
+    }
 
     TxnPartitionEntry get(TopicPartition topicPartition) {
         TxnPartitionEntry ent = topicPartitions.get(topicPartition);
         if (ent == null) {
-            throw new IllegalStateException("Trying to get the sequence number 
for " + topicPartition +
-                ", but the sequence number was never set for this partition.");
+            throw new IllegalStateException("Trying to get txnPartitionEntry 
for " + topicPartition +
+                ", but it was never set for this partition.");
         }
         return ent;
     }
 
     TxnPartitionEntry getOrCreate(TopicPartition topicPartition) {
-        return topicPartitions.computeIfAbsent(topicPartition, tp -> new 
TxnPartitionEntry());
+        return topicPartitions.computeIfAbsent(topicPartition, tp -> new 
TxnPartitionEntry(tp));
     }
 
     boolean contains(TopicPartition topicPartition) {
@@ -53,31 +61,70 @@ class TxnPartitionMap {
 
     OptionalLong lastAckedOffset(TopicPartition topicPartition) {
         TxnPartitionEntry entry = topicPartitions.get(topicPartition);
-        if (entry != null && entry.lastAckedOffset != 
ProduceResponse.INVALID_OFFSET) {
-            return OptionalLong.of(entry.lastAckedOffset);
-        } else {
-            return OptionalLong.empty();
-        }
+        if (entry != null)
+            return entry.lastAckedOffset();
+        return OptionalLong.empty();
     }
 
     OptionalInt lastAckedSequence(TopicPartition topicPartition) {
         TxnPartitionEntry entry = topicPartitions.get(topicPartition);
-        if (entry != null && entry.lastAckedSequence != 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER) {
-            return OptionalInt.of(entry.lastAckedSequence);
-        } else {
-            return OptionalInt.empty();
-        }
+        if (entry != null)
+            return entry.lastAckedSequence();
+        return OptionalInt.empty();
     }
 
     void startSequencesAtBeginning(TopicPartition topicPartition, 
ProducerIdAndEpoch newProducerIdAndEpoch) {
-        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
-        TxnPartitionEntry topicPartitionEntry = get(topicPartition);
-        topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
-            inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value, inFlightBatch.isTransactional());
-            sequence.value += inFlightBatch.recordCount;
-        });
-        topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
-        topicPartitionEntry.nextSequence = sequence.value;
-        topicPartitionEntry.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+        TxnPartitionEntry entry = get(topicPartition);
+        if (entry != null)
+            entry.startSequencesAtBeginning(newProducerIdAndEpoch);
+    }
+
+    void remove(TopicPartition topicPartition) {
+        topicPartitions.remove(topicPartition);
+    }
+
+
+    void updateLastAckedOffset(TopicPartition topicPartition, boolean 
isTransactional, long lastOffset) {
+        OptionalLong lastAckedOffset = lastAckedOffset(topicPartition);
+        // It might happen that the TransactionManager has been reset while a 
request was reenqueued and got a valid
+        // response for this. This can happen only if the producer is only 
idempotent (not transactional) and in
+        // this case there will be no tracked bookkeeper entry about it, so we 
have to insert one.
+        if (!lastAckedOffset.isPresent() && !isTransactional)
+            getOrCreate(topicPartition);
+        if (lastOffset > 
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET))
+            get(topicPartition).setLastAckedOffset(lastOffset);
+        else
+            log.trace("Partition {} keeps lastOffset at {}", topicPartition, 
lastOffset);
+    }
+
+    // If a batch is failed fatally, the sequence numbers for future batches 
bound for the partition must be adjusted
+    // so that they don't fail with the OutOfOrderSequenceException.
+    //
+    // This method must only be called when we know that the batch is question 
has been unequivocally failed by the broker,
+    // ie. it has received a confirmed fatal status code like 'Message Too 
Large' or something similar.
+    void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
+        if (!contains(batch.topicPartition))
+            // Sequence numbers are not being tracked for this partition. This 
could happen if the producer id was just
+            // reset due to a previous OutOfOrderSequenceException.
+            return;
+        log.debug("producerId: {}, send to partition {} failed fatally. 
Reducing future sequence numbers by {}",
+                batch.producerId(), batch.topicPartition, batch.recordCount);
+
+        
get(batch.topicPartition).adjustSequencesDueToFailedBatch(batch.baseSequence(), 
batch.recordCount);
+    }
+
+    int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int 
sequence) {
+        TxnPartitionEntry entry = topicPartitions.get(topicPartition);
+        if (entry != null)
+            return entry.maybeUpdateLastAckedSequence(sequence);
+        return TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER;
+    }
+
+    ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
+        return get(topicPartition).nextBatchBySequence();
+    }
+
+    void removeInFlightBatch(ProducerBatch batch) {
+        get(batch.topicPartition).removeInFlightBatch(batch);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java 
b/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java
index 5061da1343c..674b42352b7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ProducerIdAndEpoch.java
@@ -35,7 +35,7 @@ public class ProducerIdAndEpoch {
 
     @Override
     public String toString() {
-        return "ProducerIdAndEpoch(producerId=" + producerId + ", epoch=" + 
epoch + ")";
+        return "(producerId=" + producerId + ", epoch=" + epoch + ")";
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index ba625d74408..92ef4d18917 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -778,7 +778,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -786,14 +786,14 @@ public class SenderTest {
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -828,7 +828,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -836,7 +836,7 @@ public class SenderTest {
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
@@ -848,7 +848,7 @@ public class SenderTest {
         sender.runOnce();
 
         assertEquals(3, client.inFlightRequestCount());
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -875,7 +875,7 @@ public class SenderTest {
 
         sender.runOnce(); // Do nothing, we are reduced to one in flight 
request during retries.
 
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());  
// the batch for request 4 shouldn't have been drained, and hence the sequence 
should not have been incremented.
+        assertEquals(3, transactionManager.sequenceNumber(tp0));  // the batch 
for request 4 shouldn't have been drained, and hence the sequence should not 
have been incremented.
         assertEquals(1, client.inFlightRequestCount());
 
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -928,7 +928,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -936,14 +936,14 @@ public class SenderTest {
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -987,7 +987,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest with multiple messages.
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -998,7 +998,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
 
         // make sure the next sequence number accounts for multi-message 
batches.
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
 
@@ -1008,7 +1008,7 @@ public class SenderTest {
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(1), 
transactionManager.lastAckedSequence(tp0));
         assertTrue(request1.isDone());
         assertEquals(0, request1.get().offset());
@@ -1023,7 +1023,7 @@ public class SenderTest {
 
         // epoch should be bumped and sequence numbers reset
         assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(0, transactionManager.firstInFlightSequence(tp0));
     }
 
@@ -1249,7 +1249,7 @@ public class SenderTest {
     ) {
         assertEquals(expectedProducerId, 
transactionManager.producerIdAndEpoch(tp).producerId, "Producer Id:");
         assertEquals(expectedProducerEpoch, 
transactionManager.producerIdAndEpoch(tp).epoch, "Producer Epoch:");
-        assertEquals(expectedSequenceValue, 
transactionManager.sequenceNumber(tp).longValue(), "Seq Number:");
+        assertEquals(expectedSequenceValue, 
transactionManager.sequenceNumber(tp), "Seq Number:");
         assertEquals(expectedLastAckedSequence, 
transactionManager.lastAckedSequence(tp), "Last Acked Seq Number:");
     }
 
@@ -1260,7 +1260,7 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -1268,14 +1268,14 @@ public class SenderTest {
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -1342,7 +1342,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -1410,7 +1410,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", 
"value");
@@ -1432,7 +1432,7 @@ public class SenderTest {
         setupWithTransactionState(transactionManager, false, null);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -1480,7 +1480,7 @@ public class SenderTest {
         assertEquals(1, batches.size());
         assertFalse(batches.peekFirst().hasSequence());
         assertFalse(client.hasInFlightRequests());
-        assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2L, transactionManager.sequenceNumber(tp0));
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
 
         sender.runOnce();  // clear the unresolved state, send the pending 
request.
@@ -1499,7 +1499,7 @@ public class SenderTest {
         setupWithTransactionState(transactionManager);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -1538,7 +1538,7 @@ public class SenderTest {
 
         sender.runOnce();
         assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch);
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertFalse(transactionManager.hasUnresolvedSequence(tp0));
     }
 
@@ -1591,7 +1591,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", 
"value");
@@ -1599,7 +1599,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, 
-1);
 
         sender.runOnce();  // receive response
-        assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1L, transactionManager.sequenceNumber(tp0));
 
         Node node = metadata.fetch().nodes().get(0);
         time.sleep(15000L);
@@ -1660,7 +1660,7 @@ public class SenderTest {
         assertEquals(10, successfulResponse.get().offset());
 
         // The epoch and the sequence are updated when the next batch is sent.
-        assertEquals(1, transactionManager.sequenceNumber(tp1).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
     }
 
     @Test
@@ -1772,7 +1772,7 @@ public class SenderTest {
         sender.runOnce();
         assertTrue(successfulResponse.isDone());
         // epoch of partition is bumped and sequence is reset when the next 
batch is sent
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
     }
 
     @Test
@@ -1783,7 +1783,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@@ -1791,14 +1791,14 @@ public class SenderTest {
         String nodeId = client.requests().peek().destination();
         Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, client.inFlightRequestCount());
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
         assertFalse(request1.isDone());
         assertFalse(request2.isDone());
@@ -1842,14 +1842,14 @@ public class SenderTest {
         client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp0, Errors.NONE)));
         sender.runOnce(); // Receive AddPartitions response
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@@ -1865,7 +1865,7 @@ public class SenderTest {
         appendToAccumulator(tp0);
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
@@ -1875,7 +1875,7 @@ public class SenderTest {
 
         // We should have reset the sequence number state of the partition 
because the state was lost on the broker.
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
 
@@ -1885,7 +1885,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
         sender.runOnce(); // receive response 1
         assertEquals(OptionalInt.of(1), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(client.hasInFlightRequests());
         assertTrue(request2.isDone());
         assertEquals(1012L, request2.get().offset());
@@ -1900,14 +1900,14 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@@ -1923,7 +1923,7 @@ public class SenderTest {
         appendToAccumulator(tp0);
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
@@ -1934,7 +1934,7 @@ public class SenderTest {
 
         // We should have reset the sequence number state of the partition 
because the state was lost on the broker.
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(request2.isDone());
         assertTrue(client.hasInFlightRequests());
         assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch);
@@ -1943,7 +1943,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
         sender.runOnce(); // receive response 1
         assertEquals(OptionalInt.of(1), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(client.hasInFlightRequests());
         assertTrue(request2.isDone());
         assertEquals(1012L, request2.get().offset());
@@ -1958,14 +1958,14 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@@ -1980,7 +1980,7 @@ public class SenderTest {
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
@@ -1990,7 +1990,7 @@ public class SenderTest {
 
         // We should have reset the sequence number state of the partition 
because the state was lost on the broker.
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(request2.isDone());
         assertFalse(client.hasInFlightRequests());
 
@@ -2001,7 +2001,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
         sender.runOnce(); // receive response 1
         assertEquals(OptionalInt.of(1), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(client.hasInFlightRequests());
         assertTrue(request2.isDone());
         assertEquals(1011L, request2.get().offset());
@@ -2016,14 +2016,14 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@@ -2038,14 +2038,14 @@ public class SenderTest {
         // Send second ProduceRequest
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         // Send the third ProduceRequest, in parallel with the second. It 
should be retried even though the
         // lastAckedOffset > logStartOffset when its UnknownProducerResponse 
comes back.
         Future<RecordMetadata> request3 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
@@ -2058,7 +2058,7 @@ public class SenderTest {
 
         // We should have reset the sequence number state of the partition 
because the state was lost on the broker.
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertFalse(request2.isDone());
         assertFalse(request3.isDone());
         assertEquals(2, client.inFlightRequestCount());
@@ -2070,7 +2070,7 @@ public class SenderTest {
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
         sender.runOnce();  // receive response 2, don't send request 3 since 
we can have at most 1 in flight when retrying
@@ -2102,14 +2102,14 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
 
         assertEquals(1, client.inFlightRequestCount());
-        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
 
         sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@@ -2124,7 +2124,7 @@ public class SenderTest {
         // Send second ProduceRequest,
         Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
-        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(OptionalInt.of(0), 
transactionManager.lastAckedSequence(tp0));
 
         assertFalse(request2.isDone());
@@ -2432,7 +2432,7 @@ public class SenderTest {
             sender.runOnce(); // connect
             sender.runOnce(); // send produce request
 
-            assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence should be 2");
+            assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence 
should be 2");
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, 
client.requests().peek().requestBuilder().apiKey());
             Node node = new Node(Integer.parseInt(id), "localhost", 0);
@@ -2443,12 +2443,12 @@ public class SenderTest {
             responseMap.put(tp, new 
ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
             client.respond(new ProduceResponse(responseMap));
             sender.runOnce(); // split and reenqueue
-            assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence should be 2");
+            assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence 
should be 2");
             // The compression ratio should have been improved once.
             assertEquals(CompressionType.GZIP.rate - 
CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
                     CompressionRatioEstimator.estimation(topic, 
CompressionType.GZIP), 0.01);
             sender.runOnce(); // send the first produce request
-            assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence number should be 2");
+            assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence 
number should be 2");
             assertFalse(f1.isDone(), "The future shouldn't have been done.");
             assertFalse(f2.isDone(), "The future shouldn't have been done.");
             id = client.requests().peek().destination();
@@ -2463,7 +2463,7 @@ public class SenderTest {
 
             sender.runOnce(); // receive
             assertTrue(f1.isDone(), "The future should have been done.");
-            assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence number should still be 2");
+            assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence 
number should still be 2");
             assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp), 
"The last ack'd sequence number should be 0");
             assertFalse(f2.isDone(), "The future shouldn't have been done.");
             assertEquals(0L, f1.get().offset(), "Offset of the first message 
should be 0");
@@ -2480,7 +2480,7 @@ public class SenderTest {
 
             sender.runOnce(); // receive
             assertTrue(f2.isDone(), "The future should have been done.");
-            assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence number should be 2");
+            assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence 
number should be 2");
             assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp), 
"The last ack'd sequence number should be 1");
             assertEquals(1L, f2.get().offset(), "Offset of the first message 
should be 1");
             assertTrue(accumulator.getDeque(tp).isEmpty(), "There should be no 
batch in the accumulator");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index feb764228cb..b615d1c5356 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -564,9 +564,9 @@ public class TransactionManagerTest {
     @Test
     public void testDefaultSequenceNumber() {
         initializeTransactionManager(Optional.empty());
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(transactionManager.sequenceNumber(tp0), 0);
         transactionManager.incrementSequenceNumber(tp0, 3);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 3);
+        assertEquals(transactionManager.sequenceNumber(tp0), 3);
     }
 
     @Test
@@ -579,7 +579,7 @@ public class TransactionManagerTest {
         ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "3");
         ProducerBatch b4 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "4");
         ProducerBatch b5 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "5");
-        assertEquals(5, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(5, transactionManager.sequenceNumber(tp0));
 
         // First batch succeeds
         long b1AppendTime = time.milliseconds();
@@ -624,8 +624,8 @@ public class TransactionManagerTest {
 
         ProducerBatch tp0b2 = 
writeIdempotentBatchWithValue(transactionManager, tp0, "2");
         ProducerBatch tp1b2 = 
writeIdempotentBatchWithValue(transactionManager, tp1, "2");
-        assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
-        assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp1));
 
         ProduceResponse.PartitionResponse b1Response = new 
ProduceResponse.PartitionResponse(
                 Errors.UNKNOWN_PRODUCER_ID, -1, -1, 400L);
@@ -637,9 +637,9 @@ public class TransactionManagerTest {
 
         transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
 
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         assertEquals(tp0b2, transactionManager.nextBatchBySequence(tp0));
-        assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp1));
         assertEquals(tp1b2, transactionManager.nextBatchBySequence(tp1));
     }
 
@@ -654,7 +654,7 @@ public class TransactionManagerTest {
         writeIdempotentBatchWithValue(transactionManager, tp1, "1");
 
         ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "2");
-        assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
 
         // The producerId might be reset due to a failure on another partition
         transactionManager.requestEpochBumpForPartition(tp1);
@@ -666,7 +666,7 @@ public class TransactionManagerTest {
                 Errors.NONE, 500L, time.milliseconds(), 0L);
         transactionManager.handleCompletedBatch(b1, b1Response);
 
-        assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
         assertEquals(0, transactionManager.lastAckedSequence(tp0).getAsInt());
         assertEquals(b2, transactionManager.nextBatchBySequence(tp0));
         assertEquals(epoch, 
transactionManager.nextBatchBySequence(tp0).producerEpoch());
@@ -676,7 +676,7 @@ public class TransactionManagerTest {
         transactionManager.handleCompletedBatch(b2, b2Response);
 
         transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
         assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
         assertNull(transactionManager.nextBatchBySequence(tp0));
     }
@@ -698,23 +698,23 @@ public class TransactionManagerTest {
                 MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(metrics), this.time, requestTimeout,
                 0, transactionManager, apiVersions);
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         Future<RecordMetadata> responseFuture1 = 
accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
                 "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
                 TestUtils.singletonCluster()).future;
         sender.runOnce();
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
 
         time.sleep(requestTimeout);
         sender.runOnce();
         assertEquals(0, client.inFlightRequestCount());
         assertTrue(transactionManager.hasInflightBatches(tp0));
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
         sender.runOnce(); // retry
         assertEquals(1, client.inFlightRequestCount());
         assertTrue(transactionManager.hasInflightBatches(tp0));
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
 
         time.sleep(5000); // delivery time out
         sender.runOnce();
@@ -729,7 +729,7 @@ public class TransactionManagerTest {
 
         sender.runOnce(); // bump the epoch
         assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
 
         Future<RecordMetadata> responseFuture2 = 
accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
                 "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
@@ -737,7 +737,7 @@ public class TransactionManagerTest {
         sender.runOnce();
         sender.runOnce();
         assertEquals(0, transactionManager.firstInFlightSequence(tp0));
-        assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp0));
 
         time.sleep(5000); // request time out again
         sender.runOnce();
@@ -770,30 +770,30 @@ public class TransactionManagerTest {
     @Test
     public void testSequenceNumberOverflow() {
         initializeTransactionManager(Optional.empty());
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(transactionManager.sequenceNumber(tp0), 0);
         transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 
Integer.MAX_VALUE);
+        assertEquals(transactionManager.sequenceNumber(tp0), 
Integer.MAX_VALUE);
         transactionManager.incrementSequenceNumber(tp0, 100);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 99);
+        assertEquals(transactionManager.sequenceNumber(tp0), 99);
         transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 98);
+        assertEquals(transactionManager.sequenceNumber(tp0), 98);
     }
 
     @Test
     public void testProducerIdReset() {
         initializeTransactionManager(Optional.empty());
         initializeIdempotentProducerId(15L, Short.MAX_VALUE);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
-        assertEquals((int) transactionManager.sequenceNumber(tp1), 0);
+        assertEquals(transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(transactionManager.sequenceNumber(tp1), 0);
         transactionManager.incrementSequenceNumber(tp0, 3);
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 3);
+        assertEquals(transactionManager.sequenceNumber(tp0), 3);
         transactionManager.incrementSequenceNumber(tp1, 3);
-        assertEquals((int) transactionManager.sequenceNumber(tp1), 3);
+        assertEquals(transactionManager.sequenceNumber(tp1), 3);
 
         transactionManager.requestEpochBumpForPartition(tp0);
         transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
-        assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
-        assertEquals((int) transactionManager.sequenceNumber(tp1), 3);
+        assertEquals(transactionManager.sequenceNumber(tp0), 0);
+        assertEquals(transactionManager.sequenceNumber(tp1), 3);
     }
 
     @Test
@@ -2835,7 +2835,7 @@ public class TransactionManagerTest {
         ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "1");
         ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "2");
         ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, 
tp0, "3");
-        assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
 
         // The first batch fails with a timeout
         transactionManager.markSequenceUnresolved(b1);
@@ -2860,7 +2860,7 @@ public class TransactionManagerTest {
         transactionManager.maybeResolveSequences();
         assertEquals(producerIdAndEpoch, 
transactionManager.producerIdAndEpoch());
         assertFalse(transactionManager.hasUnresolvedSequences());
-        assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -2893,7 +2893,7 @@ public class TransactionManagerTest {
         // Run sender loop to trigger epoch bump
         runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);
         assertFalse(transactionManager.hasUnresolvedSequences());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -2966,7 +2966,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
         runUntil(() -> transactionManager.isPartitionAdded(tp0));  // Send 
AddPartitionsRequest
 
-        assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(2, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -3026,8 +3026,8 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
         runUntil(() -> transactionManager.isPartitionAdded(tp0));
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
     }
 
     @Test
@@ -3073,7 +3073,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 
producerId);
         runUntil(() -> transactionManager.isPartitionAdded(tp0));
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -3120,7 +3120,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 
producerId);
         runUntil(() -> transactionManager.isPartitionAdded(tp0));
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -3179,7 +3179,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 
producerId);
         runUntil(() -> transactionManager.isPartitionAdded(tp0));
 
-        assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));
     }
 
     @Test
@@ -3249,8 +3249,8 @@ public class TransactionManagerTest {
         writeIdempotentBatchWithValue(transactionManager, tp0, "3");
         ProducerBatch tp1b1 = 
writeIdempotentBatchWithValue(transactionManager, tp1, "4");
         ProducerBatch tp1b2 = 
writeIdempotentBatchWithValue(transactionManager, tp1, "5");
-        assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
-        assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp1));
 
         // First batch of each partition succeeds
         long b1AppendTime = time.milliseconds();
@@ -3311,7 +3311,7 @@ public class TransactionManagerTest {
 
         transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp1));
 
         // The last batch should now be drained and sent
         runUntil(() -> transactionManager.hasInflightBatches(tp1));
@@ -3326,7 +3326,7 @@ public class TransactionManagerTest {
 
         transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
     }
 
     @Test
@@ -3373,8 +3373,8 @@ public class TransactionManagerTest {
         writeIdempotentBatchWithValue(transactionManager, tp0, "3");
         ProducerBatch tp1b1 = 
writeIdempotentBatchWithValue(transactionManager, tp1, "4");
         ProducerBatch tp1b2 = 
writeIdempotentBatchWithValue(transactionManager, tp1, "5");
-        assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
-        assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(3, transactionManager.sequenceNumber(tp0));
+        assertEquals(2, transactionManager.sequenceNumber(tp1));
 
         // First batch of each partition succeeds
         long b1AppendTime = time.milliseconds();
@@ -3435,7 +3435,7 @@ public class TransactionManagerTest {
 
         transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp1));
 
         // The last batch should now be drained and sent
         runUntil(() -> transactionManager.hasInflightBatches(tp1));
@@ -3449,7 +3449,7 @@ public class TransactionManagerTest {
         transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
 
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
     }
 
     @Test

Reply via email to