Hangleton commented on code in PR #13378:
URL: https://github.com/apache/kafka/pull/13378#discussion_r1132780881

##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -971,8 +971,11 @@ private[group] class GroupCoordinator(
         if (validationErrorOpt.isDefined) {
           responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
         } else {
-          groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,
-            producerEpoch, requestLocal)
+          val offsets = offsetMetadata.map { case(k,v) => new TopicIdPartition(Uuid.ZERO_UUID, k) -> v }

Review Comment:
   Please note that the signature of the transactional offset commit is not changed, hence this conversion: the name-based keys are wrapped in a `TopicIdPartition` with `Uuid.ZERO_UUID` before being stored. The scope could be extended to cover transactional offset commits as well, but I am erring on the side of caution here.

##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -469,11 +469,11 @@ class GroupMetadataManager(brokerId: Int,
           if (isTxnOffsetCommit) {
             group.inLock {
               addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+              group.prepareTxnOffsetCommit(producerId, offsetMetadata.map { case(k, v) => k.topicPartition -> v })
             }
           } else {
             group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
+              group.prepareOffsetCommit(offsetMetadata.map { case(k, v) => k.topicPartition -> v })

Review Comment:
   I didn't propagate the topic IDs into `GroupMetadata`, where the actual offset maps live. The keys are therefore converted from `TopicIdPartition` to `TopicPartition` here, for both offset commit and transactional offset commit. Please let me know if we should go one level deeper and do this conversion in `GroupMetadata` itself (without changing the retained data).
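
For readers following along, the two key conversions described in the comments above can be sketched in isolation. This is a minimal illustration, not the code in the PR: `TopicPartition` and `TopicIdPartition` are simplified stand-in case classes, `java.util.UUID` stands in for Kafka's `Uuid`, and `OffsetKeyConversions`, `withPlaceholderIds`, `withoutIds` and `ZeroUuid` are hypothetical names.

```scala
import java.util.UUID

// Hypothetical stand-ins for Kafka's TopicPartition and TopicIdPartition,
// defined here only so the sketch runs without the Kafka client jar.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: UUID, topicPartition: TopicPartition)

object OffsetKeyConversions {
  // Plays the role of Uuid.ZERO_UUID (assumption: an all-zero placeholder ID).
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Transactional path: callers still pass name-based keys, so attach the
  // placeholder ID before handing the map to an ID-aware store method.
  def withPlaceholderIds[V](offsets: Map[TopicPartition, V]): Map[TopicIdPartition, V] =
    offsets.map { case (tp, v) => TopicIdPartition(ZeroUuid, tp) -> v }

  // Storage path: drop the ID again because the retained maps are name-keyed.
  def withoutIds[V](offsets: Map[TopicIdPartition, V]): Map[TopicPartition, V] =
    offsets.map { case (tip, v) => tip.topicPartition -> v }
}
```

Under these assumptions, `withoutIds(withPlaceholderIds(m))` returns `m` unchanged, which is why the retained data is unaffected by where the conversion happens.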

##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##########
@@ -295,8 +295,12 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
-  class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends CommitOffsetsOperation {
-    override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
+  class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends GroupOperation[TxnCommitOffsetCallbackParams, TxnCommitOffsetCallback] {

Review Comment:
   Changed to subclass `GroupOperation` directly so that it can use a different set of type parameters, `TxnCommitOffsetCallbackParams` and `TxnCommitOffsetCallback`, which remain topic-name-based, unlike their modified counterparts `CommitOffsetCallbackParams` and `CommitOffsetCallback`. This could be avoided if the signature change were extended to transactional offset commits as well.
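
The reason for subclassing `GroupOperation` directly can be shown with a small sketch. Everything below is a simplified assumption made for illustration: the real test classes take a `GroupMember` and a lock, the four type aliases are guessed shapes rather than the definitions in the PR, and `String` stands in for Kafka's `Errors`. The point is only that an operation is parameterised by its callback type, so a name-keyed operation cannot inherit from an ID-keyed one without inheriting its callback signature.

```scala
import java.util.UUID

// Hypothetical stand-ins for the Kafka types.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: UUID, topicPartition: TopicPartition)

object TxnOperationSketch {
  // Assumed alias shapes; String is a placeholder for Kafka's Errors.
  type CommitOffsetCallbackParams = Map[TopicIdPartition, String]
  type CommitOffsetCallback = Map[TopicIdPartition, String] => Unit
  type TxnCommitOffsetCallbackParams = Map[TopicPartition, String]
  type TxnCommitOffsetCallback = Map[TopicPartition, String] => Unit

  // R is the response an operation collects, C the callback type it is given.
  abstract class GroupOperation[R, C] {
    def responseCallback(record: R => Unit): C
    def runWithCallback(callback: C): Unit
    def run(): Option[R] = {
      var result: Option[R] = None
      runWithCallback(responseCallback(r => result = Some(r)))
      result
    }
  }

  // ID-keyed operation: its callback receives TopicIdPartition keys.
  class CommitOffsetsOperation
      extends GroupOperation[CommitOffsetCallbackParams, CommitOffsetCallback] {
    def responseCallback(record: CommitOffsetCallbackParams => Unit): CommitOffsetCallback = record
    def runWithCallback(callback: CommitOffsetCallback): Unit =
      callback(Map(TopicIdPartition(new UUID(0L, 1L), TopicPartition("orders", 0)) -> "NONE"))
  }

  // Name-keyed operation: extends GroupOperation directly with its own type
  // parameters instead of inheriting the ID-keyed callback signature.
  class CommitTxnOffsetsOperation
      extends GroupOperation[TxnCommitOffsetCallbackParams, TxnCommitOffsetCallback] {
    def responseCallback(record: TxnCommitOffsetCallbackParams => Unit): TxnCommitOffsetCallback = record
    def runWithCallback(callback: TxnCommitOffsetCallback): Unit =
      callback(Map(TopicPartition("orders", 0) -> "NONE"))
  }
}
```

If the transactional signature were changed as well, both operations could share one pair of aliases and the original inheritance could be kept.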