This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 4a57c6d2289 KAFKA-19999 Transaction coordinator livelock caused by 
invalid producer epoch (#21176)
4a57c6d2289 is described below

commit 4a57c6d22894a0cd2504a2eefda42fc3aec86a9b
Author: Ken Huang <[email protected]>
AuthorDate: Sun Dec 28 12:50:08 2025 +0800

    KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch (#21176)
    
    In Transaction Version 2, strict epoch validation (`markerEpoch >
    currentEpoch`) causes hanging transactions in two scenarios:
    
    1. **Coordinator recovery**: When reloading PREPARE_COMMIT/ABORT from
    the transaction log, retried markers are rejected with
    `InvalidProducerEpochException` because they use the same epoch.
    2. **Network retry**: When a marker write succeeds but the response is
    lost, coordinator retries are rejected for the same reason.
    
    Both cases leave transactions permanently hanging in PREPARE state,
    causing clients to fail with `CONCURRENT_TRANSACTIONS`.
    
    Detect idempotent marker retries in
    `ProducerAppendInfo.checkProducerEpoch()` by checking three
    conditions:
    1. Transaction Version ≥ 2
    2. markerEpoch == currentEpoch (same epoch)
    3. currentTxnFirstOffset is empty (transaction already completed)
    
    When all conditions are met, treat the marker as a successful idempotent
    retry instead of throwing an error.
    
    Reviewers: Justine Olshan <[email protected]>, TaiJuWu
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../storage/internals/log/ProducerAppendInfo.java    | 16 ++++++++++++++++
 .../internals/log/ProducerStateManagerTest.java      | 20 ++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 1f182444a87..976bd79e948 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -119,6 +119,22 @@ public class ProducerAppendInfo {
         boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= 
current) : (producerEpoch < current);
 
         if (invalidEpoch) {
+            // TV2 Idempotent Marker Retry Detection (KAFKA-19999):
+            // When markerEpoch == currentEpoch and no transaction is ongoing, 
this indicates
+            // a retry of a marker that was already successfully written. 
Common scenarios:
+            // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from 
transaction log
+            // 2. Network retry: marker was written but response was lost due 
to disconnection
+            // In both cases, the transaction has already ended 
(currentTxnFirstOffset is empty).
+            // We suppress the InvalidProducerEpochException and allow the 
duplicate marker to
+            // be written to the log.
+            if (transactionVersion >= 2 &&
+                    producerEpoch == current &&
+                    updatedEntry.currentTxnFirstOffset().isEmpty()) {
+                log.info("Idempotent transaction marker retry detected for 
producer {} epoch {}. " +
+                                "Transaction already completed, allowing 
duplicate marker write.",
+                        producerId, producerEpoch);
+                return;
+            }
             String comparison = (transactionVersion >= 2) ? "<=" : "<";
             String message = "Epoch of producer " + producerId + " at offset " 
+ offset + " in " + topicPartition +
                     " is " + producerEpoch + ", which is " + comparison + " 
the last seen epoch " + current +
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index 4d737de4ab4..b504fb76636 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -1090,6 +1090,26 @@ public class ProducerStateManagerTest {
         assertNull(stateManager.verificationStateEntry(producerId));
     }
 
+    @Test
+    public void testIdempotentTransactionMarkerRetryTV2() {
+        short transactionVersion = 2;
+        appendClientEntry(stateManager, producerId, epoch, defaultSequence, 
99, true);
+        short markerEpoch = (short) (epoch + 1);
+        appendEndTxnMarker(stateManager, producerId, markerEpoch, 
ControlRecordType.COMMIT, 100, transactionVersion);
+
+        ProducerStateEntry entry = 
getLastEntryOrElseThrownByProducerId(stateManager, producerId);
+        assertEquals(markerEpoch, entry.producerEpoch());
+        assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset());
+
+        assertDoesNotThrow(() ->
+                appendEndTxnMarker(stateManager, producerId, markerEpoch, 
ControlRecordType.COMMIT, 101, transactionVersion)
+        );
+
+        ProducerStateEntry entryAfterRetry = 
getLastEntryOrElseThrownByProducerId(stateManager, producerId);
+        assertEquals(markerEpoch, entryAfterRetry.producerEpoch());
+        assertEquals(OptionalLong.empty(), 
entryAfterRetry.currentTxnFirstOffset());
+    }
+
     @Test
     public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() {
         // Create a verification state entry that supports epoch bump 
(transactions v2)

Reply via email to