artemlivshits commented on code in PR #20882:
URL: https://github.com/apache/kafka/pull/20882#discussion_r2528862168
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1708,7 +1708,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]()
- val markers = writeTxnMarkersRequest.markers
+ // List of transaction marker entries, each containing producer metadata, transaction result (commit/abort),
+ // target partitions, and transaction version.
+ val markers = writeTxnMarkersRequest.markers()
Review Comment:
Do we need this change?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1780,6 +1782,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val controlRecords = mutable.Map.empty[TopicIdPartition, MemoryRecords]
+ val markerTransactionVersion = marker.transactionVersion()
Review Comment:
In Scala, it's more idiomatic to use `marker.transactionVersion` (i.e.
without parentheses).
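
A small illustration of the convention, using `java.lang.String` as a stand-in for a Java-defined accessor (the `ParenStyle` object and its field names are hypothetical and not part of the PR). Both forms compile and return the same value; the parenthesis-free form is conventionally used for side-effect-free accessors:

```scala
object ParenStyle {
  val text = "marker"

  // Java-style call with explicit empty parentheses.
  val withParens: Int = text.length()

  // Scala-style call: parentheses omitted for a side-effect-free accessor.
  val withoutParens: Int = text.length
}
```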
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1780,6 +1782,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val controlRecords = mutable.Map.empty[TopicIdPartition, MemoryRecords]
+ val markerTransactionVersion = marker.transactionVersion()
+
Review Comment:
We should just do `val producerEpoch = if (marker.transactionVersion < 2) marker.producerEpoch else marker.producerEpoch - 1` and add a comment explaining that for
TV2+ we expect proper transaction markers to carry a bumped epoch, so we need to
undo the bump; that way the comparison can fence transaction markers from a
previous transaction that may arrive with the current epoch.
Then most of the other changes can be removed.
This would also work for the `groupCoordinator.completeTransaction` code path.
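
A minimal sketch of the suggestion above, assuming a simplified `Marker` case class and an `EpochFencing` helper that are both hypothetical stand-ins (they are not the Kafka classes touched by this PR). It only shows the epoch adjustment, not the fencing comparison itself:

```scala
// Hypothetical stand-in for a transaction marker entry; not the Kafka class.
final case class Marker(producerEpoch: Short, transactionVersion: Short)

object EpochFencing {
  // For TV2+ a proper marker is expected to carry a bumped epoch, so the bump
  // is undone before comparing. A marker from a previous transaction that
  // arrives with the current epoch then compares as older and can be fenced.
  def epochForFencing(marker: Marker): Short =
    if (marker.transactionVersion < 2) marker.producerEpoch
    else (marker.producerEpoch - 1).toShort
}
```

Because the adjustment depends only on the marker, the same value could in principle be handed to every code path that compares epochs, which is why the reviewer expects most of the other changes to become unnecessary.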
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1828,7 +1832,8 @@ class KafkaApis(val requestChannel: RequestChannel,
errors.foreachEntry { (topicIdPartition, partitionResponse) =>
addResultAndMaybeComplete(topicIdPartition.topicPartition(),
partitionResponse.error)
}
- }
+ },
+ transactionVersion = markerTransactionVersion
Review Comment:
What about `groupCoordinator.completeTransaction`? It also needs to check
the epoch properly.