jolshan commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1847092394
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +615,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 else
   logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
 case Empty =>
-  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+  if (txnMarkerResult == TransactionResult.ABORT && clientTransactionVersion.supportsEpochBump() &&
+    !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+    // If the client and server both use transaction V2, the client is allowed to abort
+    // transactions when the transaction state is Empty because the client can't be sure about the
+    // current transaction state.
+    // Note that, we should not use txnMetadata info to check if the client is using V2 because the
+    // only request received by server so far is InitProducerId request which does not tell whether
Review Comment:
I don't think it is necessarily the case that the only request the server has
received is InitProducerId, but in the worst case it is. Either way, it is
sufficient to just use the EndTxn request version 👍
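
To illustrate the point, here is a minimal, self-contained sketch of deciding from the EndTxn request version alone whether an abort in the `Empty` state should be accepted. It is not the code in this PR: `TxnResult`, `EndTxnVersionSketch`, `clientSupportsEpochBump`, `allowAbortInEmpty`, and the `LastVersionBeforeTxnV2` threshold are all hypothetical names and values chosen for illustration, and the exact version cutoff is an assumption that should be checked against the EndTxn request schema.

```scala
// Hypothetical, simplified stand-ins; these are NOT the real Kafka classes.
sealed trait TxnResult
case object Commit extends TxnResult
case object Abort extends TxnResult

object EndTxnVersionSketch {
  // Assumption: EndTxn request versions above this value imply a transaction V2 client.
  val LastVersionBeforeTxnV2: Short = 4

  // Derive V2 support from the request version only, without consulting txnMetadata.
  def clientSupportsEpochBump(endTxnRequestVersion: Short): Boolean =
    endTxnRequestVersion > LastVersionBeforeTxnV2

  // Mirrors the shape of the condition in the diff: accept EndTxn in the Empty state
  // only for an abort, from a V2 client, with no pending epoch fence.
  def allowAbortInEmpty(result: TxnResult,
                        endTxnRequestVersion: Short,
                        pendingEpochFence: Boolean): Boolean =
    result == Abort && clientSupportsEpochBump(endTxnRequestVersion) && !pendingEpochFence
}
```

Because the check depends only on the request being handled, it holds regardless of which earlier requests the server has or has not seen from this producer.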