Hangleton commented on code in PR #13378:
URL: https://github.com/apache/kafka/pull/13378#discussion_r1132780881
##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -971,8 +971,11 @@ private[group] class GroupCoordinator(
if (validationErrorOpt.isDefined) {
responseCallback(offsetMetadata.map { case (k, _) => k ->
validationErrorOpt.get })
} else {
- groupManager.storeOffsets(group, memberId, offsetMetadata,
responseCallback, producerId,
- producerEpoch, requestLocal)
+ val offsets = offsetMetadata.map { case(k,v) => new
TopicIdPartition(Uuid.ZERO_UUID, k) -> v }
Review Comment:
Please note that the signature of the offset commits for transactions are
not changed, hence these conversions. It is possible to increase the scope to
cover transaction commit offsets but erring on the side of caution here.
##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -469,11 +469,11 @@ class GroupMetadataManager(brokerId: Int,
if (isTxnOffsetCommit) {
group.inLock {
addProducerGroup(producerId, group.groupId)
- group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+ group.prepareTxnOffsetCommit(producerId, offsetMetadata.map {
case(k, v) => k.topicPartition -> v })
}
} else {
group.inLock {
- group.prepareOffsetCommit(offsetMetadata)
+ group.prepareOffsetCommit(offsetMetadata.map { case(k, v) =>
k.topicPartition -> v })
Review Comment:
I didn't propagate the topic ids into `GroupMetadata` where the actual
offset maps are. So, the conversion of keys from `TopicIdPartition` to
`TopicPartition` happens here for both offset commit and transaction offset
commit. Please let me know if we should go one level deeper and do this
conversion in `GroupMetadata` itself (without changing the retained data).
##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##########
@@ -295,8 +295,12 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
}
}
- class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends
CommitOffsetsOperation {
- override def runWithCallback(member: GroupMember, responseCallback:
CommitOffsetCallback): Unit = {
+ class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends
GroupOperation[TxnCommitOffsetCallbackParams, TxnCommitOffsetCallback] {
Review Comment:
Changed to subclass `GroupOperation` directly to use the different sets of
type parameters `TxnCommitOffsetCallbackParams` and `TxnCommitOffsetCallback`
which remains topic-name-based unlike their modified counterparts
`CommitOffsetCallbackParams` and `CommitOffsetCallback`. This could be avoided
if the signature change is extended to transaction offset commit as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]