This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48a52701b9c KAFKA-18424: Consider splitting 
PlaintextAdminIntegrationTest#testConsumerGroups (#19093)
48a52701b9c is described below

commit 48a52701b9cd45c4854f910990a85be7d73e22f5
Author: TengYao Chi <[email protected]>
AuthorDate: Mon May 26 12:10:49 2025 +0800

    KAFKA-18424: Consider splitting 
PlaintextAdminIntegrationTest#testConsumerGroups (#19093)
    
    JIRA: KAFKA-18424
    `PlaintextAdminIntegrationTest#testConsumerGroups` tests too many things.
    We should split it into smaller units.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 297 ++++++++++++---------
 1 file changed, 176 insertions(+), 121 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index dc24bd84c59..b99cdc73ce3 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1818,48 +1818,90 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(1, factory.failuresInjected)
   }
 
-  /**
-   * Test the consumer group APIs.
-   */
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumerGroups(groupProtocol: String): Unit = {
+  def testListConsumerGroupOffsets(groupProtocol: String): Unit = {
     val config = createConfig
     client = Admin.create(config)
     try {
-      // Verify that initially there are no consumer groups to list.
-      val list1 = client.listConsumerGroups()
-      assertEquals(0, list1.all().get().size())
-      assertEquals(0, list1.errors().get().size())
-      assertEquals(0, list1.valid().get().size())
+      assertConsumerGroupsIsClean()
+
       val testTopicName = "test_topic"
-      val testTopicName1 = testTopicName + "1"
-      val testTopicName2 = testTopicName + "2"
-      val testNumPartitions = 2
+      prepareTopics(List(testTopicName), 2)
+      prepareRecords(testTopicName)
 
-      client.createTopics(util.List.of(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
-      )).all().get()
-      waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val groupInstances = Set("")
+      val topics = Set(testTopicName)
+
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
+      val defaultConsumerConfig = new Properties(consumerConfig)
+      
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+      defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+      val backgroundConsumers = prepareConsumers(groupInstances, topics, 
defaultConsumerConfig)
 
-      val producer = createProducer()
       try {
-        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+        // Start consumer polling threads in the background
+        backgroundConsumers.start()
+        val topicPartition = new TopicPartition(testTopicName, 0)
+
+        // Test listConsumerGroupOffsets
+        TestUtils.waitUntilTrue(() => {
+          val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+          parts.containsKey(topicPartition) && 
(parts.get(topicPartition).offset() == 1)
+        }, "Expected the offset for partition 0 to eventually become 1.")
+
+        // Test listConsumerGroupOffsets with requireStable true
+        val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
+        var parts = client.listConsumerGroupOffsets(testGroupId, options)
+          .partitionsToOffsetAndMetadata()
+          .get()
+        assertTrue(parts.containsKey(topicPartition))
+        assertEquals(1, parts.get(topicPartition).offset())
+
+        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
+        val groupSpecs = util.Map.of(
+          testGroupId,
+          new ListConsumerGroupOffsetsSpec().topicPartitions(util.List.of(new 
TopicPartition(testTopicName, 0)))
+        )
+        parts = client.listConsumerGroupOffsets(groupSpecs)
+          .partitionsToOffsetAndMetadata()
+          .get()
+        assertTrue(parts.containsKey(topicPartition))
+        assertEquals(1, parts.get(topicPartition).offset())
+
+        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and 
requireStable option
+        parts = client.listConsumerGroupOffsets(groupSpecs, options)
+          .partitionsToOffsetAndMetadata()
+          .get()
+        assertTrue(parts.containsKey(topicPartition))
+        assertEquals(1, parts.get(topicPartition).offset())
       } finally {
-        Utils.closeQuietly(producer, "producer")
+        backgroundConsumers.close()
       }
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testListConsumerGroups(groupProtocol: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      assertConsumerGroupsIsClean()
+
+      val testTopicName = "test_topic"
+      prepareTopics(List(testTopicName), 2)
 
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
-      val testInstanceId1 = "test_instance_id_1"
-      val testInstanceId2 = "test_instance_id_2"
-      val fakeGroupId = "fake_group_id"
-
-      // contains two static members and one dynamic member
-      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
-      val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
+      val groupInstances = Set("")
+      val topics = Set(testTopicName)
 
       // We need to disable the auto commit because after the members got 
removed from group, the offset commit
       // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
@@ -1867,27 +1909,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
       defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
       defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
-
-      val backgroundConsumerSet = new 
BackgroundConsumerSet(defaultConsumerConfig)
-      groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
-        val configOverrides = new Properties()
-        if (groupInstanceId != "") {
-          // static member
-          configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
-        }
-        backgroundConsumerSet.addConsumer(topic, configOverrides)
-      }
+      val backgroundConsumers = prepareConsumers(groupInstances, topics, 
defaultConsumerConfig)
 
       try {
         val groupType = if 
(groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) 
GroupType.CONSUMER else GroupType.CLASSIC
         // Start consumer polling threads in the background
-        backgroundConsumerSet.start()
+        backgroundConsumers.start()
 
         // Test that we can list the new group.
         TestUtils.waitUntilTrue(() => {
           val matching = 
client.listConsumerGroups.all.get.asScala.filter(group =>
-              group.groupId == testGroupId &&
-              group.groupState.get == GroupState.STABLE)
+            group.groupId == testGroupId && group.groupState.get == 
GroupState.STABLE)
           matching.size == 1
         }, s"Expected to be able to list $testGroupId")
 
@@ -1903,25 +1935,73 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           val options = new 
ListConsumerGroupsOptions().withTypes(util.Set.of(groupType))
             .inGroupStates(util.Set.of(GroupState.STABLE))
           val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
-            group.groupId == testGroupId &&
-              group.groupState.get == GroupState.STABLE)
+            group.groupId == testGroupId && group.groupState.get == 
GroupState.STABLE)
           matching.size == 1
         }, s"Expected to be able to list $testGroupId in group type $groupType 
