artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1841271245
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
else
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case Empty =>
- logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+ if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+ // If the client and server both use transaction V2, the client is allowed to commit/abort
Review Comment:
We don't have to allow commits on top of the `Empty` state, only aborts. For commits there are only 2 cases:
1. The client didn't send any data to partitions / offsets -- it knows for sure that a commit doesn't need to be issued.
2. The client successfully sent all data to partitions / offsets -- it knows for sure that the transaction is ongoing.

For aborts there are 3 cases:
1. The client didn't send any data to partitions / offsets -- it knows for sure that an abort doesn't need to be issued.
2. The client successfully sent data to some partitions / offsets -- it knows for sure that the transaction is ongoing.
3. The client tried to send data, but the sends weren't successful -- it needs to send an abort on top of a potentially empty state.
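The case analysis above can be sketched as follows. All names are hypothetical (not Kafka APIs), and treating a commit after failed sends as "must abort instead" is an assumption about client behavior. The point of the sketch is that only `abortAction` can yield `EndPossiblyEmptyTxn`, so only aborts need to be accepted while the coordinator state is `Empty`.

```scala
object EmptyStateEndTxn {
  // Hypothetical summary of what the producer knows about its own sends in the
  // current transaction. These names are illustrative, not Kafka APIs.
  sealed trait SendOutcome
  case object NothingSent extends SendOutcome   // no data sent to partitions / offsets
  case object AllSucceeded extends SendOutcome  // every send was acknowledged
  case object SomeSucceeded extends SendOutcome // some sends acknowledged, others failed
  case object Unconfirmed extends SendOutcome   // sends attempted, none acknowledged

  val allOutcomes: List[SendOutcome] = List(NothingSent, AllSucceeded, SomeSucceeded, Unconfirmed)

  sealed trait Action
  case object SkipEndTxn extends Action          // nothing to end, no EndTxn request needed
  case object EndOngoingTxn extends Action       // coordinator state is known to be Ongoing
  case object EndPossiblyEmptyTxn extends Action // coordinator state may still be Empty
  case object MustAbortInstead extends Action    // assumption: failed sends rule out a commit

  def commitAction(outcome: SendOutcome): Action = outcome match {
    case NothingSent                 => SkipEndTxn
    case AllSucceeded                => EndOngoingTxn
    case SomeSucceeded | Unconfirmed => MustAbortInstead
  }

  def abortAction(outcome: SendOutcome): Action = outcome match {
    case NothingSent                  => SkipEndTxn
    case AllSucceeded | SomeSucceeded => EndOngoingTxn
    case Unconfirmed                  => EndPossiblyEmptyTxn
  }
}
```

Mapping `commitAction` over `allOutcomes` never yields `EndPossiblyEmptyTxn`, while `abortAction(Unconfirmed)` does, which is exactly the asymmetry the comment describes.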
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
else
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case Empty =>
- logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+ if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+ // If the client and server both use transaction V2, the client is allowed to commit/abort
+ // transactions when the transaction state is Empty because the client can't be sure about the
+ // current transaction state.
+ // Note that, we should not use txnMetadata info to check if the client is using V2 because the
+ // only request received by server so far is InitProducerId request which does not tell whether
+ // the client uses V2.
+ Left(Errors.NONE)
Review Comment:
We cannot do this. We need to go through the actual abort sequence and bump the epoch (as if the transaction were ongoing). Otherwise we can have:
1. Produce to partition foo-1 with epoch 42.
2. Timeout.
3. Abort, leaving the same epoch 42.
4. The delayed produce to partition foo-1 (epoch 42) succeeds and adds partition foo-1 to the transaction.
5. The next transaction produces to partition bar-0.
6. Its commit includes the foo-1 data that should've been aborted in step 3.
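A toy replay of the six steps, assuming (for illustration only) that under transaction V2 a produce implicitly adds its partition to the transaction and that the coordinator accepts this only when the produce carries the coordinator's current epoch. `Coordinator`, `abortOnEmpty`, and `run` are hypothetical; this is a simplified model, not the real coordinator logic.

```scala
object EpochBumpScenario {
  // Hypothetical coordinator model: a produce adds its partition to the
  // transaction only if it carries the current epoch.
  final class Coordinator(var epoch: Int) {
    var partitions: Set[String] = Set.empty

    def produce(producerEpoch: Int, partition: String): Boolean =
      if (producerEpoch != epoch) false // stale epoch: the write is fenced
      else { partitions += partition; true }

    // Abort while the state is Empty; `bumpEpoch` selects between the two proposals.
    def abortOnEmpty(bumpEpoch: Boolean): Unit = {
      partitions = Set.empty
      if (bumpEpoch) epoch += 1
    }

    // Commit returns the partitions whose data becomes visible.
    def commit(producerEpoch: Int): Set[String] = {
      require(producerEpoch == epoch, "producer fenced")
      val committed = partitions
      partitions = Set.empty
      committed
    }
  }

  // Replays steps 1-6 from the comment and returns the committed partitions.
  def run(bumpEpoch: Boolean): Set[String] = {
    val coordinator = new Coordinator(epoch = 42)
    // 1-2. A produce to foo-1 with epoch 42 is in flight and the client times out.
    // 3. The client aborts while the coordinator state is still Empty.
    coordinator.abortOnEmpty(bumpEpoch)
    val clientEpoch = coordinator.epoch // the client learns the (possibly bumped) epoch
    // 4. The delayed epoch-42 produce to foo-1 finally arrives.
    coordinator.produce(42, "foo-1")
    // 5. The next transaction produces to bar-0.
    coordinator.produce(clientEpoch, "bar-0")
    // 6. The next transaction commits.
    coordinator.commit(clientEpoch)
  }
}
```

With `bumpEpoch = false`, `run` returns both `foo-1` and `bar-0`, so the stale foo-1 write is committed; with `bumpEpoch = true` the epoch-42 produce is fenced and only `bar-0` is committed.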
##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -39,13 +39,33 @@ object AddPartitionsToTxnManager {
val VerificationFailureRateMetricName = "VerificationFailureRate"
val VerificationTimeMsMetricName = "VerificationTimeMs"
+
+ def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
+ if (version > 11) {
+ addPartition
+ } else if (version > 10) {
+ genericError
Review Comment:
This looks like we're returning an error. Can we change the names, e.g. to `genericErrorSupported`, etc.?
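One way the rename could look, as a sketch: the value names (`AddPartitionSupported`, `GenericErrorSupported`, `DefaultErrorSupported`) are hypothetical, and the final `else` branch is an assumption because the hunk cuts off after `genericError`.

```scala
object TransactionSupportedOperationSketch {
  // Hypothetical renamed values: each name describes what the client supports,
  // rather than reading like an error being returned.
  sealed trait TransactionSupportedOperation
  case object DefaultErrorSupported extends TransactionSupportedOperation // assumed fallback
  case object GenericErrorSupported extends TransactionSupportedOperation
  case object AddPartitionSupported extends TransactionSupportedOperation

  def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation =
    if (version > 11) AddPartitionSupported
    else if (version > 10) GenericErrorSupported
    else DefaultErrorSupported // not shown in the original hunk
}
```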
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -629,10 +629,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Return true if the given producer ID has a transaction ongoing.
+ * Note, if the incoming producer epoch is newer than the stored one, the transaction may have finished.
*/
- def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
+ def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
val entry = producerStateManager.activeProducers.get(producerId)
- entry != null && entry.currentTxnFirstOffset.isPresent
+ entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() >= producerEpoch
Review Comment:
Should it be just `entry.producerEpoch() == producerEpoch`?
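A small sketch of how the two predicates differ, using a hypothetical `ProducerEntry` in place of the real producer-state entry (`Option` stands in for the `null` check and the `Optional` offset). The only inputs where they disagree are those where the stored epoch is newer than the incoming one.

```scala
object OngoingTransactionCheck {
  // Hypothetical stand-in for a producer state entry.
  final case class ProducerEntry(producerEpoch: Short, currentTxnFirstOffset: Option[Long])

  // Predicate as written in the diff: stored epoch >= incoming epoch.
  def ongoingAtLeast(entry: Option[ProducerEntry], producerEpoch: Short): Boolean =
    entry.exists(e => e.currentTxnFirstOffset.isDefined && e.producerEpoch >= producerEpoch)

  // Predicate suggested in the review: the epochs must match exactly.
  def ongoingExact(entry: Option[ProducerEntry], producerEpoch: Short): Boolean =
    entry.exists(e => e.currentTxnFirstOffset.isDefined && e.producerEpoch == producerEpoch)
}
```

For a stored epoch of 43 with an open transaction, an incoming epoch of 42 is "ongoing" under `>=` but not under `==`; for 43 both agree (true), and for 44 both agree (false).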
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -898,6 +898,8 @@ class ReplicaManager(val config: KafkaConfig,
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+ case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(
Review Comment:
Do we actually return `UNKNOWN_PRODUCER_ID`?
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
else
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case Empty =>
- logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+ if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
Review Comment:
We should also support aborts on top of CompleteAbort and CompleteCommit. The epoch indicates whether the incoming abort is a retry (then we just need to return the previous result) or a new empty abort (in this case we need to go through the full abort sequence with epoch bumping, etc.).
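A sketch of that decision under stated assumptions: that under transaction V2 every completed EndTxn bumps the epoch, so a retried request still carries the epoch the completed transaction ran with (`previousEpoch`), while a new abort carries the bumped epoch (`currentEpoch`). `onAbort`, the `Decision` values, and rejecting a retried abort on top of `CompleteCommit` are all hypothetical choices for illustration, not the coordinator's actual code.

```scala
object AbortOnCompletedState {
  sealed trait TxnState
  case object Empty extends TxnState
  case object CompleteCommit extends TxnState
  case object CompleteAbort extends TxnState

  sealed trait Decision
  case object ReturnPreviousResult extends Decision   // retry of the abort that completed the txn
  case object FullAbortWithEpochBump extends Decision // new, possibly empty abort
  case object InvalidTransition extends Decision

  // Hypothetical: `currentEpoch` is the epoch after the last bump and `previousEpoch`
  // the epoch the completed transaction ran with (ignored for Empty).
  def onAbort(state: TxnState, requestEpoch: Short, currentEpoch: Short, previousEpoch: Short): Decision =
    state match {
      case CompleteAbort if requestEpoch == previousEpoch  => ReturnPreviousResult
      case CompleteCommit if requestEpoch == previousEpoch => InvalidTransition // assumed: retried abort vs. committed txn
      case _ if requestEpoch == currentEpoch               => FullAbortWithEpochBump
      case _                                               => InvalidTransition
    }
}
```

The epoch alone separates the two cases: the old epoch means "retry, answer from the completed state", the current epoch means "start a real abort and bump again".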
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,22 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
}
String transactionalId = null;
+ boolean canUseTransactionV2AboveVersion = true;
Review Comment:
Should we just name it `useTransactionV1Version` with the default value `false` and set it to `true` if we need to use a non-latest version?
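A sketch of the suggested flag semantics, written in Scala for consistency with the other examples even though `Sender` is Java. It assumes, based on the `version > 11` branch in AddPartitionsToTxnManager above, that produce v12 is the first version implying transaction V2; `maxProduceVersion`, `latestVersion`, and `LastTransactionV1ProduceVersion` are hypothetical names.

```scala
object ProduceVersionSelection {
  // Assumption: v11 is the newest produce version that does not imply transaction V2.
  val LastTransactionV1ProduceVersion: Short = 11

  // The flag defaults to false (use the latest version) and is only set
  // when the client must stay on a transaction V1 produce version.
  def maxProduceVersion(useTransactionV1Version: Boolean, latestVersion: Short): Short =
    if (useTransactionV1Version) math.min(LastTransactionV1ProduceVersion.toInt, latestVersion.toInt).toShort
    else latestVersion
}
```

Naming the flag for the exceptional case keeps the default path (`false`) aligned with "use the latest version".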
--
This is an automated message from the Apache Git Service.