AndrewJSchofield commented on code in PR #20103: URL: https://github.com/apache/kafka/pull/20103#discussion_r2216189008
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2697,6 +2697,34 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + /** + * Verify that initially there are no share groups to list. + */ + private def assertShareGroupsIsClean(): Unit = { Review Comment: nit: Maybe `assertNoShareGroupsExist` is better than "clean". ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2845,7 +2844,178 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } finally { consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer")) - Utils.closeQuietly(producer, "producer") + Utils.closeQuietly(client, "adminClient") + } + } + + @Test + def testDeleteShareGroupOffsets(): Unit = { + val config = createConfig + client = Admin.create(config) + val testTopicName = "test_topic" + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val fakeGroupId = "fake_group_id" + val fakeTopicName = "foo" + + try { + prepareTopics(List(testTopicName), 1) + prepareRecords(testTopicName) + + val newShareConsumerConfig = new Properties(consumerConfig) + newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + Using.resource(createShareConsumer(configOverrides = newShareConsumerConfig)) { consumer => + consumer.subscribe(util.List.of(testTopicName)) + consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS)) + consumer.commitSync() + + // listGroups is used to list share groups + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + client.listGroups.all.get.stream().filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE).count() == 1 + }, s"Expected to be able to list $testGroupId") + + // Test offset deletion while consuming + val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName)) + + // Deleting the offset with real group ID should get GroupNotEmptyException + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.all()) + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(testTopicName)) + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(fakeTopicName)) + + // Test the fake group ID + val fakeDeleteResult = client.deleteShareGroupOffsets(fakeGroupId, util.Set.of(testTopicName, fakeTopicName)) + + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.all()) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(testTopicName)) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(fakeTopicName)) + } + + // Test offset deletion when group is empty + val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName)) + + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.all()) + assertNull(offsetDeleteResult.topicResult(testTopicName).get()) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.topicResult(fakeTopicName)) + } finally { Review Comment: We should be able to see whether offsets were deleted using `listShareGroupOffsets`. ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2845,7 +2844,178 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } finally { consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer")) - Utils.closeQuietly(producer, "producer") + Utils.closeQuietly(client, "adminClient") + } + } + + @Test + def testDeleteShareGroupOffsets(): Unit = { + val config = createConfig + client = Admin.create(config) + val testTopicName = "test_topic" + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val fakeGroupId = "fake_group_id" + val fakeTopicName = "foo" + + try { + prepareTopics(List(testTopicName), 1) + prepareRecords(testTopicName) + + val newShareConsumerConfig = new Properties(consumerConfig) + newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + Using.resource(createShareConsumer(configOverrides = newShareConsumerConfig)) { consumer => + consumer.subscribe(util.List.of(testTopicName)) + consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS)) + consumer.commitSync() + + // listGroups is used to list share groups + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + client.listGroups.all.get.stream().filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE).count() == 1 + }, s"Expected to be able to list $testGroupId") + + // Test offset deletion while consuming + val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName)) + + // Deleting the offset with real group ID should get GroupNotEmptyException + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.all()) + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(testTopicName)) + assertFutureThrows(classOf[GroupNotEmptyException], offsetDeleteResult.topicResult(fakeTopicName)) + + // Test the fake group ID + val fakeDeleteResult = client.deleteShareGroupOffsets(fakeGroupId, util.Set.of(testTopicName, fakeTopicName)) + + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.all()) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(testTopicName)) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeDeleteResult.topicResult(fakeTopicName)) + } + + // Test offset deletion when group is empty + val offsetDeleteResult = client.deleteShareGroupOffsets(testGroupId, util.Set.of(testTopicName, fakeTopicName)) + + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.all()) + assertNull(offsetDeleteResult.topicResult(testTopicName).get()) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.topicResult(fakeTopicName)) + } finally { + Utils.closeQuietly(client, "adminClient") + } + } + + @Test + def testAlterShareGroupOffsets(): Unit = { + val config = createConfig + client = Admin.create(config) + val testTopicName = "test_topic" + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val fakeGroupId = "fake_group_id" + val fakeTopicName = "foo" + + val tp1 = new TopicPartition(testTopicName, 0) + val tp2 = new TopicPartition(fakeTopicName, 0) + try { + prepareTopics(List(testTopicName), 1) + prepareRecords(testTopicName) + + val newShareConsumerConfig = new Properties(consumerConfig) + newShareConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newShareConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + + Using.resource(createShareConsumer(configOverrides = newShareConsumerConfig)) { consumer => + consumer.subscribe(util.List.of(testTopicName)) + consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS)) + consumer.commitSync() + + // listGroups is used to list share groups + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + client.listGroups.all.get.stream().filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE).count() == 1 + }, s"Expected to be able to list $testGroupId") + + // Test offset alter while consuming + val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, util.Map.of(tp1, 0, tp2, 0)) + + // Altering the offset with real group ID should get GroupNotEmptyException + assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.all()) + assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp1)) + assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp2)) + + // Test the fake group ID + val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId, util.Map.of(tp1, 0, tp2, 0)) + + assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.all()) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp1)) + assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp2)) + } + + // Test offset alter when group is empty + val offsetAlterResult = client.alterShareGroupOffsets(testGroupId, util.Map.of(tp1, 0, tp2, 0)) + + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.all()) + assertNull(offsetAlterResult.partitionResult(tp1).get()) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.partitionResult(tp2)) Review Comment: It would be nice to check that the alteration succeeded using `listShareGroupOffsets`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org