artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1847462022
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -411,7 +421,7 @@ RuntimeException lastError() {
synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
if (hasFatalError())
return false;
- return !isTransactional() || partitionsInTransaction.contains(tp);
+ return !isTransactional() || partitionsInTransaction.contains(tp) || isTransactionV2Enabled();
Review Comment:
Minor perf: we should check `isTransactionV2Enabled` first, since it will be the
primary code path in the future and short-circuiting there avoids the lookup
into `partitionsInTransaction`.
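A minimal self-contained sketch of the suggested ordering (the fields and method here are stand-ins mimicking the `TransactionManager` names in the diff, not the real class):

```java
import java.util.HashSet;
import java.util.Set;

// Stand-alone sketch of the suggested check ordering; cheap flag checks
// come first so the Set lookup is only reached on the legacy (V1)
// transactional path.
public class SendCheckSketch {
    public boolean transactional = true;
    public boolean transactionV2Enabled = true;
    public Set<String> partitionsInTransaction = new HashSet<>();

    public boolean isSendToPartitionAllowed(String tp) {
        return !transactional
            || transactionV2Enabled
            || partitionsInTransaction.contains(tp);
    }
}
```

With V2 enabled, `partitionsInTransaction.contains(tp)` is never evaluated thanks to short-circuit evaluation.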
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -476,7 +476,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
private def logInvalidStateTransitionAndReturnError(transactionalId: String,
  transactionState: TransactionState,
  transactionResult: TransactionResult) = {
- debug(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +
+ error(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +
Review Comment:
I think it makes sense to keep it an error; these cases shouldn't be too
frequent.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,27 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
}
String transactionalId = null;
+
+ // To determine what produce version to use:
+ // If it is not transactional, produce version = latest
+ // If it is transactional but transaction V2 disabled, produce version = min(latest, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
+ // If it is transactional and transaction V2 enabled, produce version = latest
Review Comment:
Maybe simplify the comment to match the logic: when we use the transaction V1
protocol, we downgrade the request version to
`LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2` so that the broker knows that we're
using transaction protocol V1.
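The downgrade is just a clamp on the max version; a self-contained sketch (the version numbers are illustrative placeholders, not the real Kafka constants):

```java
// Sketch of the version clamp described above. Transaction V1 clients
// downgrade so the broker can tell which transaction protocol is in use;
// everyone else sends the latest version.
public class ProduceVersionSketch {
    // Placeholder value for illustration only.
    public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;

    public static short maxVersion(short latestVersion, boolean useTransactionV1Version) {
        return useTransactionV1Version
            ? (short) Math.min(latestVersion, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
            : latestVersion;
    }
}
```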
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,33 +592,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
else
PrepareAbort
- // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
- val nextProducerIdOrErrors =
- if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
- try {
- Right(producerIdManager.generateProducerId())
- } catch {
- case e: Exception => Left(Errors.forException(e))
- }
- } else {
- Right(RecordBatch.NO_PRODUCER_ID)
- }
-
- if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
- // We should clear the pending state to make way for the transition to PrepareAbort and also bump
- // the epoch in the transaction metadata we are about to append.
- isEpochFence = true
- txnMetadata.pendingState = None
- txnMetadata.producerEpoch = producerEpoch
- txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
- }
-
- nextProducerIdOrErrors.flatMap {
- nextProducerId =>
- Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
- }
+ generateTxnTransitMetadataForTxnCompletion(nextState)
case CompleteCommit =>
- if (txnMarkerResult == TransactionResult.COMMIT)
+ // The epoch should be valid as it is checked above
+ if (txnMarkerResult == TransactionResult.COMMIT || currentTxnMetadataIsAtLeastTransactionsV2)
Review Comment:
We need to go through the full epoch bump here, and we should have the same
logic for the CompleteAbort case, for the reasons described here:
https://github.com/apache/kafka/pull/17698#discussion_r1841263720.
We also need to check the epoch to see whether this is a retry or an empty
abort, as described here:
https://github.com/apache/kafka/pull/17698#discussion_r1841274726. For a retry
we just return success; for an abort we initiate the full abort flow. If we
detect a retried abort on a CompleteCommit, it's an invalid state.
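One way to read the decision being asked for, as a hypothetical self-contained sketch for a transaction sitting in CompleteCommit (the method name, epoch arithmetic, and outcomes are my assumptions, not the actual coordinator code):

```java
// Hypothetical sketch of the retry/abort decision described above for a
// transaction in CompleteCommit state after a V2 epoch bump.
public class CompleteCommitDecisionSketch {
    public enum Outcome { RETRY_SUCCESS, START_FULL_ABORT, INVALID_STATE }

    public static Outcome onEndTxn(short currentEpoch, short requestEpoch, boolean isCommit) {
        if (requestEpoch == (short) (currentEpoch - 1)) {
            // Epoch one behind the bumped epoch: this is a retry of the
            // original EndTxn. A retried commit just succeeds; a retried
            // abort against CompleteCommit is an invalid state.
            return isCommit ? Outcome.RETRY_SUCCESS : Outcome.INVALID_STATE;
        }
        if (requestEpoch == currentEpoch && !isCommit) {
            // A current-epoch abort of an already-completed transaction
            // initiates the full abort flow (with its own epoch bump).
            return Outcome.START_FULL_ABORT;
        }
        return Outcome.INVALID_STATE;
    }
}
```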
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -112,11 +113,14 @@ private[transaction] case object PrepareCommit extends TransactionState {
* Group is preparing to abort
*
* transition: received acks from all partitions => CompleteAbort
+ *
+ * Note, In transaction v2, we allow Empty to transition to PrepareCommit. because the client may not know the
Review Comment:
Comment says "PrepareCommit"; shouldn't this be `PrepareAbort`, given this doc
block describes the abort path?
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java:
##########
@@ -68,7 +68,7 @@ public class ProducerRequestBenchmark {
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(TOPIC_PRODUCE_DATA.iterator()));
private static ProduceRequest request() {
- return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA).build();
+ return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA, true).build();
Review Comment:
Should it be `false`?
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -490,9 +494,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
}
case CompleteAbort | CompleteCommit => // from write markers
+ // With transaction V2, we allow Empty transaction to be aborted, so the txnStartTimestamp can be -1.
Review Comment:
When we start the abort flow from the Empty state (or from the CompleteCommit /
CompleteAbort states), we need to properly update `txnStartTimestamp` to
indicate the start of the abort.
##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -225,7 +245,7 @@ class AddPartitionsToTxnManager(
val code =
if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
Errors.INVALID_PRODUCER_EPOCH.code
- else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericError) // For backward compatibility with clients.
+ else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericErrorSupported) // For backward compatibility with clients.
Review Comment:
Is this the correct logic? I presume we also want to return abortable errors
for the addPartition case.
##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -53,13 +54,15 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
maxVersion = 2;
} else {
minVersion = 3;
- maxVersion = ApiKeys.PRODUCE.latestVersion();
+ short latestVersion = ApiKeys.PRODUCE.latestVersion();
+ maxVersion = useTransactionV1Version ? (short) Math.min(latestVersion, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2) : latestVersion;
}
return new Builder(minVersion, maxVersion, data);
}
public static Builder forCurrentMagic(ProduceRequestData data) {
- return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data);
+ return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data, true);
Review Comment:
Should it be `false`? If so, can we write a unit test that would catch this?
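A self-contained sketch of the kind of unit test this could be, assuming the expectation that the non-transactional default path should not downgrade (class name, helper, and version numbers are illustrative stand-ins, not the real `ProduceRequest` API):

```java
// Sketch: pin down which transaction-version flag the default
// forCurrentMagic-style overload passes through. If someone flips the
// flag here, the assertion in the test catches it.
public class ForCurrentMagicSketch {
    public static final short LATEST = 12;          // placeholder version
    public static final short LAST_V1_STABLE = 11;  // placeholder version

    public static short forMagic(boolean useTransactionV1Version) {
        return useTransactionV1Version ? (short) Math.min(LATEST, LAST_V1_STABLE) : LATEST;
    }

    // Mirrors the default overload under review, with the flag the
    // reviewer expects (false).
    public static short forCurrentMagic() {
        return forMagic(false);
    }
}
```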
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]