This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new c8b8adf3c1d KAFKA-19367: Follow up bug fix (#19991)
c8b8adf3c1d is described below
commit c8b8adf3c1d5fc7ea6256846c4a82c0482ebf0e5
Author: Ritika Reddy <[email protected]>
AuthorDate: Mon Jun 23 15:15:36 2025 -0700
KAFKA-19367: Follow up bug fix (#19991)
This is a follow up to
[https://github.com/apache/kafka/pull/19910](https://github.com/apache/kafka/pull/19910)
The coordinator failed to write an epoch fence transition for producer
jt142 to the transaction log with error COORDINATOR_NOT_AVAILABLE. The
epoch was increased to 2 but not returned to the client
(kafka.coordinator.transaction.TransactionCoordinator) -- as we don't
bump the epoch with this change, we should also update the message to
not say "increased" and remove the
epochAndMetadata.transactionMetadata.hasFailedEpochFence = true line
In the test, the expected behavior is:
First append transaction to the log fails with
COORDINATOR_NOT_AVAILABLE (epoch 1)
We try init_pid again, this time the SINGLE epoch bump succeeds, and
the following things happen simultaneously (epoch 2)
-> Transition to COMPLETE_ABORT
-> Return CONCURRENT_TRANSACTION error to the client
The client retries, and there is another epoch bump; state
transitions to EMPTY (epoch 3)
Reviewers: Justine Olshan <[email protected]>
---
.../transaction/TransactionCoordinator.scala | 6 +-
.../transaction/TransactionCoordinatorTest.scala | 144 +++++++++++++++++++++
2 files changed, 147 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 064de12a4a9..fdccf5a0209 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -960,10 +960,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
case Some(epochAndMetadata) =>
if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch)
{
- // This was attempted epoch fence that failed, so mark
this state on the metadata
- epochAndMetadata.transactionMetadata.hasFailedEpochFence
= true
+ // For TV2, we allow re-bumping the epoch on retry,
since we don't complete the epoch bump.
+ // Therefore, we don't set hasFailedEpochFence = true.
warn(s"The coordinator failed to write an epoch fence
transition for producer $transactionalId to the transaction log " +
- s"with error $error. The epoch was increased to
${newMetadata.producerEpoch} but not returned to the client")
+ s"with error $error")
}
}
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index bbca105fd44..3adec5c029a 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.doAnswer
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -1805,4 +1806,147 @@ class TransactionCoordinatorTest {
else
producerEpoch
}
+
+ @Test
+ def testTV2AllowsEpochReBumpingAfterFailedWrite(): Unit = {
+ // Test the complete TV2 flow: failed write → epoch fence → abort → retry
with epoch bump
+ // This demonstrates that TV2 allows epoch re-bumping after failed writes
(unlike TV1)
+ val producerEpoch = 1.toShort
+ val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
+ producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing,
partitions, time.milliseconds(), time.milliseconds(), TV_2)
+
+ when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+ .thenReturn(true)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+ // First attempt fails with COORDINATOR_NOT_AVAILABLE
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any(),
+ any(),
+ any(),
+ any()
+ )).thenAnswer(invocation => {
+ val callback = invocation.getArgument[Errors => Unit](3)
+
+ // Simulate the real TransactionStateManager behavior: reset
pendingState on failure
+ // since handleInitProducerId doesn't provide a custom retryOnError
function
+ txnMetadata.pendingState = None
+
+ // For TV2, hasFailedEpochFence is NOT set to true, allowing epoch bumps
on retry
+ // The epoch remains at its original value (1) since
completeTransitionTo was never called
+
+ callback.apply(Errors.COORDINATOR_NOT_AVAILABLE)
+ })
+
+ coordinator.handleInitProducerId(
+ transactionalId,
+ txnTimeoutMs,
+ None,
+ initProducerIdMockCallback
+ )
+ assertEquals(InitProducerIdResult(-1, -1,
Errors.COORDINATOR_NOT_AVAILABLE), result)
+
+ // After the first failed attempt, the state should be:
+ // - hasFailedEpochFence = false (NOT set for TV2)
+ // - pendingState = None (reset by TransactionStateManager)
+ // - producerEpoch = 1 (unchanged since completeTransitionTo was never
called)
+ // - transaction still ONGOING
+
+ // Second attempt: Should abort the ongoing transaction
+ reset(transactionManager)
+ when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+ .thenReturn(true)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+ // Mock the appendTransactionToLog to succeed for the endTransaction call
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any(),
+ any(),
+ any(),
+ any()
+ )).thenAnswer(invocation => {
+ val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+ val callback = invocation.getArgument[Errors => Unit](3)
+
+ // Complete the transition and call the callback with success
+ txnMetadata.completeTransitionTo(newMetadata)
+ callback.apply(Errors.NONE)
+ })
+
+ // Mock the transactionMarkerChannelManager to simulate the second write
(PREPARE_ABORT -> COMPLETE_ABORT)
+ doAnswer(invocation => {
+ val newMetadata = invocation.getArgument[TxnTransitMetadata](3)
+ // Simulate the completion of transaction markers and the second write
+ // This would normally happen asynchronously after markers are sent
+ txnMetadata.completeTransitionTo(newMetadata) // This transitions to
COMPLETE_ABORT
+ txnMetadata.pendingState = None
+
+ null
+ }).when(transactionMarkerChannelManager).addTxnMarkersToSend(
+ ArgumentMatchers.eq(coordinatorEpoch),
+ ArgumentMatchers.eq(TransactionResult.ABORT),
+ ArgumentMatchers.eq(txnMetadata),
+ any()
+ )
+
+ coordinator.handleInitProducerId(
+ transactionalId,
+ txnTimeoutMs,
+ None,
+ initProducerIdMockCallback
+ )
+
+ // The second attempt should return CONCURRENT_TRANSACTIONS (this is
intentional)
+ assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS),
result)
+
+ // The transactionMarkerChannelManager mock should have completed the
transition to COMPLETE_ABORT
+ // Verify that hasFailedEpochFence was never set to true for TV2, allowing
future epoch bumps
+ assertFalse(txnMetadata.hasFailedEpochFence)
+
+ // Third attempt: Client retries after CONCURRENT_TRANSACTIONS
+ reset(transactionManager)
+ when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+ .thenReturn(true)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any(),
+ any(),
+ any(),
+ any()
+ )).thenAnswer(invocation => {
+ val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+ val callback = invocation.getArgument[Errors => Unit](3)
+
+ // Complete the transition and call the callback with success
+ txnMetadata.completeTransitionTo(newMetadata)
+ callback.apply(Errors.NONE)
+ })
+
+ coordinator.handleInitProducerId(
+ transactionalId,
+ txnTimeoutMs,
+ None,
+ initProducerIdMockCallback
+ )
+
+ // The third attempt should succeed with epoch 3 (2 + 1)
+ // This demonstrates that TV2 allows epoch re-bumping after failed writes
+ assertEquals(InitProducerIdResult(producerId, 3.toShort, Errors.NONE),
result)
+
+ // Final verification that hasFailedEpochFence was never set to true for
TV2
+ assertFalse(txnMetadata.hasFailedEpochFence)
+ }
}