and state Stable")
 
         TestUtils.waitUntilTrue(() => {
           val options = new 
ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.STABLE))
           val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
-              group.groupId == testGroupId &&
-              group.groupState.get == GroupState.STABLE)
+            group.groupId == testGroupId && group.groupState.get == 
GroupState.STABLE)
           matching.size == 1
         }, s"Expected to be able to list $testGroupId in state Stable")
 
         TestUtils.waitUntilTrue(() => {
           val options = new 
ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.EMPTY))
-          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(
-              _.groupId == testGroupId)
+          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(_.groupId == 
testGroupId)
           matching.isEmpty
-        }, s"Expected to find zero groups")
+        }, "Expected to find zero groups")
+      } finally {
+        backgroundConsumers.close()
+      }
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testDescribeGroups(groupProtocol: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      assertConsumerGroupsIsClean()
+
+      val testTopicName = "test_topic"
+      val testTopicName1 = testTopicName + "1"
+      val testTopicName2 = testTopicName + "2"
+      val testNumPartitions = 2
+      prepareTopics(List(testTopicName, testTopicName1, testTopicName2), 
testNumPartitions)
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val testInstanceId1 = "test_instance_id_1"
+      val testInstanceId2 = "test_instance_id_2"
+      val fakeGroupId = "fake_group_id"
+
+      // contains two static members and one dynamic member
+      val groupInstances = Set(testInstanceId1, testInstanceId2, "")
+      val topics = Set(testTopicName, testTopicName1, testTopicName2)
+
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
+      val defaultConsumerConfig = new Properties(consumerConfig)
+      
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+      defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+      val backgroundConsumers = prepareConsumers(groupInstances, topics, 
defaultConsumerConfig)
+
+      try {
+        val groupType = if 
(groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) 
GroupType.CONSUMER else GroupType.CLASSIC
+        // Start consumer polling threads in the background
+        backgroundConsumers.start()
 
         val describeWithFakeGroupResult = 
client.describeConsumerGroups(util.List.of(testGroupId, fakeGroupId),
           new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
@@ -1940,17 +2020,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
         assertEquals(testGroupId, testGroupDescription.groupId())
         assertFalse(testGroupDescription.isSimpleConsumerGroup)
-        assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
+        assertEquals(groupInstances.size, 
testGroupDescription.members().size())
         val members = testGroupDescription.members()
         members.asScala.foreach { member =>
           assertEquals(testClientId, member.clientId)
           assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else 
Optional.of(true), member.upgraded)
         }
         val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
-        topicSet.foreach { topic =>
-          val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
-          assertEquals(testNumPartitions, topicPartitions.size)
-        }
+        topics.foreach(topic => assertEquals(testNumPartitions, 
topicPartitionsByTopic.getOrElse(topic, List.empty).size))
 
         val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
         assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
@@ -1963,35 +2040,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         // Test that all() also throws GroupIdNotFoundException
         assertFutureThrows(classOf[GroupIdNotFoundException], 
describeWithFakeGroupResult.all(),
           s"Group $fakeGroupId not found.")
-
-        val testTopicPart0 = new TopicPartition(testTopicName, 0)
-
-        // Test listConsumerGroupOffsets
-        TestUtils.waitUntilTrue(() => {
-          val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
-          parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 1)
-        }, s"Expected the offset for partition 0 to eventually become 1.")
-
-        // Test listConsumerGroupOffsets with requireStable true
-        val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
-        var parts = client.listConsumerGroupOffsets(testGroupId, options)
-          .partitionsToOffsetAndMetadata().get()
-        assertTrue(parts.containsKey(testTopicPart0))
-        assertEquals(1, parts.get(testTopicPart0).offset())
-
-        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
-        val groupSpecs = util.Map.of(testGroupId,
-          new ListConsumerGroupOffsetsSpec().topicPartitions(util.Set.of(new 
TopicPartition(testTopicName, 0))))
-        parts = 
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
-        assertTrue(parts.containsKey(testTopicPart0))
-        assertEquals(1, parts.get(testTopicPart0).offset())
-
-        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and 
requireStable option
-        parts = client.listConsumerGroupOffsets(groupSpecs, 
options).partitionsToOffsetAndMetadata().get()
-        assertTrue(parts.containsKey(testTopicPart0))
-        assertEquals(1, parts.get(testTopicPart0).offset())
       } finally {
-        backgroundConsumerSet.close()
+        backgroundConsumers.close()
       }
     } finally {
       Utils.closeQuietly(client, "adminClient")
@@ -2089,29 +2139,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val config = createConfig
     client = Admin.create(config)
     try {
-      // Verify that initially there are no consumer groups to list.
-      val list1 = client.listConsumerGroups()
-      assertEquals(0, list1.all().get().size())
-      assertEquals(0, list1.errors().get().size())
-      assertEquals(0, list1.valid().get().size())
+      assertConsumerGroupsIsClean()
+
       val testTopicName = "test_topic"
       val testTopicName1 = testTopicName + "1"
       val testTopicName2 = testTopicName + "2"
       val testNumPartitions = 2
 
-      client.createTopics(util.List.of(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
-      )).all().get()
-      waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
-
-      val producer = createProducer()
-      try {
-        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
-      } finally {
-        Utils.closeQuietly(producer, "producer")
-      }
+      prepareTopics(List(testTopicName, testTopicName1, testTopicName2), 
testNumPartitions)
+      prepareRecords(testTopicName)
 
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
@@ -2291,28 +2327,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client = Admin.create(config)
     try {
       // Verify that initially there are no consumer groups to list.
-      val list1 = client.listConsumerGroups()
-      assertEquals(0, list1.all().get().size())
-      assertEquals(0, list1.errors().get().size())
-      assertEquals(0, list1.valid().get().size())
+      assertConsumerGroupsIsClean()
       val testTopicName = "test_topic"
       val testTopicName1 = testTopicName + "1"
       val testTopicName2 = testTopicName + "2"
       val testNumPartitions = 2
 
-      client.createTopics(util.List.of(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
-        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
-      )).all().get()
-      waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
+      prepareTopics(List(testTopicName, testTopicName1, testTopicName2), 
testNumPartitions)
 
-      val producer = createProducer()
-      try {
-        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
-      } finally {
-        Utils.closeQuietly(producer, "producer")
-      }
+      prepareRecords(testTopicName)
 
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
@@ -2461,12 +2484,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         new NewTopic(testTopicName, 1, 1.toShort))).all().get()
       waitForTopics(client, List(testTopicName), List())
 
-      val producer = createProducer()
-      try {
-        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
-      } finally {
-        Utils.closeQuietly(producer, "producer")
-      }
+      prepareRecords(testTopicName)
 
       val newConsumerConfig = new Properties(consumerConfig)
       newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
@@ -2511,6 +2529,43 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  private def prepareTopics(topics: List[String], numberOfPartitions: Int): 
Unit = {
+    client.createTopics(topics.map(topic => new NewTopic(topic, 
numberOfPartitions, 1.toShort)).asJava).all().get()
+    waitForTopics(client, topics, List())
+  }
+
+  private def prepareRecords(testTopicName: String) = {
+    val producer = createProducer()
+    try {
+      producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+    } finally {
+      Utils.closeQuietly(producer, "producer")
+    }
+  }
+
+  private def prepareConsumers(groupInstanceSet: Set[String], topicSet: 
Set[String], defaultConsumerConfig: Properties) = {
+    val backgroundConsumerSet = new 
BackgroundConsumerSet(defaultConsumerConfig)
+    groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
+      val configOverrides = new Properties()
+      if (groupInstanceId != "") {
+        // static member
+        configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
+      }
+      backgroundConsumerSet.addConsumer(topic, configOverrides)
+    }
+    backgroundConsumerSet
+  }
+
+  /**
+   * Verify that initially there are no consumer groups to list.
+   */
+  private def assertConsumerGroupsIsClean(): Unit = {
+    val listResult = client.listConsumerGroups()
+    assertEquals(0, listResult.all().get().size())
+    assertEquals(0, listResult.errors().get().size())
+    assertEquals(0, listResult.valid().get().size())
+  }
+
   @Test
   def testListGroups(): Unit = {
     val classicGroupId = "classic_group_id"

Reply via email to