Hangleton commented on code in PR #13378:
URL: https://github.com/apache/kafka/pull/13378#discussion_r1132780881


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -971,8 +971,11 @@ private[group] class GroupCoordinator(
       if (validationErrorOpt.isDefined) {
         responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })
       } else {
-        groupManager.storeOffsets(group, memberId, offsetMetadata, 
responseCallback, producerId,
-          producerEpoch, requestLocal)
+        val offsets = offsetMetadata.map { case(k,v) => new 
TopicIdPartition(Uuid.ZERO_UUID, k) -> v }

Review Comment:
   Please note that the signature of the offset commits for transactions are 
not changed, hence these conversions. It is possible to increase the scope to 
cover transaction commit offsets but erring on the side of caution here.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -469,11 +469,11 @@ class GroupMetadataManager(brokerId: Int,
           if (isTxnOffsetCommit) {
             group.inLock {
               addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+              group.prepareTxnOffsetCommit(producerId, offsetMetadata.map { 
case(k, v) => k.topicPartition -> v })
             }
           } else {
             group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
+              group.prepareOffsetCommit(offsetMetadata.map { case(k, v) => 
k.topicPartition -> v })

Review Comment:
   I didn't propagate the topic ids into `GroupMetadata` where the actual 
offset maps are. So, the conversion of keys from `TopicIdPartition` to 
`TopicPartition` happens here for both offset commit and transaction offset 
commit. Please let me know if we should go one level deeper and do this 
conversion in `GroupMetadata` itself (without changing the retained data).



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##########
@@ -295,8 +295,12 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
     }
   }
 
-  class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends 
CommitOffsetsOperation {
-    override def runWithCallback(member: GroupMember, responseCallback: 
CommitOffsetCallback): Unit = {
+  class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends 
GroupOperation[TxnCommitOffsetCallbackParams, TxnCommitOffsetCallback] {

Review Comment:
   Changed to subclass `GroupOperation` directly to use the different sets of 
type parameters `TxnCommitOffsetCallbackParams` and `TxnCommitOffsetCallback` 
which remains topic-name-based unlike their modified counterparts 
`CommitOffsetCallbackParams` and `CommitOffsetCallback`. This could be avoided 
if the signature change is extended to transaction offset commit as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to