This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new a10c1f3ea14 KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20618)
a10c1f3ea14 is described below

commit a10c1f3ea1454dbd644ea65a885c965529e1d37d
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Oct 2 06:18:34 2025 -0700

    KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20618)
    
    Cherry-pick changes (#20534) to 4.1
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 90 ++++++++++++++++++++++
 .../kafka/storage/internals/log/UnifiedLog.java    | 17 +++-
 2 files changed, 103 insertions(+), 4 deletions(-)
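
For readers following the behavior change from the client side, below is a minimal sketch of the standard transactional-producer error-handling loop from the KafkaProducer javadocs. It is not part of this patch; the broker address, topic name, and transactional.id are made up. The intent of the fix, per the commit message and the new tests, is that a write made with a stale producer epoch now surfaces as INVALID_PRODUCER_EPOCH, which the producer can recover from by aborting and retrying, instead of the fatal INVALID_TXN_STATE, which forces the producer to be closed.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class StaleEpochRecoverySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sketch-txn-id");    // hypothetical transactional.id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("sketch-topic", "key", "value"));  // hypothetical topic
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot continue, so close it and exit.
            producer.close();
        } catch (KafkaException e) {
            // Recoverable errors: abort the transaction and try again.
            // With this fix, a stale producer epoch is reported as a recoverable
            // INVALID_PRODUCER_EPOCH rather than a fatal INVALID_TXN_STATE.
            producer.abortTransaction();
        }
        producer.close();
    }
}
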

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index d30d5a1040e..6d312f2c53e 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4786,6 +4786,96 @@ class UnifiedLogTest {
 
     (log, segmentWithOverflow)
   }
+
+  @Test
+  def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
+    // Producer epoch gets incremented (coordinator fail over, completed transaction, etc.)
+    // and client has stale cached epoch. Fix prevents fatal InvalidTxnStateException.
+
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+
+    val producerId = 123L
+    val oldEpoch = 5.toShort
+    val newEpoch = 6.toShort
+
+    // Step 1: Simulate a scenario where producer epoch was incremented to fence the producer
+    val previousRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, newEpoch, 0,
+      new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
+    )
+    val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch, false)  // TV1 = supportsEpochBump = false
+    log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, previousGuard)
+
+    // Complete the transaction normally (commits do update producer state with current epoch)
+    val commitMarker = MemoryRecords.withEndTransactionMarker(
+      producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+    )
+    log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+    // Step 2: TV1 client tries to write with stale cached epoch (before learning about epoch increment)
+    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, oldEpoch, 0,
+      new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
+    )
+
+    // Step 3: Verify our fix - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
+    val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
+      val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false)
+      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
+     })
+
+     // Verify the error message indicates epoch mismatch
+     assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
+     assertTrue(exception.getMessage.contains(s"$oldEpoch"))
+     assertTrue(exception.getMessage.contains(s"$newEpoch"))
+  }
+
+  @Test
+  def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
+    // Check producer epoch FIRST - if stale, return recoverable error before verification checks.
+
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+
+    val producerId = 456L
+    val originalEpoch = 3.toShort
+    val bumpedEpoch = 4.toShort
+
+    // Step 1: Start transaction with epoch 3 (before timeout)
+    val initialRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, originalEpoch, 0,
+      new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
+    )
+    val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true)  // TV2 = supportsEpochBump = true
+    log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)
+
+    // Step 2: Coordinator times out and aborts transaction
+    // TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort marker with epoch 4
+    val abortMarker = MemoryRecords.withEndTransactionMarker(
+      producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
+    )
+    log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+    // Step 3: TV2 transactional producer tries to append with stale epoch (timeout recovery scenario)
+    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, originalEpoch, 0,
+      new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
+    )
+
+    // Step 4: Verify our fix works for TV2 - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
+    val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
+      val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true)  // TV2 = supportsEpochBump = true
+      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
+     })
+
+     // Verify the error message indicates epoch mismatch (3 < 4)
+     assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
+     assertTrue(exception.getMessage.contains(s"$originalEpoch"))
+     assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
+  }
 }
 
 object UnifiedLogTest {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 2ca67ad47bc..b5b16e42232 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1385,10 +1385,19 @@ public class UnifiedLog implements AutoCloseable {
                     // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
                     // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
                     // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
-                    if (batch.isTransactional()
-                            && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())
-                            && batchMissingRequiredVerification(batch, requestVerificationGuard)) {
-                        throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
+                    if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
+                        // Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
+                        ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId());
+                        if (entry != null && batch.producerEpoch() < entry.producerEpoch()) {
+                            String message = "Epoch of producer " + batch.producerId() + " is " + batch.producerEpoch() +
+                                ", which is smaller than the last seen epoch " + entry.producerEpoch();
+                            throw new InvalidProducerEpochException(message);
+                        }
+
+                        // Only check verification if epoch is current
+                        if (batchMissingRequiredVerification(batch, requestVerificationGuard)) {
+                            throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
+                        }
+                        }
                     }
                 }
 

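As a reading aid (not part of the patch), the two exceptions thrown in the hunk above map to different protocol error codes in the produce response, which is what makes one path recoverable and the other fatal for the client. A minimal sketch of that mapping using the public Errors helper:

import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.protocol.Errors;

public class EpochErrorMappingSketch {
    public static void main(String[] args) {
        // Errors.forException maps an exception class to its protocol error code.
        Errors staleEpoch = Errors.forException(new InvalidProducerEpochException("stale epoch"));
        Errors missingVerification = Errors.forException(new InvalidTxnStateException("no ongoing transaction"));

        System.out.println(staleEpoch);          // INVALID_PRODUCER_EPOCH (recoverable per this commit)
        System.out.println(missingVerification); // INVALID_TXN_STATE (fatal for the producer)
    }
}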