This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af282f7  KAFKA-13045: Adding a test for batched offsetFetch requests with one group repeating (#11000)
af282f7 is described below

commit af282f76a7fcf15e87071523b2a7dcae9e10524b
Author: Sanjana Kaundinya <[email protected]>
AuthorDate: Sat Jul 10 00:43:55 2021 -0700

    KAFKA-13045: Adding a test for batched offsetFetch requests with one group repeating (#11000)
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 .../kafka/api/AuthorizerIntegrationTest.scala      |  58 ++------
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 147 ++++++++++-----------
 2 files changed, 84 insertions(+), 121 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 206ad50..264f6eb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1446,40 +1446,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val offset = 15L
     val leaderEpoch: Optional[Integer] = Optional.of(1)
     val metadata = "metadata"
-    val topicOneOffsets = topic1List.asScala.map {
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
-    val topicOneAndTwoOffsets = topic1And2List.asScala.map {
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
-    val allTopicOffsets = allTopicsList.asScala.map {
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
 
-    // create 5 consumers to commit offsets so we can fetch them later
-
-    def commitOffsets(tpList: util.List[TopicPartition],
-                      offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
+    def commitOffsets(tpList: util.List[TopicPartition]): Unit = {
       val consumer = createConsumer()
       consumer.assign(tpList)
+      val offsets = tpList.asScala.map{
+        tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+      }.toMap.asJava
       consumer.commitSync(offsets)
       consumer.close()
     }
 
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
-    commitOffsets(topic1List, topicOneOffsets)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
-    commitOffsets(topic1And2List, topicOneAndTwoOffsets)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
-    commitOffsets(allTopicsList, allTopicOffsets)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
-    commitOffsets(allTopicsList, allTopicOffsets)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
-    commitOffsets(allTopicsList, allTopicOffsets)
+    // create 5 consumers to commit offsets so we can fetch them later
+    val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList)))
+    groups.foreach { groupId =>
+      consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+      commitOffsets(partitionMap(groupId))
+    }
 
     removeAllClientAcls()
 
@@ -1572,24 +1555,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     // from the offsetFetch response
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResources(2))
     offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-    offsetFetchResponse.data().groups().forEach(g =>
-      g.groupId() match {
-        case "group1" =>
-          verifyResponse(offsetFetchResponse.groupLevelError(groups(0)), offsetFetchResponse
-            .partitionDataMap(groups(0)), topic1List)
-        case "group2" =>
-          verifyResponse(offsetFetchResponse.groupLevelError(groups(1)), offsetFetchResponse
-            .partitionDataMap(groups(1)), topic1And2List)
-        case "group3" =>
-          verifyResponse(offsetFetchResponse.groupLevelError(groups(2)), offsetFetchResponse
-            .partitionDataMap(groups(2)), allTopicsList)
-        case "group4" =>
-          verifyResponse(offsetFetchResponse.groupLevelError(groups(3)), offsetFetchResponse
-            .partitionDataMap(groups(3)), allTopicsList)
-        case "group5" =>
-          verifyResponse(offsetFetchResponse.groupLevelError(groups(4)), offsetFetchResponse
-            .partitionDataMap(groups(4)), allTopicsList)
-      })
+    offsetFetchResponse.data.groups.asScala.map(_.groupId).foreach( groupId =>
+      verifyResponse(offsetFetchResponse.groupLevelError(groupId), offsetFetchResponse.partitionDataMap(groupId), partitionMap(groupId))
+    )
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index ea5064b..5e821fb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchRequestData.{OffsetFetchRequestGroup, OffsetFetchRequestTopics}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse}
@@ -42,6 +43,26 @@ class OffsetFetchRequestTest extends BaseRequestTest {
   val topic = "topic"
   val groupId = "groupId"
   val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
+  val topic1List = singletonList(new TopicPartition(topics(0), 0))
+  val topic1And2List = util.Arrays.asList(
+    new TopicPartition(topics(0), 0),
+    new TopicPartition(topics(1), 0),
+    new TopicPartition(topics(1), 1))
+  val allTopicsList = util.Arrays.asList(
+    new TopicPartition(topics(0), 0),
+    new TopicPartition(topics(1), 0),
+    new TopicPartition(topics(1), 1),
+    new TopicPartition(topics(2), 0),
+    new TopicPartition(topics(2), 1),
+    new TopicPartition(topics(2), 2))
+  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+    new util.HashMap[String, util.List[TopicPartition]]()
+  groupToPartitionMap.put(groups(0), topic1List)
+  groupToPartitionMap.put(groups(1), topic1And2List)
+  groupToPartitionMap.put(groups(2), allTopicsList)
+  groupToPartitionMap.put(groups(3), null)
+  groupToPartitionMap.put(groups(4), null)
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
@@ -64,12 +85,8 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     createTopic(topic)
 
     val tpList = singletonList(new TopicPartition(topic, 0))
-    val topicOffsets = tpList.asScala.map{
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
-
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    commitOffsets(tpList, topicOffsets)
+    commitOffsets(tpList)
 
     // testing from version 1 onward since version 0 read offsets from ZK
     for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
@@ -112,84 +129,60 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 
   @Test
   def testOffsetFetchRequestWithMultipleGroups(): Unit = {
-
-    val topic1 = "topic1"
-    val topic1List = singletonList(new TopicPartition(topic1, 0))
-    val topic2 = "topic2"
-    val topic1And2List = util.Arrays.asList(
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic2, 0),
-      new TopicPartition(topic2, 1))
-    val topic3 = "topic3"
-    val allTopicsList = util.Arrays.asList(
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic2, 0),
-      new TopicPartition(topic2, 1),
-      new TopicPartition(topic3, 0),
-      new TopicPartition(topic3, 1),
-      new TopicPartition(topic3, 2))
-
-    // create group to partition map to build batched offsetFetch request
-    val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-      new util.HashMap[String, util.List[TopicPartition]]()
-    groupToPartitionMap.put(groups(0), topic1List)
-    groupToPartitionMap.put(groups(1), topic1And2List)
-    groupToPartitionMap.put(groups(2), allTopicsList)
-    groupToPartitionMap.put(groups(3), null)
-    groupToPartitionMap.put(groups(4), null)
-
-    createTopic(topic1)
-    createTopic(topic2, numPartitions = 2)
-    createTopic(topic3, numPartitions = 3)
-
-    val topicOneOffsets = topic1List.asScala.map{
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
-    val topicOneAndTwoOffsets = topic1And2List.asScala.map{
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
-    val allTopicOffsets = allTopicsList.asScala.map{
-      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
-    }.toMap.asJava
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
 
     // create 5 consumers to commit offsets so we can fetch them later
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
-    commitOffsets(topic1List, topicOneOffsets)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
-    commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+    val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList)))
+    groups.foreach { groupId =>
+      consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+      commitOffsets(partitionMap(groupId))
+    }
 
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
-    commitOffsets(allTopicsList, allTopicOffsets)
+    for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+      val request =  new OffsetFetchRequest.Builder(groupToPartitionMap, false, false)
+        .build(version.asInstanceOf[Short])
+      val response = connectAndReceive[OffsetFetchResponse](request)
+      response.data.groups.asScala.map(_.groupId).foreach( groupId =>
+        verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), partitionMap(groupId))
+      )
+    }
+  }
 
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
-    commitOffsets(allTopicsList, allTopicOffsets)
+  @Test
+  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
+    createTopic(topics(0))
+    createTopic(topics(1), numPartitions = 2)
+    createTopic(topics(2), numPartitions = 3)
 
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
-    commitOffsets(allTopicsList, allTopicOffsets)
+    // create 5 consumers to commit offsets so we can fetch them later
+    val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList)))
+    groups.foreach { groupId =>
+      consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+      commitOffsets(partitionMap(groupId))
+    }
 
     for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
