This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef07b5fad1a KAFKA-19461: Add share group admin integration tests to
PlaintextAdminIntegrationTest (#20103)
ef07b5fad1a is described below
commit ef07b5fad1a864829022c5c4210b1edde0d40843
Author: Lan Ding <[email protected]>
AuthorDate: Mon Jul 21 16:08:26 2025 +0800
KAFKA-19461: Add share group admin integration tests to
PlaintextAdminIntegrationTest (#20103)
Add its for `Admin.deleteShareGroupOffsets`,
`Admin.alterShareGroupOffsets` and `Admin.listShareGroupOffsets` to
`PlaintextAdminIntegrationTest`.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 251 ++++++++++++++++++---
1 file changed, 217 insertions(+), 34 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 59eba1eb186..44835885e0c 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2698,6 +2698,34 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Verify that initially there are no share groups to list.
+ */
+ private def assertNoShareGroupsExist(): Unit = {
+ val list = client.listGroups()
+ assertEquals(0, list.all().get().size())
+ assertEquals(0, list.errors().get().size())
+ assertEquals(0, list.valid().get().size())
+ }
+
+ private def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V],
topic: String, latch: CountDownLatch): Thread = {
+ new Thread {
+ override def run : Unit = {
+ consumer.subscribe(util.Set.of(topic))
+ try {
+ while (true) {
+ consumer.poll(JDuration.ofSeconds(5))
+ if (latch.getCount > 0L)
+ latch.countDown()
+ consumer.commitSync()
+ }
+ } catch {
+ case _: InterruptException => // Suppress the output to stderr
+ }
+ }
+ }
+ }
+
@Test
def testShareGroups(): Unit = {
val testGroupId = "test_group_id"
@@ -2715,46 +2743,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val consumerSet = Set(createShareConsumer(configOverrides =
createProperties()))
val topicSet = Set(testTopicName)
-
val latch = new CountDownLatch(consumerSet.size)
- def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic:
String): Thread = {
- new Thread {
- override def run : Unit = {
- consumer.subscribe(util.Set.of(topic))
- try {
- while (true) {
- consumer.poll(JDuration.ofSeconds(5))
- if (latch.getCount > 0L)
- latch.countDown()
- consumer.commitSync()
- }
- } catch {
- case _: InterruptException => // Suppress the output to stderr
- }
- }
- }
- }
-
val config = createConfig
client = Admin.create(config)
- val producer = createProducer()
try {
- // Verify that initially there are no share groups to list.
- val list = client.listGroups()
- assertEquals(0, list.all().get().size())
- assertEquals(0, list.errors().get().size())
- assertEquals(0, list.valid().get().size())
-
- client.createTopics(util.Set.of(
- new NewTopic(testTopicName, testNumPartitions, 1.toShort)
- )).all().get()
- waitForTopics(client, List(testTopicName), List())
-
- producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ assertNoShareGroupsExist()
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
// Start consumers in a thread that will subscribe to a new group.
- val consumerThreads = consumerSet.zip(topicSet).map(zipped =>
createShareConsumerThread(zipped._1, zipped._2))
+ val consumerThreads = consumerSet.zip(topicSet).map(zipped =>
createShareConsumerThread(zipped._1, zipped._2, latch))
try {
consumerThreads.foreach(_.start())
@@ -2846,7 +2845,191 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
} finally {
consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
- Utils.closeQuietly(producer, "producer")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @Test
+ def testDeleteShareGroupOffsets(): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ val testTopicName = "test_topic"
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val fakeGroupId = "fake_group_id"
+ val fakeTopicName = "foo"
+
+ try {
+ prepareTopics(List(testTopicName), 1)
+ prepareRecords(testTopicName)
+
+ val newShareConsumerConfig = new Properties(consumerConfig)
+ newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+
+ Using.resource(createShareConsumer(configOverrides =
newShareConsumerConfig)) { consumer =>
+ consumer.subscribe(util.List.of(testTopicName))
+ consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
+ consumer.commitSync()
+
+ // listGroups is used to list share groups
+ // Test that we can list the new group.
+ TestUtils.waitUntilTrue(() => {
+ client.listGroups.all.get.stream().filter(group =>
+ group.groupId == testGroupId &&
+ group.groupState.get == GroupState.STABLE).count() == 1
+ }, s"Expected to be able to list $testGroupId")
+
+ // Test offset deletion while consuming
+ val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId,
util.Set.of(testTopicName, fakeTopicName))
+
+ // Deleting the offset with real group ID should get
GroupNotEmptyException
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetDeleteResult.all())
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetDeleteResult.topicResult(testTopicName))
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetDeleteResult.topicResult(fakeTopicName))
+
+ // Test the fake group ID
+ val fakeDeleteResult = client.deleteShareGroupOffsets(fakeGroupId,
util.Set.of(testTopicName, fakeTopicName))
+
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeDeleteResult.all())
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeDeleteResult.topicResult(testTopicName))
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeDeleteResult.topicResult(fakeTopicName))
+ }
+
+ // Test offset deletion when group is empty
+ val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId,
util.Set.of(testTopicName, fakeTopicName))
+
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
offsetDeleteResult.all())
+ assertNull(offsetDeleteResult.topicResult(testTopicName).get())
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
offsetDeleteResult.topicResult(fakeTopicName))
+
+ val tp1 = new TopicPartition(testTopicName, 0)
+ val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
+ .partitionsToOffsetAndMetadata(testGroupId)
+ .get()
+ assertTrue(parts.containsKey(tp1))
+ assertNull(parts.get(tp1))
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @Test
+ def testAlterShareGroupOffsets(): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ val testTopicName = "test_topic"
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val fakeGroupId = "fake_group_id"
+ val fakeTopicName = "foo"
+
+ val tp1 = new TopicPartition(testTopicName, 0)
+ val tp2 = new TopicPartition(fakeTopicName, 0)
+ try {
+ prepareTopics(List(testTopicName), 1)
+ prepareRecords(testTopicName)
+
+ val newShareConsumerConfig = new Properties(consumerConfig)
+ newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+
+ Using.resource(createShareConsumer(configOverrides =
newShareConsumerConfig)) { consumer =>
+ consumer.subscribe(util.List.of(testTopicName))
+ consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
+ consumer.commitSync()
+
+ // listGroups is used to list share groups
+ // Test that we can list the new group.
+ TestUtils.waitUntilTrue(() => {
+ client.listGroups.all.get.stream().filter(group =>
+ group.groupId == testGroupId &&
+ group.groupState.get == GroupState.STABLE).count() == 1
+ }, s"Expected to be able to list $testGroupId")
+
+ // Test offset alter while consuming
+ val offsetAlterResult = client.alterShareGroupOffsets(testGroupId,
util.Map.of(tp1, 0, tp2, 0))
+
+ // Altering the offset with real group ID should get
GroupNotEmptyException
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetAlterResult.all())
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetAlterResult.partitionResult(tp1))
+ assertFutureThrows(classOf[GroupNotEmptyException],
offsetAlterResult.partitionResult(tp2))
+
+ // Test the fake group ID
+ val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId,
util.Map.of(tp1, 0, tp2, 0))
+
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.all())
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.partitionResult(tp1))
+ assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.partitionResult(tp2))
+ }
+
+ // Test offset alter when group is empty
+ val offsetAlterResult = client.alterShareGroupOffsets(testGroupId,
util.Map.of(tp1, 0, tp2, 0))
+
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
offsetAlterResult.all())
+ assertNull(offsetAlterResult.partitionResult(tp1).get())
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
offsetAlterResult.partitionResult(tp2))
+
+ val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
+ .partitionsToOffsetAndMetadata(testGroupId)
+ .get()
+ assertTrue(parts.containsKey(tp1))
+ assertEquals(0, parts.get(tp1).offset())
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @Test
+ def testListShareGroupOffsets(): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ val testTopicName = "test_topic"
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+
+ val newShareConsumerConfig = new Properties(consumerConfig)
+ newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+ val consumerSet = Set(createShareConsumer(configOverrides =
newShareConsumerConfig))
+ val topicSet = Set(testTopicName)
+ val latch = new CountDownLatch(consumerSet.size)
+
+ try {
+ assertNoShareGroupsExist()
+ prepareTopics(List(testTopicName), 2)
+ prepareRecords(testTopicName)
+
+ // Start consumers in a thread that will subscribe to a new group.
+ val consumerThreads = consumerSet.zip(topicSet).map(zipped =>
createShareConsumerThread(zipped._1, zipped._2, latch))
+ try {
+ consumerThreads.foreach(_.start())
+ assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+ val tp1 = new TopicPartition(testTopicName, 0)
+ val tp2 = new TopicPartition(testTopicName, 1)
+
+ // Test listShareGroupOffsets
+ TestUtils.waitUntilTrue(() => {
+ val parts = client.listShareGroupOffsets(util.Map.of(testGroupId,
new ListShareGroupOffsetsSpec()))
+ .partitionsToOffsetAndMetadata(testGroupId)
+ .get()
+ parts.containsKey(tp1) && parts.containsKey(tp2)
+ }, "Expected the result contains all partitions.")
+
+ // Test listShareGroupOffsets with listShareGroupOffsetsSpec
+ val groupSpecs = util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))
+ val parts =
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get()
+ assertTrue(parts.containsKey(tp1))
+ assertFalse(parts.containsKey(tp2))
+ } finally {
+ consumerThreads.foreach {
+ case consumerThread =>
+ consumerThread.interrupt()
+ consumerThread.join()
+ }
+ }
+ } finally {
+ consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
Utils.closeQuietly(client, "adminClient")
}
}