CalvinConfluent commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1854240231
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
else
PrepareAbort
- // Maybe allocate new producer ID if we are bumping epoch and
epoch is exhausted
- val nextProducerIdOrErrors =
- if (clientTransactionVersion.supportsEpochBump() &&
!txnMetadata.pendingState.contains(PrepareEpochFence) &&
txnMetadata.isProducerEpochExhausted) {
- try {
- Right(producerIdManager.generateProducerId())
- } catch {
- case e: Exception => Left(Errors.forException(e))
- }
+ generateTxnTransitMetadataForTxnCompletion(nextState)
+ case CompleteCommit =>
+ if (currentTxnMetadataIsAtLeastTransactionsV2) {
+ if (txnMarkerResult == TransactionResult.COMMIT) {
+ if (isRetry)
+ Left(Errors.NONE)
+ else
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
} else {
- Right(RecordBatch.NO_PRODUCER_ID)
+ if (isRetry)
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ else
+ generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
}
-
- if (nextState == PrepareAbort &&
txnMetadata.pendingState.contains(PrepareEpochFence)) {
- // We should clear the pending state to make way for the
transition to PrepareAbort and also bump
- // the epoch in the transaction metadata we are about to
append.
- isEpochFence = true
- txnMetadata.pendingState = None
- txnMetadata.producerEpoch = producerEpoch
- txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+ } else {
+ // Transaction V1
+ if (txnMarkerResult == TransactionResult.COMMIT) {
+ Left(Errors.NONE)
+ } else
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
}
-
- nextProducerIdOrErrors.flatMap {
- nextProducerId =>
- Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId.asInstanceOf[Long], time.milliseconds()))
- }
- case CompleteCommit =>
- if (txnMarkerResult == TransactionResult.COMMIT)
- Left(Errors.NONE)
- else
- logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
case CompleteAbort =>
- if (txnMarkerResult == TransactionResult.ABORT)
- Left(Errors.NONE)
- else
- logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ if (currentTxnMetadataIsAtLeastTransactionsV2) {
+ if (txnMarkerResult == TransactionResult.ABORT) {
+ if (isRetry)
+ Left(Errors.NONE)
+ else
+ generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
+ } else {
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ }
+ } else {
+ // Transaction V1
+ if (txnMarkerResult == TransactionResult.ABORT)
+ Left(Errors.NONE)
+ else
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ }
Review Comment:
I agree that the if conditions can be more concise here. However, I found it
more readable to list all the conditions here. Otherwise, it makes my brain
burn when I verify whether the result matches the scenario mentioned in the
table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]