-      val request =  new OffsetFetchRequest.Builder(groupToPartitionMap, false, false)
+      val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false, false)
         .build(version.asInstanceOf[Short])
+      val requestGroups = request.data().groups()
+      requestGroups.add(
+        // add the same group as before with different topic partitions
+        new OffsetFetchRequestGroup()
+          .setGroupId(groups(2))
+          .setTopics(singletonList(
+            new OffsetFetchRequestTopics()
+              .setName(topics(0))
+              .setPartitionIndexes(singletonList(0)))))
+      request.data().setGroups(requestGroups)
       val response = connectAndReceive[OffsetFetchResponse](request)
-      response.data().groups().forEach(g =>
-        g.groupId() match {
-          case "group1" =>
-            verifyResponse(response.groupLevelError(groups(0)),
-              response.partitionDataMap(groups(0)), topic1List)
-          case "group2" =>
-            verifyResponse(response.groupLevelError(groups(1)),
-              response.partitionDataMap(groups(1)), topic1And2List)
-          case "group3" =>
-            verifyResponse(response.groupLevelError(groups(2)),
-              response.partitionDataMap(groups(2)), allTopicsList)
-          case "group4" =>
-            verifyResponse(response.groupLevelError(groups(3)),
-              response.partitionDataMap(groups(3)), allTopicsList)
-          case "group5" =>
-            verifyResponse(response.groupLevelError(groups(4)),
-              response.partitionDataMap(groups(4)), allTopicsList)
-        })
+      response.data.groups.asScala.map(_.groupId).foreach( groupId =>
+        if (groupId == "group3") // verify that the response gives back the latest changed topic partition list
+          verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), topic1List)
+        else
+          verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), partitionMap(groupId))
+      )
     }
   }
 
@@ -227,10 +220,12 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
   }
 
-  private def commitOffsets(tpList: util.List[TopicPartition],
-                            offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
+  private def commitOffsets(tpList: util.List[TopicPartition]): Unit = {
     val consumer = createConsumer()
     consumer.assign(tpList)
+    val offsets = tpList.asScala.map{
+      tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+    }.toMap.asJava
     consumer.commitSync(offsets)
     consumer.close()
   }

Reply via email to