rajinisivaram commented on a change in pull request #11000: URL: https://github.com/apache/kafka/pull/11000#discussion_r666970340
########## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ########## @@ -193,6 +192,96 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } + @Test + def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { + val topic1List = singletonList(new TopicPartition(topics(0), 0)) + val topic1And2List = util.Arrays.asList( + new TopicPartition(topics(0), 0), + new TopicPartition(topics(1), 0), + new TopicPartition(topics(1), 1)) + val allTopicsList = util.Arrays.asList( + new TopicPartition(topics(0), 0), + new TopicPartition(topics(1), 0), + new TopicPartition(topics(1), 1), + new TopicPartition(topics(2), 0), + new TopicPartition(topics(2), 1), + new TopicPartition(topics(2), 2)) + + // create group to partition map to build batched offsetFetch request + val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = + new util.HashMap[String, util.List[TopicPartition]]() + groupToPartitionMap.put(groups(0), topic1List) + groupToPartitionMap.put(groups(1), topic1And2List) + groupToPartitionMap.put(groups(2), allTopicsList) + groupToPartitionMap.put(groups(3), null) + groupToPartitionMap.put(groups(4), null) + + createTopic(topics(0)) + createTopic(topics(1), numPartitions = 2) + createTopic(topics(2), numPartitions = 3) + + val topicOneOffsets = topic1List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val topicOneAndTwoOffsets = topic1And2List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val allTopicOffsets = allTopicsList.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + + // create 5 consumers to commit offsets so we can fetch them later + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0)) + commitOffsets(topic1List, topicOneOffsets) Review comment: Instead of creating all these offset maps and passing them to commitOffsets, we 
could change commitOffsets to take a list and compute the offset map since we are just using the same offset. We can then just invoke commitOffsets for the collection instead of one-by-one:
```
val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList)))
groups.foreach { groupId =>
  consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  commitOffsets(partitionMap(groupId))
}
```
########## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ########## @@ -138,9 +137,9 @@ class OffsetFetchRequestTest extends BaseRequestTest { groupToPartitionMap.put(groups(3), null) groupToPartitionMap.put(groups(4), null) - createTopic(topic1) - createTopic(topic2, numPartitions = 2) - createTopic(topic3, numPartitions = 3) + createTopic(topics(0)) + createTopic(topics(1), numPartitions = 2) + createTopic(topics(2), numPartitions = 3) val topicOneOffsets = topic1List.asScala.map{ Review comment: The changes suggested in the method below to work on the collection can also be applied to `testOffsetFetchRequestWithMultipleGroups`. 
########## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ########## @@ -193,6 +192,96 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } + @Test + def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { + val topic1List = singletonList(new TopicPartition(topics(0), 0)) + val topic1And2List = util.Arrays.asList( + new TopicPartition(topics(0), 0), + new TopicPartition(topics(1), 0), + new TopicPartition(topics(1), 1)) + val allTopicsList = util.Arrays.asList( + new TopicPartition(topics(0), 0), + new TopicPartition(topics(1), 0), + new TopicPartition(topics(1), 1), + new TopicPartition(topics(2), 0), + new TopicPartition(topics(2), 1), + new TopicPartition(topics(2), 2)) + + // create group to partition map to build batched offsetFetch request + val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = + new util.HashMap[String, util.List[TopicPartition]]() + groupToPartitionMap.put(groups(0), topic1List) + groupToPartitionMap.put(groups(1), topic1And2List) + groupToPartitionMap.put(groups(2), allTopicsList) + groupToPartitionMap.put(groups(3), null) + groupToPartitionMap.put(groups(4), null) + + createTopic(topics(0)) + createTopic(topics(1), numPartitions = 2) + createTopic(topics(2), numPartitions = 3) + + val topicOneOffsets = topic1List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val topicOneAndTwoOffsets = topic1And2List.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + val allTopicOffsets = allTopicsList.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) + }.toMap.asJava + + // create 5 consumers to commit offsets so we can fetch them later + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0)) + commitOffsets(topic1List, topicOneOffsets) + + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1)) + 
commitOffsets(topic1And2List, topicOneAndTwoOffsets) + + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2)) + commitOffsets(allTopicsList, allTopicOffsets) + + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3)) + commitOffsets(allTopicsList, allTopicOffsets) + + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4)) + commitOffsets(allTopicsList, allTopicOffsets) + + for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) { + val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false, false) + .build(version.asInstanceOf[Short]) + val requestGroups = request.data().groups() + requestGroups.add( + // add the same group as before with different topic partitions + new OffsetFetchRequestGroup() + .setGroupId(groups(2)) + .setTopics(singletonList( + new OffsetFetchRequestTopics() + .setName(topics(0)) + .setPartitionIndexes(singletonList(0))))) + request.data().setGroups(requestGroups) + val response = connectAndReceive[OffsetFetchResponse](request) + response.data().groups().forEach(g => Review comment: We can use the `partitionMap` we created above instead of checking one-by-one:
```
response.data.groups.asScala.map(_.groupId).foreach { groupId =>
  if (groupId == "group3")
    // verify that the response gives back the latest changed topic partition list
    verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), topic1List)
  else
    verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), partitionMap(groupId))
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org