artemlivshits commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1720359067
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -181,6 +183,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
newMetadata.producerEpoch,
TransactionResult.ABORT,
isFromClient = false,
+ clientTransactionVersion = 0,
Review Comment:
Is using version=0 the right thing here? I'd expect that we want epoch bump
on aborted transactions.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -668,11 +709,23 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
}
+ def isInvalidTxnTransition(txnMetadata: TransactionMetadata,
txnMarkerResult: TransactionResult) = {
+ !((List(PrepareCommit, CompleteCommit).contains(txnMetadata.state) &&
txnMarkerResult == TransactionResult.COMMIT) ||
+ (List(PrepareAbort, CompleteAbort).contains(txnMetadata.state) &&
txnMarkerResult == TransactionResult.ABORT))
+ }
+
+ def isRetryEndTxn(txnMetadata: TransactionMetadata, producerId: Long,
producerEpoch: Short): Boolean = {
+ // The previous producer ID matches and the epoch is either + 1 the
request epoch or 0 if the epoch overflowed.
Review Comment:
Can we add some comments on how we check the conditions? Also, I think if
we structure the code to check retry conditions for (PrepareCommit,
PrepareAbort) and then for (CompleteAbort, CompleteCommit), I think it may be
more readable, especially if we annotate with comments for each case that
explain the transitions.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -594,7 +635,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
txnMetadata.inLock {
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
- else if (txnMetadata.producerEpoch != producerEpoch)
+ else if (txnMetadata.producerEpoch != producerEpoch &&
(requestIsAtLeastTransactionsV2 && txnMetadata.producerEpoch != producerEpoch +
1))
Review Comment:
Should it be `txnMetadata.producerEpoch != producerEpoch &&
(!requestIsAtLeastTransactionsV2 || txnMetadata.producerEpoch != producerEpoch
+ 1)`? As written we effectively ignore epoch check when
`requestIsAtLeastTransactionsV2==false`.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -79,7 +79,7 @@ object TransactionLog {
// Serialize with version 0 (highest non-flexible version) until
transaction.version 1 is enabled
// which enables flexible fields in records.
val version: Short =
- if (usesFlexibleRecords) 1 else 0
+ if (transactionVersionLevel > 1) 1 else 0
Review Comment:
According to comments the condition should be `transactionVersionLevel >= 1`.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -540,24 +572,29 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
}
- Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
+ nextProducerIdOrErrors match {
+ case Left(error) =>
+ Left(error)
+ case Right(nextProducerId) =>
+ Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId, time.milliseconds()))
+ }
case CompleteCommit =>
Review Comment:
Yeah, I wonder if it would be more readable to keep this code as it was, but
when we check producerId and epoch above just do a version-specific producer id
and epoch check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]