This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27102b3187e KAFKA-20090 Add recovery logic to handle MaxValue epoch (#21469)
27102b3187e is described below

commit 27102b3187e183f35d9e2c22bdf92f87d8457131
Author: Artem Livshits <[email protected]>
AuthorDate: Thu Mar 5 04:24:23 2026 -0800

    KAFKA-20090 Add recovery logic to handle MaxValue epoch (#21469)
    
    Changes:
    - TransactionMetadata: Log errors instead of throwing when epoch hits
    MAX_VALUE
    - ProducerAppendInfo: Allow marker writes at MAX_VALUE epoch for
    recovery
    - TransactionsTest: Add comprehensive test case
    - TransactionCoordinator: Add test accessor for transaction manager
    
    Add testRecoveryFromEpochOverflow to verify that the system correctly
    handles the scenario when producer epoch reaches Short.MaxValue (32767).
    
    The test validates:
    - Epoch can reach Short.MaxValue through transaction timeouts
    - When epoch overflow is detected, errors are logged but processing
    continues
    - Transaction markers at MAX_VALUE epoch are accepted to allow recovery
    - Producer ID rotation occurs after overflow is detected
    - New transactions can proceed with rotated producer ID
    
    Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
    
    Reviewers: Justine Olshan <[email protected]>, chickenchickenlove
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../transaction/TransactionCoordinator.scala       |   3 +
 .../integration/kafka/api/TransactionsTest.scala   | 117 +++++++++++++++++++++
 .../transaction/TransactionMetadataTest.scala      |   9 +-
 .../storage/internals/log/ProducerAppendInfo.java  |   4 +-
 .../transaction/TransactionMetadata.java           |  15 ++-
 5 files changed, 142 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 83ae523b7d5..82a2bd7706b 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -1006,6 +1006,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def partitionFor(transactionalId: String): Int = 
txnManager.partitionFor(transactionalId)
 
+  // Package-private for testing
+  private[kafka] def transactionManager: TransactionStateManager = txnManager
+
   private def onEndTransactionComplete(txnIdAndPidEpoch: 
TransactionalIdAndProducerIdEpoch)(error: Errors, newProducerId: Long, 
newProducerEpoch: Short): Unit = {
     error match {
       case Errors.NONE =>
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 023e9f15b00..25b274cf3fe 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 
 import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.TransactionState
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
     assertThrows(classOf[IllegalStateException], () => 
producer.initTransactions())
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+    // We could encounter a bug (see 
https://issues.apache.org/jira/browse/KAFKA-20090)
+    // that only reproduces when epoch gets to Short.MaxValue - 1 and 
transaction is
+    // aborted on timeout.
+    val transactionalId = "test-overflow"
+    var producer = createTransactionalProducer(transactionalId, 
transactionTimeoutMs = 500)
+    val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
0, "key".getBytes, "aborted".getBytes)
+
+    // Create a transaction, produce one record, and abort
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(abortedRecord)
+    producer.abortTransaction()
+    producer.close()
+
+    // Find the transaction coordinator partition for this transactional ID
+    val adminClient = createAdminClient()
+    try {
+      val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+        .description(transactionalId).get()
+      val coordinatorId = txnDescription.coordinatorId()
+
+      // Access the transaction coordinator and update the epoch to 
Short.MaxValue - 2
+      val coordinatorBroker = brokers.find(_.config.brokerId == 
coordinatorId).get
+      val txnCoordinator = 
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+      // Get the transaction metadata and update the epoch close to 
Short.MaxValue
+      // to trigger the overflow scenario. We'll set it high enough that 
subsequent
+      // operations will cause it to reach Short.MaxValue - 1 before the 
timeout.
+      
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
+        txnMetadataOpt.foreach { epochAndMetadata =>
+          epochAndMetadata.transactionMetadata.inLock(() => {
+            
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue - 
2).toShort)
+            null // inLock expects a Supplier that returns a value
+          })
+        }
+      }
+    } finally {
+      adminClient.close()
+    }
+
+    // Re-initialize the producer which will bump epoch
+    producer = createTransactionalProducer(transactionalId, 
transactionTimeoutMs = 500)
+    producer.initTransactions()
+
+    // Start a transaction
+    producer.beginTransaction()
+    // Produce one record and wait for it to complete
+    producer.send(abortedRecord).get()
+    producer.flush()
+
+    // Check and assert that epoch of the transaction is Short.MaxValue - 1 
(before timeout)
+    val adminClient2 = createAdminClient()
+    try {
+      val coordinatorId2 = 
adminClient2.describeTransactions(java.util.List.of(transactionalId))
+        .description(transactionalId).get().coordinatorId()
+      val coordinatorBroker2 = brokers.find(_.config.brokerId == 
coordinatorId2).get
+      val txnCoordinator2 = 
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+      
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
+        txnMetadataOpt.foreach { epochAndMetadata =>
+          val currentEpoch = 
epochAndMetadata.transactionMetadata.producerEpoch()
+          assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+            s"Expected epoch to be ${Short.MaxValue - 1}, but got 
$currentEpoch")
+        }
+      }
+
+      // Wait until state is complete abort
+      waitUntilTrue(() => {
+        val listResult = adminClient2.listTransactions()
+        val txns = listResult.all().get().asScala
+        txns.exists(txn =>
+          txn.transactionalId() == transactionalId &&
+          txn.state() == TransactionState.COMPLETE_ABORT
+        )
+      }, "Transaction was not aborted on timeout")
+    } finally {
+      adminClient2.close()
+    }
+
+    // Abort, this should be treated as retry of the abort caused by timeout
+    producer.abortTransaction()
+
+    // Start a transaction, it would use the state from abort
+    producer.beginTransaction()
+    // Produce one record and wait for it to complete
+    producer.send(abortedRecord).get()
+    producer.flush()
+
+    // Now init new producer and commit a transaction with a distinct value
+    val producer2 = createTransactionalProducer(transactionalId, 
transactionTimeoutMs = 500)
+    producer2.initTransactions()
+    producer2.beginTransaction()
+    val committedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
0, "key".getBytes, "committed".getBytes)
+    producer2.send(committedRecord).get()
+    producer2.commitTransaction()
+
+    // Verify that exactly one record is visible in read-committed mode
+    val consumer = createReadCommittedConsumer("test-consumer-group")
+    try {
+      val tp = new TopicPartition(topic1, 0)
+      consumer.assign(java.util.Set.of(tp))
+      val records = consumeRecords(consumer, 1)
+
+      val record = records.head
+      assertArrayEquals("key".getBytes, record.key, "Record key should match")
+      assertArrayEquals("committed".getBytes, record.value, "Record value 
should be 'committed'")
+      assertEquals(0, record.partition, "Record should be in partition 0")
+      assertEquals(topic1, record.topic, "Record should be in topic1")
+    } finally {
+      consumer.close()
+    }
+  }
+
   @ParameterizedTest
   @CsvSource(Array(
     "classic,false",
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index b28e91f75c9..cab8d3e90de 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -497,7 +497,14 @@ class TransactionMetadataTest {
       time.milliseconds(),
       TV_0)
     assertTrue(txnMetadata.isProducerEpochExhausted)
-    assertThrows(classOf[IllegalStateException], () => 
txnMetadata.prepareFenceProducerEpoch())
+
+    // When epoch is at max, prepareFenceProducerEpoch logs an error but 
doesn't throw
+    // This allows graceful recovery through producer ID rotation
+    val preparedMetadata = txnMetadata.prepareFenceProducerEpoch()
+
+    // Epoch should remain at Short.MaxValue (not overflow to negative)
+    assertEquals(Short.MaxValue, preparedMetadata.producerEpoch)
+    assertEquals(TransactionState.PREPARE_EPOCH_FENCE, 
preparedMetadata.txnState)
   }
 
   @Test
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 2a5d408e2e3..3edec7028d5 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -127,9 +127,11 @@ public class ProducerAppendInfo {
             // In both cases, the transaction has already ended 
(currentTxnFirstOffset is empty).
             // We suppress the InvalidProducerEpochException and allow the 
duplicate marker to
             // be written to the log.
+            // In some buggy scenarios we may start transaction with 
MAX_VALUE.  We allow
+            // code to gracefully recover from that.
             if (transactionVersion >= 2 &&
                     producerEpoch == current &&
-                    updatedEntry.currentTxnFirstOffset().isEmpty()) {
+                    (updatedEntry.currentTxnFirstOffset().isEmpty() || 
producerEpoch == Short.MAX_VALUE)) {
                 log.info("Idempotent transaction marker retry detected for 
producer {} epoch {}. " +
                                 "Transaction already completed, allowing 
duplicate marker write.",
                         producerId, producerEpoch);
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
index 1940a8a90b8..00e4c636b72 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
@@ -139,11 +139,12 @@ public class TransactionMetadata {
 
     public TxnTransitMetadata prepareFenceProducerEpoch() {
         if (producerEpoch == Short.MAX_VALUE)
-            throw new IllegalStateException("Cannot fence producer with epoch 
equal to Short.MaxValue since this would overflow");
+            LOGGER.error("Fencing producer {} {} with epoch equal to 
Short.MaxValue, this must not happen unless there is a bug", transactionalId, 
producerId);
 
         // If we've already failed to fence an epoch (because the write to the 
log failed), we don't increase it again.
         // This is safe because we never return the epoch to client if we fail 
to fence the epoch
-        short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short) 
(producerEpoch + 1);
+        // Also don't increase if producerEpoch is already at max, to avoid 
overflow.
+        short bumpedEpoch = hasFailedEpochFence || producerEpoch == 
Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
 
         TransitionData data = new 
TransitionData(TransactionState.PREPARE_EPOCH_FENCE);
         data.producerEpoch = bumpedEpoch;
@@ -238,8 +239,14 @@ public class TransactionMetadata {
                                                    boolean noPartitionAdded) {
         TransitionData data = new TransitionData(newState);
         if (clientTransactionVersion.supportsEpochBump()) {
-            // We already ensured that we do not overflow here. MAX_SHORT is 
the highest possible value.
-            data.producerEpoch = (short) (producerEpoch + 1);
+            if (producerEpoch == Short.MAX_VALUE && newState == 
TransactionState.PREPARE_ABORT) {
+                // If we're already in a broken state, we let the abort go 
through without
+                // epoch overflow, so that we can recover and continue.
+                LOGGER.error("Aborting producer {} {} with epoch equal to 
Short.MaxValue, this must not happen unless there is a bug", transactionalId, 
producerId);
+            } else {
+                // We already ensured that we do not overflow here. MAX_SHORT 
is the highest possible value.
+                data.producerEpoch = (short) (producerEpoch + 1);
+            }
             data.lastProducerEpoch = producerEpoch;
         } else {
             data.producerEpoch = producerEpoch;

Reply via email to