artemlivshits commented on code in PR #21469:
URL: https://github.com/apache/kafka/pull/21469#discussion_r2881357568
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java:
##########
@@ -139,11 +139,12 @@ public TxnTransitMetadata prepareNoTransit() {
public TxnTransitMetadata prepareFenceProducerEpoch() {
if (producerEpoch == Short.MAX_VALUE)
- throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
+ LOGGER.error("Fencing producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId);
// If we've already failed to fence an epoch (because the write to the log failed), we don't increase it again.
// This is safe because we never return the epoch to client if we fail to fence the epoch
- short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short) (producerEpoch + 1);
+ // Also don't increase if producerEpoch is already at max, to avoid overflow.
+ short bumpedEpoch = hasFailedEpochFence || producerEpoch == Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
Review Comment:
This recovery logic does not intend to codify every scenario needed to properly support epoch 32767 (reaching it is a bug that should never happen); it just provides a way to expire the transaction. Otherwise the transaction would stay around forever, even after the bug is fixed. So the fix is intentionally small, supporting only a narrow path, to avoid potential regressions and unnecessary complexity.
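For illustration, the guarded bump rule described above can be sketched as a standalone snippet (hypothetical class and method names, not Kafka's actual code): skip the increment when a previous fence attempt failed, or when the epoch is already at `Short.MAX_VALUE`, since incrementing there would wrap around.

```java
public class EpochBumpSketch {
    // Sketch of the bump rule from prepareFenceProducerEpoch: keep the current
    // epoch if a previous fence attempt failed, or if the epoch is already at
    // Short.MAX_VALUE, because (short) (Short.MAX_VALUE + 1) wraps to
    // Short.MIN_VALUE (-32768).
    static short bumpedEpoch(boolean hasFailedEpochFence, short producerEpoch) {
        return hasFailedEpochFence || producerEpoch == Short.MAX_VALUE
                ? producerEpoch
                : (short) (producerEpoch + 1);
    }

    public static void main(String[] args) {
        // Normal case: epoch is incremented.
        System.out.println(bumpedEpoch(false, (short) 5));       // 6
        // Already at max: held at 32767 instead of wrapping.
        System.out.println(bumpedEpoch(false, Short.MAX_VALUE)); // 32767
        // The wrap the guard avoids.
        System.out.println((short) (Short.MAX_VALUE + 1));       // -32768
    }
}
```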
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
assertThrows(classOf[IllegalStateException], () => producer.initTransactions())
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+ // We could encounter a bug (see https://issues.apache.org/jira/browse/KAFKA-20090)
+ // that only reproduces when epoch gets to Short.MaxValue - 1 and transaction is
+ // aborted on timeout.
+ val transactionalId = "test-overflow"
+ var producer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
+ val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 0, "key".getBytes, "aborted".getBytes)
+
+ // Create a transaction, produce one record, and abort
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(abortedRecord)
+ producer.abortTransaction()
+ producer.close()
+
+ // Find the transaction coordinator partition for this transactional ID
+ val adminClient = createAdminClient()
+ try {
+ val txnDescription = adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ val coordinatorId = txnDescription.coordinatorId()
+
+ // Access the transaction coordinator and update the epoch to Short.MaxValue - 2
+ val coordinatorBroker = brokers.find(_.config.brokerId == coordinatorId).get
+ val txnCoordinator = coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+ // Get the transaction metadata and update the epoch close to Short.MaxValue
+ // to trigger the overflow scenario. We'll set it high enough that subsequent
+ // operations will cause it to reach Short.MaxValue - 1 before the timeout.
+ txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach { txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ epochAndMetadata.transactionMetadata.inLock(() => {
+ epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue - 2).toShort)
+ null // inLock expects a Supplier that returns a value
+ })
+ }
+ }
+ } finally {
+ adminClient.close()
+ }
+
+ // Re-initialize the producer which will bump epoch
+ producer = createTransactionalProducer(transactionalId, transactionTimeoutMs = 500)
+ producer.initTransactions()
+
+ // Start a transaction
+ producer.beginTransaction()
+ // Produce one record and wait for it to complete
+ producer.send(abortedRecord).get()
+ producer.flush()
+
+ // Check and assert that epoch of the transaction is Short.MaxValue - 1 (before timeout)
+ val adminClient2 = createAdminClient()
+ try {
+ val coordinatorId2 = adminClient2.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get().coordinatorId()
+ val coordinatorBroker2 = brokers.find(_.config.brokerId == coordinatorId2).get
+ val txnCoordinator2 = coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+ txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach { txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ val currentEpoch = epochAndMetadata.transactionMetadata.producerEpoch()
+ assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+ s"Expected epoch to be ${Short.MaxValue - 1}, but got $currentEpoch")
+ }
+ }
+
+ // Wait until state is complete abort
+ waitUntilTrue(() => {
+ val listResult = adminClient2.listTransactions()
+ val txns = listResult.all().get().asScala
+ txns.exists(txn =>
+ txn.transactionalId() == transactionalId &&
+ txn.state() == TransactionState.COMPLETE_ABORT
+ )
+ }, "Transaction was not aborted on timeout")
+ } finally {
+ adminClient2.close()
+ }
+
+ // Abort, this should be treated as retry of the abort caused by timeout
+ producer.abortTransaction()
Review Comment:
This test just follows the scenario for KAFKA-20090; without that bug, the issue won't happen. So there is no contract that expects the client to transition to `Short.MaxValue`; it should never do so unless there is a bug.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]