rajinisivaram commented on a change in pull request #11000:
URL: https://github.com/apache/kafka/pull/11000#discussion_r666970340



##########
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##########
@@ -193,6 +192,96 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
+  @Test
+  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
+    val topic1List = singletonList(new TopicPartition(topics(0), 0))
+    val topic1And2List = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1))
+    val allTopicsList = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1),
+      new TopicPartition(topics(2), 0),
+      new TopicPartition(topics(2), 1),
+      new TopicPartition(topics(2), 2))
+
+    // create group to partition map to build batched offsetFetch request
+    val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+      new util.HashMap[String, util.List[TopicPartition]]()
+    groupToPartitionMap.put(groups(0), topic1List)
+    groupToPartitionMap.put(groups(1), topic1And2List)
+    groupToPartitionMap.put(groups(2), allTopicsList)
+    groupToPartitionMap.put(groups(3), null)
+    groupToPartitionMap.put(groups(4), null)
+
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
+
+    val topicOneOffsets = topic1List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val topicOneAndTwoOffsets = topic1And2List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val allTopicOffsets = allTopicsList.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+
+    // create 5 consumers to commit offsets so we can fetch them later
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+    commitOffsets(topic1List, topicOneOffsets)

Review comment:
       Instead of creating all these offset maps and passing them to 
commitOffsets, we could change commitOffsets to take a list and compute the 
offset map since we are just using the same offset. We can then just invoke 
commitOffsets for the collection instead of one-by-one:
   ```
   val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, 
Option(e._2).getOrElse(allTopicsList)))
   groups.foreach { groupId =>
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     commitOffsets(partitionMap(groupId))
   }
   ```
   

##########
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##########
@@ -138,9 +137,9 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     groupToPartitionMap.put(groups(3), null)
     groupToPartitionMap.put(groups(4), null)
 
-    createTopic(topic1)
-    createTopic(topic2, numPartitions = 2)
-    createTopic(topic3, numPartitions = 3)
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
 
     val topicOneOffsets = topic1List.asScala.map{

Review comment:
       The changes suggested in the following method below to work on the 
collection can also be applied to `testOffsetFetchRequestWithMultipleGroups`.

##########
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##########
@@ -193,6 +192,96 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
+  @Test
+  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
+    val topic1List = singletonList(new TopicPartition(topics(0), 0))
+    val topic1And2List = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1))
+    val allTopicsList = util.Arrays.asList(
+      new TopicPartition(topics(0), 0),
+      new TopicPartition(topics(1), 0),
+      new TopicPartition(topics(1), 1),
+      new TopicPartition(topics(2), 0),
+      new TopicPartition(topics(2), 1),
+      new TopicPartition(topics(2), 2))
+
+    // create group to partition map to build batched offsetFetch request
+    val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+      new util.HashMap[String, util.List[TopicPartition]]()
+    groupToPartitionMap.put(groups(0), topic1List)
+    groupToPartitionMap.put(groups(1), topic1And2List)
+    groupToPartitionMap.put(groups(2), allTopicsList)
+    groupToPartitionMap.put(groups(3), null)
+    groupToPartitionMap.put(groups(4), null)
+
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
+
+    val topicOneOffsets = topic1List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val topicOneAndTwoOffsets = topic1And2List.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+    val allTopicOffsets = allTopicsList.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
+
+    // create 5 consumers to commit offsets so we can fetch them later
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+    commitOffsets(topic1List, topicOneOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
+    commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
+    commitOffsets(allTopicsList, allTopicOffsets)
+
+    for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+      val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false, 
false)
+        .build(version.asInstanceOf[Short])
+      val requestGroups = request.data().groups()
+      requestGroups.add(
+        // add the same group as before with different topic partitions
+        new OffsetFetchRequestGroup()
+          .setGroupId(groups(2))
+          .setTopics(singletonList(
+            new OffsetFetchRequestTopics()
+              .setName(topics(0))
+              .setPartitionIndexes(singletonList(0)))))
+      request.data().setGroups(requestGroups)
+      val response = connectAndReceive[OffsetFetchResponse](request)
+      response.data().groups().forEach(g =>

Review comment:
       We can use the `partitionMap` we created above instead of checking 
one-by-one:
   ```
   response.data.groups.asScala.map(_.groupId).foreach { groupId =>
     if (groupId == "group3") // verify that the response gives back the latest 
changed topic partition list
       verifyResponse(response.groupLevelError(groupId), 
response.partitionDataMap(groupId), topic1List)
      else
     verifyResponse(response.groupLevelError(groupId), 
response.partitionDataMap(groupId), partitionMap(groupId))}
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to