This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48a52701b9c KAFKA-18424: Consider splitting
PlaintextAdminIntegrationTest#testConsumerGroups (#19093)
48a52701b9c is described below
commit 48a52701b9cd45c4854f910990a85be7d73e22f5
Author: TengYao Chi <[email protected]>
AuthorDate: Mon May 26 12:10:49 2025 +0800
KAFKA-18424: Consider splitting
PlaintextAdminIntegrationTest#testConsumerGroups (#19093)
JIRA: KAFKA-18424
`PlaintextAdminIntegrationTest#testConsumerGroups` tests too many things.
We should split it into smaller units.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 297 ++++++++++++---------
1 file changed, 176 insertions(+), 121 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index dc24bd84c59..b99cdc73ce3 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1818,48 +1818,90 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(1, factory.failuresInjected)
}
- /**
- * Test the consumer group APIs.
- */
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testConsumerGroups(groupProtocol: String): Unit = {
+ def testListConsumerGroupOffsets(groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
- // Verify that initially there are no consumer groups to list.
- val list1 = client.listConsumerGroups()
- assertEquals(0, list1.all().get().size())
- assertEquals(0, list1.errors().get().size())
- assertEquals(0, list1.valid().get().size())
+ assertConsumerGroupsIsClean()
+
val testTopicName = "test_topic"
- val testTopicName1 = testTopicName + "1"
- val testTopicName2 = testTopicName + "2"
- val testNumPartitions = 2
+ prepareTopics(List(testTopicName), 2)
+ prepareRecords(testTopicName)
- client.createTopics(util.List.of(
- new NewTopic(testTopicName, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
- )).all().get()
- waitForTopics(client, List(testTopicName, testTopicName1,
testTopicName2), List())
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val groupInstances = Set("")
+ val topics = Set(testTopicName)
+
+ // We need to disable the auto commit because after the members got
removed from group, the offset commit
+ // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
+ val defaultConsumerConfig = new Properties(consumerConfig)
+
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+ val backgroundConsumers = prepareConsumers(groupInstances, topics,
defaultConsumerConfig)
- val producer = createProducer()
try {
- producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ // Start consumer polling threads in the background
+ backgroundConsumers.start()
+ val topicPartition = new TopicPartition(testTopicName, 0)
+
+ // Test listConsumerGroupOffsets
+ TestUtils.waitUntilTrue(() => {
+ val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ parts.containsKey(topicPartition) &&
(parts.get(topicPartition).offset() == 1)
+ }, "Expected the offset for partition 0 to eventually become 1.")
+
+ // Test listConsumerGroupOffsets with requireStable true
+ val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
+ var parts = client.listConsumerGroupOffsets(testGroupId, options)
+ .partitionsToOffsetAndMetadata()
+ .get()
+ assertTrue(parts.containsKey(topicPartition))
+ assertEquals(1, parts.get(topicPartition).offset())
+
+ // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
+ val groupSpecs = util.Map.of(
+ testGroupId,
+ new ListConsumerGroupOffsetsSpec().topicPartitions(util.List.of(new
TopicPartition(testTopicName, 0)))
+ )
+ parts = client.listConsumerGroupOffsets(groupSpecs)
+ .partitionsToOffsetAndMetadata()
+ .get()
+ assertTrue(parts.containsKey(topicPartition))
+ assertEquals(1, parts.get(topicPartition).offset())
+
+ // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and
requireStable option
+ parts = client.listConsumerGroupOffsets(groupSpecs, options)
+ .partitionsToOffsetAndMetadata()
+ .get()
+ assertTrue(parts.containsKey(topicPartition))
+ assertEquals(1, parts.get(topicPartition).offset())
} finally {
- Utils.closeQuietly(producer, "producer")
+ backgroundConsumers.close()
}
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testListConsumerGroups(groupProtocol: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ try {
+ assertConsumerGroupsIsClean()
+
+ val testTopicName = "test_topic"
+ prepareTopics(List(testTopicName), 2)
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
- val testInstanceId1 = "test_instance_id_1"
- val testInstanceId2 = "test_instance_id_2"
- val fakeGroupId = "fake_group_id"
-
- // contains two static members and one dynamic member
- val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
- val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
+ val groupInstances = Set("")
+ val topics = Set(testTopicName)
// We need to disable the auto commit because after the members got
removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
@@ -1867,27 +1909,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
-
- val backgroundConsumerSet = new
BackgroundConsumerSet(defaultConsumerConfig)
- groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
- val configOverrides = new Properties()
- if (groupInstanceId != "") {
- // static member
- configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
- }
- backgroundConsumerSet.addConsumer(topic, configOverrides)
- }
+ val backgroundConsumers = prepareConsumers(groupInstances, topics,
defaultConsumerConfig)
try {
val groupType = if
(groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name))
GroupType.CONSUMER else GroupType.CLASSIC
// Start consumer polling threads in the background
- backgroundConsumerSet.start()
+ backgroundConsumers.start()
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching =
client.listConsumerGroups.all.get.asScala.filter(group =>
- group.groupId == testGroupId &&
- group.groupState.get == GroupState.STABLE)
+ group.groupId == testGroupId && group.groupState.get ==
GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId")
@@ -1903,25 +1935,73 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val options = new
ListConsumerGroupsOptions().withTypes(util.Set.of(groupType))
.inGroupStates(util.Set.of(GroupState.STABLE))
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
- group.groupId == testGroupId &&
- group.groupState.get == GroupState.STABLE)
+ group.groupId == testGroupId && group.groupState.get ==
GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type $groupType
and state Stable")
TestUtils.waitUntilTrue(() => {
val options = new
ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.STABLE))
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
- group.groupId == testGroupId &&
- group.groupState.get == GroupState.STABLE)
+ group.groupId == testGroupId && group.groupState.get ==
GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => {
val options = new
ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.EMPTY))
- val matching =
client.listConsumerGroups(options).all.get.asScala.filter(
- _.groupId == testGroupId)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(_.groupId ==
testGroupId)
matching.isEmpty
- }, s"Expected to find zero groups")
+ }, "Expected to find zero groups")
+ } finally {
+ backgroundConsumers.close()
+ }
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testDescribeGroups(groupProtocol: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ try {
+ assertConsumerGroupsIsClean()
+
+ val testTopicName = "test_topic"
+ val testTopicName1 = testTopicName + "1"
+ val testTopicName2 = testTopicName + "2"
+ val testNumPartitions = 2
+ prepareTopics(List(testTopicName, testTopicName1, testTopicName2),
testNumPartitions)
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val testInstanceId1 = "test_instance_id_1"
+ val testInstanceId2 = "test_instance_id_2"
+ val fakeGroupId = "fake_group_id"
+
+ // contains two static members and one dynamic member
+ val groupInstances = Set(testInstanceId1, testInstanceId2, "")
+ val topics = Set(testTopicName, testTopicName1, testTopicName2)
+
+ // We need to disable the auto commit because after the members got
removed from group, the offset commit
+ // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
+ val defaultConsumerConfig = new Properties(consumerConfig)
+
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+ val backgroundConsumers = prepareConsumers(groupInstances, topics,
defaultConsumerConfig)
+
+ try {
+ val groupType = if
(groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name))
GroupType.CONSUMER else GroupType.CLASSIC
+ // Start consumer polling threads in the background
+ backgroundConsumers.start()
val describeWithFakeGroupResult =
client.describeConsumerGroups(util.List.of(testGroupId, fakeGroupId),
new
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
@@ -1940,17 +2020,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
- assertEquals(groupInstanceSet.size,
testGroupDescription.members().size())
+ assertEquals(groupInstances.size,
testGroupDescription.members().size())
val members = testGroupDescription.members()
members.asScala.foreach { member =>
assertEquals(testClientId, member.clientId)
assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else
Optional.of(true), member.upgraded)
}
val topicPartitionsByTopic =
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
- topicSet.foreach { topic =>
- val topicPartitions = topicPartitionsByTopic.getOrElse(topic,
List.empty)
- assertEquals(testNumPartitions, topicPartitions.size)
- }
+ topics.foreach(topic => assertEquals(testNumPartitions,
topicPartitionsByTopic.getOrElse(topic, List.empty).size))
val expectedOperations =
AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations,
testGroupDescription.authorizedOperations())
@@ -1963,35 +2040,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Test that all() also throws GroupIdNotFoundException
assertFutureThrows(classOf[GroupIdNotFoundException],
describeWithFakeGroupResult.all(),
s"Group $fakeGroupId not found.")
-
- val testTopicPart0 = new TopicPartition(testTopicName, 0)
-
- // Test listConsumerGroupOffsets
- TestUtils.waitUntilTrue(() => {
- val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
- parts.containsKey(testTopicPart0) &&
(parts.get(testTopicPart0).offset() == 1)
- }, s"Expected the offset for partition 0 to eventually become 1.")
-
- // Test listConsumerGroupOffsets with requireStable true
- val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
- var parts = client.listConsumerGroupOffsets(testGroupId, options)
- .partitionsToOffsetAndMetadata().get()
- assertTrue(parts.containsKey(testTopicPart0))
- assertEquals(1, parts.get(testTopicPart0).offset())
-
- // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
- val groupSpecs = util.Map.of(testGroupId,
- new ListConsumerGroupOffsetsSpec().topicPartitions(util.Set.of(new
TopicPartition(testTopicName, 0))))
- parts =
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
- assertTrue(parts.containsKey(testTopicPart0))
- assertEquals(1, parts.get(testTopicPart0).offset())
-
- // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and
requireStable option
- parts = client.listConsumerGroupOffsets(groupSpecs,
options).partitionsToOffsetAndMetadata().get()
- assertTrue(parts.containsKey(testTopicPart0))
- assertEquals(1, parts.get(testTopicPart0).offset())
} finally {
- backgroundConsumerSet.close()
+ backgroundConsumers.close()
}
} finally {
Utils.closeQuietly(client, "adminClient")
@@ -2089,29 +2139,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val config = createConfig
client = Admin.create(config)
try {
- // Verify that initially there are no consumer groups to list.
- val list1 = client.listConsumerGroups()
- assertEquals(0, list1.all().get().size())
- assertEquals(0, list1.errors().get().size())
- assertEquals(0, list1.valid().get().size())
+ assertConsumerGroupsIsClean()
+
val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2
- client.createTopics(util.List.of(
- new NewTopic(testTopicName, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
- )).all().get()
- waitForTopics(client, List(testTopicName, testTopicName1,
testTopicName2), List())
-
- val producer = createProducer()
- try {
- producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
- } finally {
- Utils.closeQuietly(producer, "producer")
- }
+ prepareTopics(List(testTopicName, testTopicName1, testTopicName2),
testNumPartitions)
+ prepareRecords(testTopicName)
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
@@ -2291,28 +2327,15 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client = Admin.create(config)
try {
// Verify that initially there are no consumer groups to list.
- val list1 = client.listConsumerGroups()
- assertEquals(0, list1.all().get().size())
- assertEquals(0, list1.errors().get().size())
- assertEquals(0, list1.valid().get().size())
+ assertConsumerGroupsIsClean()
val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2
- client.createTopics(util.List.of(
- new NewTopic(testTopicName, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
- new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
- )).all().get()
- waitForTopics(client, List(testTopicName, testTopicName1,
testTopicName2), List())
+ prepareTopics(List(testTopicName, testTopicName1, testTopicName2),
testNumPartitions)
- val producer = createProducer()
- try {
- producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
- } finally {
- Utils.closeQuietly(producer, "producer")
- }
+ prepareRecords(testTopicName)
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
@@ -2461,12 +2484,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
new NewTopic(testTopicName, 1, 1.toShort))).all().get()
waitForTopics(client, List(testTopicName), List())
- val producer = createProducer()
- try {
- producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
- } finally {
- Utils.closeQuietly(producer, "producer")
- }
+ prepareRecords(testTopicName)
val newConsumerConfig = new Properties(consumerConfig)
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
@@ -2511,6 +2529,43 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ private def prepareTopics(topics: List[String], numberOfPartitions: Int):
Unit = {
+ client.createTopics(topics.map(topic => new NewTopic(topic,
numberOfPartitions, 1.toShort)).asJava).all().get()
+ waitForTopics(client, topics, List())
+ }
+
+ private def prepareRecords(testTopicName: String) = {
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+ }
+
+ private def prepareConsumers(groupInstanceSet: Set[String], topicSet:
Set[String], defaultConsumerConfig: Properties) = {
+ val backgroundConsumerSet = new
BackgroundConsumerSet(defaultConsumerConfig)
+ groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
+ val configOverrides = new Properties()
+ if (groupInstanceId != "") {
+ // static member
+ configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
+ }
+ backgroundConsumerSet.addConsumer(topic, configOverrides)
+ }
+ backgroundConsumerSet
+ }
+
+ /**
+ * Verify that initially there are no consumer groups to list.
+ */
+ private def assertConsumerGroupsIsClean(): Unit = {
+ val listResult = client.listConsumerGroups()
+ assertEquals(0, listResult.all().get().size())
+ assertEquals(0, listResult.errors().get().size())
+ assertEquals(0, listResult.valid().get().size())
+ }
+
@Test
def testListGroups(): Unit = {
val classicGroupId = "classic_group_id"