This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef07b5fad1a KAFKA-19461: Add share group admin integration tests to 
PlaintextAdminIntegrationTest (#20103)
ef07b5fad1a is described below

commit ef07b5fad1a864829022c5c4210b1edde0d40843
Author: Lan Ding <[email protected]>
AuthorDate: Mon Jul 21 16:08:26 2025 +0800

    KAFKA-19461: Add share group admin integration tests to 
PlaintextAdminIntegrationTest (#20103)
    
    Add its for `Admin.deleteShareGroupOffsets`,
    `Admin.alterShareGroupOffsets` and `Admin.listShareGroupOffsets`  to
    `PlaintextAdminIntegrationTest`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 251 ++++++++++++++++++---
 1 file changed, 217 insertions(+), 34 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 59eba1eb186..44835885e0c 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2698,6 +2698,34 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  /**
+   * Verify that initially there are no share groups to list.
+   */
+  private def assertNoShareGroupsExist(): Unit = {
+    val list = client.listGroups()
+    assertEquals(0, list.all().get().size())
+    assertEquals(0, list.errors().get().size())
+    assertEquals(0, list.valid().get().size())
+  }
+
+  private def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String, latch: CountDownLatch): Thread = {
+    new Thread {
+      override def run : Unit = {
+        consumer.subscribe(util.Set.of(topic))
+        try {
+          while (true) {
+            consumer.poll(JDuration.ofSeconds(5))
+            if (latch.getCount > 0L)
+              latch.countDown()
+            consumer.commitSync()
+          }
+        } catch {
+          case _: InterruptException => // Suppress the output to stderr
+        }
+      }
+    }
+  }
+
   @Test
   def testShareGroups(): Unit = {
     val testGroupId = "test_group_id"
@@ -2715,46 +2743,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
     val topicSet = Set(testTopicName)
-
     val latch = new CountDownLatch(consumerSet.size)
 
-    def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic: 
String): Thread = {
-      new Thread {
-        override def run : Unit = {
-          consumer.subscribe(util.Set.of(topic))
-          try {
-            while (true) {
-              consumer.poll(JDuration.ofSeconds(5))
-              if (latch.getCount > 0L)
-                latch.countDown()
-              consumer.commitSync()
-            }
-          } catch {
-            case _: InterruptException => // Suppress the output to stderr
-          }
-        }
-      }
-    }
-
     val config = createConfig
     client = Admin.create(config)
-    val producer = createProducer()
     try {
-      // Verify that initially there are no share groups to list.
-      val list = client.listGroups()
-      assertEquals(0, list.all().get().size())
-      assertEquals(0, list.errors().get().size())
-      assertEquals(0, list.valid().get().size())
-
-      client.createTopics(util.Set.of(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
-      )).all().get()
-      waitForTopics(client, List(testTopicName), List())
-
-      producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      assertNoShareGroupsExist()
+      prepareTopics(List(testTopicName), testNumPartitions)
+      prepareRecords(testTopicName)
 
       // Start consumers in a thread that will subscribe to a new group.
-      val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+      val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2, latch))
 
       try {
         consumerThreads.foreach(_.start())
@@ -2846,7 +2845,191 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }
     } finally {
       consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
-      Utils.closeQuietly(producer, "producer")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @Test
+  def testDeleteShareGroupOffsets(): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    val testTopicName = "test_topic"
+    val testGroupId = "test_group_id"
+    val testClientId = "test_client_id"
+    val fakeGroupId = "fake_group_id"
+    val fakeTopicName = "foo"
+
+    try {
+      prepareTopics(List(testTopicName), 1)
+      prepareRecords(testTopicName)
+
+      val newShareConsumerConfig = new Properties(consumerConfig)
+      newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+      Using.resource(createShareConsumer(configOverrides = 
newShareConsumerConfig)) { consumer =>
+        consumer.subscribe(util.List.of(testTopicName))
+        consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
+        consumer.commitSync()
+
+        // listGroups is used to list share groups
+        // Test that we can list the new group.
+        TestUtils.waitUntilTrue(() => {
+          client.listGroups.all.get.stream().filter(group =>
+            group.groupId == testGroupId &&
+              group.groupState.get == GroupState.STABLE).count() == 1
+        }, s"Expected to be able to list $testGroupId")
+
+        // Test offset deletion while consuming
+        val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, 
util.Set.of(testTopicName, fakeTopicName))
+
+        // Deleting the offset with real group ID should get 
GroupNotEmptyException
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetDeleteResult.all())
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetDeleteResult.topicResult(testTopicName))
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetDeleteResult.topicResult(fakeTopicName))
+
+        // Test the fake group ID
+        val fakeDeleteResult = client.deleteShareGroupOffsets(fakeGroupId, 
util.Set.of(testTopicName, fakeTopicName))
+
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeDeleteResult.all())
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeDeleteResult.topicResult(testTopicName))
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeDeleteResult.topicResult(fakeTopicName))
+      }
+
+      // Test offset deletion when group is empty
+      val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, 
util.Set.of(testTopicName, fakeTopicName))
+
+      assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
offsetDeleteResult.all())
+      assertNull(offsetDeleteResult.topicResult(testTopicName).get())
+      assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
offsetDeleteResult.topicResult(fakeTopicName))
+
+      val tp1 = new TopicPartition(testTopicName, 0)
+      val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
+        .partitionsToOffsetAndMetadata(testGroupId)
+        .get()
+      assertTrue(parts.containsKey(tp1))
+      assertNull(parts.get(tp1))
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @Test
+  def testAlterShareGroupOffsets(): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    val testTopicName = "test_topic"
+    val testGroupId = "test_group_id"
+    val testClientId = "test_client_id"
+    val fakeGroupId = "fake_group_id"
+    val fakeTopicName = "foo"
+
+    val tp1 = new TopicPartition(testTopicName, 0)
+    val tp2 = new TopicPartition(fakeTopicName, 0)
+    try {
+      prepareTopics(List(testTopicName), 1)
+      prepareRecords(testTopicName)
+
+      val newShareConsumerConfig = new Properties(consumerConfig)
+      newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+      Using.resource(createShareConsumer(configOverrides = 
newShareConsumerConfig)) { consumer =>
+        consumer.subscribe(util.List.of(testTopicName))
+        consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
+        consumer.commitSync()
+
+        // listGroups is used to list share groups
+        // Test that we can list the new group.
+        TestUtils.waitUntilTrue(() => {
+          client.listGroups.all.get.stream().filter(group =>
+            group.groupId == testGroupId &&
+              group.groupState.get == GroupState.STABLE).count() == 1
+        }, s"Expected to be able to list $testGroupId")
+
+        // Test offset alter while consuming
+        val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, 
util.Map.of(tp1, 0, tp2, 0))
+
+        // Altering the offset with real group ID should get 
GroupNotEmptyException
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetAlterResult.all())
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetAlterResult.partitionResult(tp1))
+        assertFutureThrows(classOf[GroupNotEmptyException], 
offsetAlterResult.partitionResult(tp2))
+
+        // Test the fake group ID
+        val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId, 
util.Map.of(tp1, 0, tp2, 0))
+
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.all())
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.partitionResult(tp1))
+        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.partitionResult(tp2))
+      }
+
+      // Test offset alter when group is empty
+      val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, 
util.Map.of(tp1, 0, tp2, 0))
+
+      assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
offsetAlterResult.all())
+      assertNull(offsetAlterResult.partitionResult(tp1).get())
+      assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
offsetAlterResult.partitionResult(tp2))
+
+      val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
+        .partitionsToOffsetAndMetadata(testGroupId)
+        .get()
+      assertTrue(parts.containsKey(tp1))
+      assertEquals(0, parts.get(tp1).offset())
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @Test
+  def testListShareGroupOffsets(): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    val testTopicName = "test_topic"
+    val testGroupId = "test_group_id"
+    val testClientId = "test_client_id"
+
+    val newShareConsumerConfig = new Properties(consumerConfig)
+    newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+    newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+    val consumerSet = Set(createShareConsumer(configOverrides = 
newShareConsumerConfig))
+    val topicSet = Set(testTopicName)
+    val latch = new CountDownLatch(consumerSet.size)
+
+    try {
+      assertNoShareGroupsExist()
+      prepareTopics(List(testTopicName), 2)
+      prepareRecords(testTopicName)
+
+      // Start consumers in a thread that will subscribe to a new group.
+      val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2, latch))
+      try {
+        consumerThreads.foreach(_.start())
+        assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+        val tp1 = new TopicPartition(testTopicName, 0)
+        val tp2 = new TopicPartition(testTopicName, 1)
+
+        // Test listShareGroupOffsets
+        TestUtils.waitUntilTrue(() => {
+          val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, 
new ListShareGroupOffsetsSpec()))
+            .partitionsToOffsetAndMetadata(testGroupId)
+            .get()
+          parts.containsKey(tp1) && parts.containsKey(tp2)
+        }, "Expected the result contains all partitions.")
+
+        // Test listShareGroupOffsets with listShareGroupOffsetsSpec
+        val groupSpecs = util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))
+        val parts = 
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get()
+        assertTrue(parts.containsKey(tp1))
+        assertFalse(parts.containsKey(tp2))
+      } finally {
+        consumerThreads.foreach {
+          case consumerThread =>
+            consumerThread.interrupt()
+            consumerThread.join()
+        }
+      }
+    } finally {
+      consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
       Utils.closeQuietly(client, "adminClient")
     }
   }

Reply via email to