This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fc9e442c30 KAFKA-17898: Refine Epoch Bumping Logic (#17849)
4fc9e442c30 is described below

commit 4fc9e442c308364acc43046bdaefd87571ab20a5
Author: Ritika Reddy <[email protected]>
AuthorDate: Mon Nov 25 14:29:15 2024 -0800

    KAFKA-17898: Refine Epoch Bumping Logic (#17849)
    
    With KAFKA-14562, we implemented epoch bumping on both the client and the 
server. Listed below are the different epoch bump scenarios we have to handle 
after enabling Transaction Version 2 (TV2).
    
    Non-Transactional Producers
    • Epoch bumping is always allowed.
    • Different code paths are used to handle epoch bumping.
    
    Transactional Producers
    
    No Epoch Bump Allowed
    • coordinatorSupportsBumpingEpoch = false when initPIDVersion < 3 or 
initPIDVersion = null.
    
    Client-Triggered Epoch Bump Allowed
    • coordinatorSupportsBumpingEpoch = true when initPIDVersion >= 3.
    • TransactionVersion2Enabled = false when endTxnVersion < 5.
    
    Only Server-Triggered Epoch Bump Allowed
    • TransactionVersion2Enabled = true and endTxnVersion >= 5.
    
    We want to refine the code and make it more structured to correctly handle 
epoch bumping in the above-mentioned cases.
    
    The changes made in this patch are:
    
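    Rename epochBumpRequired to clientSideEpochBumpRequired to indicate a manual 
epoch bump request from the client.
    Rename requestEpochBumpForPartition to requestIdempotentEpochBumpForPartition, 
since it is only used for non-transactional idempotent producers.
    Replace the canBumpEpoch method with needToTriggerEpochBumpFromClient and 
canHandleAbortableError to cover the above-mentioned scenarios.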
    
    Reviewers: Artem Livshits <[email protected]>, Calvin Liu 
<[email protected]>, Justine Olshan <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 133 ++++++++++++++++-----
 .../producer/internals/TransactionManagerTest.java |  60 +++++++++-
 2 files changed, 155 insertions(+), 38 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3c916de9c0f..38bdc86aedd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -191,7 +191,7 @@ public class TransactionManager {
     private volatile RuntimeException lastError = null;
     private volatile ProducerIdAndEpoch producerIdAndEpoch;
     private volatile boolean transactionStarted = false;
-    private volatile boolean epochBumpRequired = false;
+    private volatile boolean clientSideEpochBumpRequired = false;
     private volatile long latestFinalizedFeaturesEpoch = -1;
     private volatile boolean isTransactionV2Enabled = false;
 
@@ -351,7 +351,7 @@ public class TransactionManager {
         enqueueRequest(handler);
 
         // If an epoch bump is required for recovery, initialize the 
transaction after completing the EndTxn request.
-        if (epochBumpRequired) {
+        if (clientSideEpochBumpRequired) {
             return initializeTransactions(this.producerIdAndEpoch);
         }
 
@@ -477,6 +477,26 @@ public class TransactionManager {
         }
     }
 
+    /**
+     * Transitions to an abortable error state if the coordinator can handle 
an abortable error or
+     * to a fatal error if not.
+     *
+     * @param abortableException    The exception in case of an abortable 
error.
+     * @param fatalException        The exception in case of a fatal error.
+     */
+    private void transitionToAbortableErrorOrFatalError(
+        RuntimeException abortableException,
+        RuntimeException fatalException
+    ) {
+        if (canHandleAbortableError()) {
+            if (needToTriggerEpochBumpFromClient())
+                clientSideEpochBumpRequired = true;
+            transitionToAbortableError(abortableException);
+        } else {
+            transitionToFatalError(fatalException);
+        }
+    }
+
     // visible for testing
     synchronized boolean isPartitionAdded(TopicPartition partition) {
         return partitionsInTransaction.contains(partition);
@@ -544,8 +564,11 @@ public class TransactionManager {
         this.partitionsWithUnresolvedSequences.clear();
     }
 
-    synchronized void requestEpochBumpForPartition(TopicPartition tp) {
-        epochBumpRequired = true;
+    /**
+     * This method is used to trigger an epoch bump for non-transactional 
idempotent producers.
+     */
+    synchronized void requestIdempotentEpochBumpForPartition(TopicPartition 
tp) {
+        clientSideEpochBumpRequired = true;
         this.partitionsToRewriteSequences.add(tp);
     }
 
@@ -564,12 +587,12 @@ public class TransactionManager {
         }
         this.partitionsToRewriteSequences.clear();
 
-        epochBumpRequired = false;
+        clientSideEpochBumpRequired = false;
     }
 
     synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
         if (!isTransactional()) {
-            if (epochBumpRequired) {
+            if (clientSideEpochBumpRequired) {
                 bumpIdempotentProducerEpoch();
             }
             if (currentState != State.INITIALIZING && !hasProducerId()) {
@@ -675,8 +698,8 @@ public class TransactionManager {
                 || exception instanceof UnsupportedVersionException) {
             transitionToFatalError(exception);
         } else if (isTransactional()) {
-            if (canBumpEpoch() && !isCompleting()) {
-                epochBumpRequired = true;
+            if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
+                clientSideEpochBumpRequired = true;
             }
             transitionToAbortableError(exception);
         }
@@ -699,7 +722,7 @@ public class TransactionManager {
 
             // If we fail with an OutOfOrderSequenceException, we have a gap 
in the log. Bump the epoch for this
             // partition, which will reset the sequence number to 0 and allow 
us to continue
-            requestEpochBumpForPartition(batch.topicPartition);
+            requestIdempotentEpochBumpForPartition(batch.topicPartition);
         } else if (exception instanceof UnknownProducerIdException) {
             // If we get an UnknownProducerId for a partition, then the broker 
has no state for that producer. It will
             // therefore accept a write with sequence number 0. We reset the 
sequence number for the partition here so
@@ -710,7 +733,7 @@ public class TransactionManager {
         } else {
             if (adjustSequenceNumbers) {
                 if (!isTransactional()) {
-                    requestEpochBumpForPartition(batch.topicPartition);
+                    
requestIdempotentEpochBumpForPartition(batch.topicPartition);
                 } else {
                     txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
                 }
@@ -760,21 +783,17 @@ public class TransactionManager {
                         // For the transactional producer, we bump the epoch 
if possible, otherwise we transition to a fatal error
                         String unackedMessagesErr = "The client hasn't 
received acknowledgment for some previously " +
                                 "sent messages and can no longer retry them. ";
-                        if (canBumpEpoch()) {
-                            epochBumpRequired = true;
-                            KafkaException exception = new 
KafkaException(unackedMessagesErr + "It is safe to abort " +
-                                    "the transaction and continue.");
-                            transitionToAbortableError(exception);
-                        } else {
-                            KafkaException exception = new 
KafkaException(unackedMessagesErr + "It isn't safe to continue.");
-                            transitionToFatalError(exception);
-                        }
+                        KafkaException abortableException = new 
KafkaException(unackedMessagesErr + "It is safe to abort " +
+                                "the transaction and continue.");
+                        KafkaException fatalException = new 
KafkaException(unackedMessagesErr + "It isn't safe to continue.");
+
+                        
transitionToAbortableErrorOrFatalError(abortableException, fatalException);
                     } else {
                         // For the idempotent producer, bump the epoch
                         log.info("No inflight batches remaining for {}, last 
ack'd sequence for partition is {}, next sequence is {}. " +
                                         "Going to bump epoch and reset 
sequence numbers.", topicPartition,
                                 
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER),
 sequenceNumber(topicPartition));
-                        requestEpochBumpForPartition(topicPartition);
+                        requestIdempotentEpochBumpForPartition(topicPartition);
                     }
 
                     iter.remove();
@@ -943,7 +962,7 @@ public class TransactionManager {
                 if (isTransactional()) {
                     
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, 
this.producerIdAndEpoch);
                 } else {
-                    requestEpochBumpForPartition(batch.topicPartition);
+                    
requestIdempotentEpochBumpForPartition(batch.topicPartition);
                 }
                 return true;
             }
@@ -951,7 +970,7 @@ public class TransactionManager {
             if (!isTransactional()) {
                 // For the idempotent producer, always retry 
UNKNOWN_PRODUCER_ID errors. If the batch has the current
                 // producer ID and epoch, request a bump of the epoch. 
Otherwise just retry the produce.
-                requestEpochBumpForPartition(batch.topicPartition);
+                requestIdempotentEpochBumpForPartition(batch.topicPartition);
                 return true;
             }
         } else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
@@ -967,7 +986,7 @@ public class TransactionManager {
                 // and wait to see if the sequence resolves
                 if (!hasUnresolvedSequence(batch.topicPartition) ||
                         
isNextSequenceForUnresolvedPartition(batch.topicPartition, 
batch.baseSequence())) {
-                    requestEpochBumpForPartition(batch.topicPartition);
+                    
requestIdempotentEpochBumpForPartition(batch.topicPartition);
                 }
                 return true;
             }
@@ -1164,23 +1183,59 @@ public class TransactionManager {
         return result;
     }
 
+    /**
+     * Determines whether the client must trigger an epoch bump manually, based on the API 
versions.
+     *
+     * <b>NOTE:</b>
+     * This method should only be used for transactional producers.
+     * For non-transactional producers epoch bumping is always allowed.
+     *
+     * <ol>
+     *   <li><b>Client-Triggered Epoch Bump</b>:
+     *          If the coordinator supports epoch bumping 
(initProducerIdVersion.maxVersion() >= 3)
+     *          and TransactionV2 is not enabled, a client-triggered epoch bump is needed; returns true.
+     *          <code>clientSideEpochBumpRequired</code> must be set to 
true in this case.</li>
+     *
+     *   <li><b>No Epoch Bump Allowed</b>:
+     *          If the coordinator does not support epoch bumping, returns 
false.</li>
+     *
+     *   <li><b>Server-Triggered Only</b>:
+     *          When TransactionV2 is enabled, epoch bumping is handled 
automatically
+     *          by the server in EndTxn, so manual epoch bumping is not 
required, returns false.</li>
+     * </ol>
+     *
+     * @return true if a client-triggered epoch bump is allowed, otherwise 
false.
+     */
     // package-private for testing
-    boolean canBumpEpoch() {
-        if (!isTransactional()) {
-            return true;
-        }
+    boolean needToTriggerEpochBumpFromClient() {
+        return coordinatorSupportsBumpingEpoch && !isTransactionV2Enabled;
+    }
 
-        return coordinatorSupportsBumpingEpoch;
+    /**
+     * Determines if the coordinator can handle an abortable error.
+     * Recovering from an abortable error requires an epoch bump which can be 
triggered by the client
+     * or automatically taken care of at the end of every transaction 
(Transaction V2).
+     * Use <code>needToTriggerEpochBumpFromClient</code> to check whether the 
epoch bump needs to be triggered
+     * manually.
+     *
+     * <b>NOTE:</b>
+     * This method should only be used for transactional producers.
+     * There is no concept of abortable errors for idempotent producers.
+     *
+     * @return true if an abortable error can be handled, otherwise false.
+     */
+    boolean canHandleAbortableError() {
+        return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
     }
 
     private void completeTransaction() {
-        if (epochBumpRequired) {
+        if (clientSideEpochBumpRequired) {
             transitionTo(State.INITIALIZING);
         } else {
             transitionTo(State.READY);
         }
         lastError = null;
-        epochBumpRequired = false;
+        clientSideEpochBumpRequired = false;
         transactionStarted = false;
         newPartitionsInTransaction.clear();
         pendingPartitionsInTransaction.clear();
@@ -1209,9 +1264,23 @@ public class TransactionManager {
             transitionToAbortableError(e);
         }
 
+        /**
+         * Determines if an error should be treated as abortable or fatal, 
based on transaction state and configuration.
+         * <b>NOTE:</b> Only use this method for transactional producers.
+         *
+         * - <b>Abortable Error</b>:
+         *     An abortable error can be handled if epoch bumping 
is supported.
+         *     1) If transactionV2 is enabled, automatic epoch bumping happens 
at the end of every transaction.
+         *     2) If the client can trigger an epoch bump, the abortable error 
can be handled.
+         *
+         * - <b>Fatal Error</b>:
+         *      If epoch bumping is not supported, the system cannot recover 
and the error must be treated as fatal.
+         * @param e the error to classify as either abortable or fatal.
+         */
         void abortableErrorIfPossible(RuntimeException e) {
-            if (canBumpEpoch()) {
-                epochBumpRequired = true;
+            if (canHandleAbortableError()) {
+                if (needToTriggerEpochBumpFromClient())
+                    clientSideEpochBumpRequired = true;
                 abortableError(e);
             } else {
                 fatalError(e);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 02570b083ec..9ae80dc19ba 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -678,7 +678,7 @@ public class TransactionManagerTest {
         assertEquals(2, transactionManager.sequenceNumber(tp0));
 
         // The producerId might be reset due to a failure on another partition
-        transactionManager.requestEpochBumpForPartition(tp1);
+        transactionManager.requestIdempotentEpochBumpForPartition(tp1);
         transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
         initializeIdempotentProducerId(producerId + 1, (short) 0);
 
@@ -780,6 +780,21 @@ public class TransactionManagerTest {
         return batch;
     }
 
+    private ProducerBatch writeTransactionalBatchWithValue(
+        TransactionManager manager,
+        TopicPartition tp,
+        String value
+    ) {
+        manager.maybeUpdateProducerIdAndEpoch(tp);
+        int seq = manager.sequenceNumber(tp);
+        manager.incrementSequenceNumber(tp, 1);
+        ProducerBatch batch = batchWithValue(tp, value);
+        batch.setProducerState(manager.producerIdAndEpoch(), seq, true);
+        manager.addInFlightBatch(batch);
+        batch.close();
+        return batch;
+    }
+
     private ProducerBatch batchWithValue(TopicPartition tp, String value) {
         MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(64),
                 Compression.NONE, TimestampType.CREATE_TIME, 0L);
@@ -814,7 +829,7 @@ public class TransactionManagerTest {
         transactionManager.incrementSequenceNumber(tp1, 3);
         assertEquals(transactionManager.sequenceNumber(tp1), 3);
 
-        transactionManager.requestEpochBumpForPartition(tp0);
+        transactionManager.requestIdempotentEpochBumpForPartition(tp0);
         transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
         assertEquals(transactionManager.sequenceNumber(tp0), 0);
         assertEquals(transactionManager.sequenceNumber(tp1), 3);
@@ -2948,7 +2963,7 @@ public class TransactionManagerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testEpochBumpAfterLastInflightBatchFails(boolean 
transactionV2Enabled) {
+    public void 
testEpochBumpAfterLastInFlightBatchFailsIdempotentProducer(boolean 
transactionV2Enabled) {
         initializeTransactionManager(Optional.empty(), transactionV2Enabled);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, epoch);
         initializeIdempotentProducerId(producerId, epoch);
@@ -2980,6 +2995,39 @@ public class TransactionManagerTest {
         assertEquals(0, transactionManager.sequenceNumber(tp0));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMaybeResolveSequencesTransactionalProducer(boolean 
transactionV2Enabled) throws Exception {
+        initializeTransactionManager(Optional.of(transactionalId), 
transactionV2Enabled);
+
+        // Initialize transaction with initial producer ID and epoch.
+        doInitTransactions(producerId, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.isPartitionAdded(tp0));
+
+        ProducerBatch b1 = 
writeTransactionalBatchWithValue(transactionManager, tp0, "1");
+        assertEquals(Integer.valueOf(1), 
transactionManager.sequenceNumber(tp0));
+
+        transactionManager.markSequenceUnresolved(b1);
+        assertTrue(transactionManager.hasUnresolvedSequences());
+
+        transactionManager.handleFailedBatch(b1, new TimeoutException(), 
false);
+        // Call maybeResolveSequences to trigger resolution logic
+        transactionManager.maybeResolveSequences();
+
+        // Verify the type of error state the transaction is in.
+        if (transactionManager.isTransactionV2Enabled() || 
transactionManager.needToTriggerEpochBumpFromClient()) {
+            // Expected to transition to an abortable error state when epoch bumping is 
allowed
+            assertTrue(transactionManager.hasAbortableError());
+        } else {
+            // Expected to transition to a fatal error state when epoch bumping is not 
allowed
+            assertTrue(transactionManager.hasFatalError());
+        }
+    }
+
     @Test
     public void testEpochUpdateAfterBumpFromEndTxnResponseInV2() throws 
InterruptedException {
         initializeTransactionManager(Optional.of(transactionalId), true);
@@ -3506,13 +3554,13 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testCanBumpEpochDuringCoordinatorDisconnect() {
+    public void 
testNeedToTriggerEpochBumpFromClientDuringCoordinatorDisconnect() {
         doInitTransactions(0, (short) 0);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        assertTrue(transactionManager.canBumpEpoch());
+        assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
 
         
apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION).idString());
-        assertTrue(transactionManager.canBumpEpoch());
+        assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
     }
 
     @ParameterizedTest

Reply via email to