This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a81f08d368a KAFKA-19550: Integration test for Streams-related Admin
APIs [1/N] (#20244)
a81f08d368a is described below
commit a81f08d368afccd681fc44010c57d5395669d60c
Author: lucliu1108 <[email protected]>
AuthorDate: Thu Sep 4 08:09:21 2025 -0500
KAFKA-19550: Integration test for Streams-related Admin APIs [1/N] (#20244)
This change adds:
- Integration test for `Admin#describeStreamsGroups` API
- Integration test for `Admin#deleteStreamsGroups` API
Reviewers: Alieh Saeedi <[email protected]>, Lucas Brutschy
<[email protected]>
---------
Co-authored-by: Lucas Brutschy <[email protected]>
---
.../kafka/api/IntegrationTestHarness.scala | 52 ++++++-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 168 ++++++++++++++++++++-
2 files changed, 213 insertions(+), 7 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 7c08dd9c3fe..fe24e45f16b 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -22,13 +22,14 @@ import org.apache.kafka.clients.consumer.{Consumer,
ConsumerConfig, KafkaConsume
import kafka.utils.TestUtils
import kafka.utils.Implicits._
-import java.util.{Optional, Properties}
+import java.util
+import java.util.{Optional, Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer,
StreamsRebalanceData}
+import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer,
StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.utils.Utils
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import scala.collection.mutable
import scala.collection.Seq
+import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
/**
@@ -235,6 +237,52 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
streamsConsumer
}
+ /**
+  * Creates a streams consumer configured for `streamsGroupId` and subscribes it to `inputTopic`.
+  *
+  * The consumer properties start from the bootstrap servers, the given group id, disabled
+  * auto-commit and byte-array deserializer class names. `configOverrides` is layered on top and
+  * every key listed in `configsToRemove` is dropped afterwards. The rebalance data holds a single
+  * subtopology, "subtopology-0", built from `inputTopic` and one "<inputTopic>-store-changelog"
+  * topic entry. The registered rebalance listener returns an empty `Optional` from every callback.
+  *
+  * @tparam K key type; the key deserializer is always a `ByteArrayDeserializer` cast to this type
+  * @tparam V value type; the value deserializer is always a `ByteArrayDeserializer` cast to this type
+  * @param configOverrides properties applied over the defaults assembled here
+  * @param configsToRemove property keys removed after the overrides have been applied
+  * @param inputTopic topic the consumer subscribes to
+  * @param streamsGroupId value used for the consumer group id property
+  * @return the subscribed consumer
+  */
+ def createStreamsGroup[K, V](
+   configOverrides: Properties = new Properties,
+   configsToRemove: List[String] = List(),
+   inputTopic: String,
+   streamsGroupId: String
+ ): AsyncKafkaConsumer[K, V] = {
+   val byteArrayDeserializerName = classOf[ByteArrayDeserializer].getName
+
+   // Defaults first, then caller overrides, then removals: later steps win over earlier ones.
+   val consumerProps = new Properties()
+   consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+   consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
+   consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+   consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializerName)
+   consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializerName)
+   consumerProps ++= configOverrides
+   configsToRemove.foreach(key => consumerProps.remove(key))
+
+   // Single changelog topic entry derived from the input topic name.
+   val changelogInfo = new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())
+   val subtopology = new StreamsRebalanceData.Subtopology(
+     util.Set.of(inputTopic),
+     util.Set.of(),
+     util.Map.of(),
+     util.Map.of(inputTopic + "-store-changelog", changelogInfo),
+     util.Set.of()
+   )
+   val rebalanceData = new StreamsRebalanceData(
+     UUID.randomUUID(),
+     Optional.empty(),
+     util.Map.of("subtopology-0", subtopology),
+     Map.empty[String, String].asJava
+   )
+
+   // Listener that never reports an error, whatever the rebalance event.
+   val noOpListener = new StreamsRebalanceListener {
+     override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
+       Optional.empty()
+
+     override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
+       Optional.empty()
+
+     override def onAllTasksLost(): Optional[Exception] =
+       Optional.empty()
+   }
+
+   val streamsConsumer = createStreamsConsumer(
+     keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]],
+     valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]],
+     configOverrides = consumerProps,
+     streamsRebalanceData = rebalanceData
+   )
+   streamsConsumer.subscribe(util.Set.of(inputTopic), noOpListener)
+   streamsConsumer
+ }
+
def createAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 44835885e0c..286ac2b098c 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer,
ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding,
AclBindingFilter, AclOperation, AclPermissionType}
@@ -2573,8 +2574,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val consumerGroupId = "consumer_group_id"
val shareGroupId = "share_group_id"
val simpleGroupId = "simple_group_id"
+ val streamsGroupId = "streams_group_id"
val testTopicName = "test_topic"
+ val config = createConfig
+ client = Admin.create(config)
+
consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
val classicGroupConfig = new Properties(consumerConfig)
classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
@@ -2589,8 +2594,11 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
- val config = createConfig
- client = Admin.create(config)
+ val streamsGroup = createStreamsGroup(
+ inputTopic = testTopicName,
+ streamsGroupId = streamsGroupId
+ )
+
try {
client.createTopics(util.Set.of(
new NewTopic(testTopicName, 1, 1.toShort)
@@ -2604,6 +2612,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
consumerGroup.poll(JDuration.ofMillis(1000))
shareGroup.subscribe(util.Set.of(testTopicName))
shareGroup.poll(JDuration.ofMillis(1000))
+ streamsGroup.poll(JDuration.ofMillis(1000))
val alterConsumerGroupOffsetsResult =
client.alterConsumerGroupOffsets(simpleGroupId,
util.Map.of(topicPartition, new OffsetAndMetadata(0L)))
@@ -2612,18 +2621,27 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val groups = client.listGroups().all().get()
- groups.size() == 4
+ groups.size() == 5
}, "Expected to find all groups")
val classicGroupListing = new GroupListing(classicGroupId,
Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE))
val consumerGroupListing = new GroupListing(consumerGroupId,
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE))
val shareGroupListing = new GroupListing(shareGroupId,
Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
val simpleGroupListing = new GroupListing(simpleGroupId,
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))
+ // Streams group could either be in STABLE or NOT_READY state
+ val streamsGroupListingStable = new GroupListing(streamsGroupId,
Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
+ val streamsGroupListingNotReady = new GroupListing(streamsGroupId,
Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY))
var listGroupsResult = client.listGroups()
assertTrue(listGroupsResult.errors().get().isEmpty)
- assertEquals(Set(classicGroupListing, simpleGroupListing,
consumerGroupListing, shareGroupListing),
listGroupsResult.all().get().asScala.toSet)
- assertEquals(Set(classicGroupListing, simpleGroupListing,
consumerGroupListing, shareGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+
+ val expectedStreamListings = Set(streamsGroupListingStable,
streamsGroupListingNotReady)
+ val expectedListings = Set(classicGroupListing, simpleGroupListing,
consumerGroupListing, shareGroupListing)
+ val actualListings = listGroupsResult.all().get().asScala.toSet
+
+ // Check that actualListings contains all expectedListings and one of
the streams listings
+ assertTrue(expectedListings.subsetOf(actualListings))
+ assertTrue(actualListings.exists(expectedStreamListings.contains))
listGroupsResult = client.listGroups(new
ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC)))
assertTrue(listGroupsResult.errors().get().isEmpty)
@@ -2639,10 +2657,19 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(shareGroupListing),
listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(shareGroupListing),
listGroupsResult.valid().get().asScala.toSet)
+
+ listGroupsResult = client.listGroups(new
ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS)))
+ assertTrue(listGroupsResult.errors().get().isEmpty)
+
assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable))
||
+
listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
+
assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable))
||
+
listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
+
} finally {
Utils.closeQuietly(classicGroup, "classicGroup")
Utils.closeQuietly(consumerGroup, "consumerGroup")
Utils.closeQuietly(shareGroup, "shareGroup")
+ Utils.closeQuietly(streamsGroup, "streamsGroup")
Utils.closeQuietly(client, "adminClient")
}
}
@@ -4363,6 +4390,137 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
}
+
+ /**
+  * Verifies `Admin#describeStreamsGroups` against one live streams group and checks that
+  * describing an unknown group id fails with `GroupIdNotFoundException`.
+  */
+ @Test
+ def testDescribeStreamsGroups(): Unit = {
+   val streamsGroupId = "stream_group_id"
+   val testTopicName = "test_topic"
+   val testNumPartitions = 1
+
+   val config = createConfig
+   client = Admin.create(config)
+
+   prepareTopics(List(testTopicName), testNumPartitions)
+   prepareRecords(testTopicName)
+
+   val streams = createStreamsGroup(
+     inputTopic = testTopicName,
+     streamsGroupId = streamsGroupId
+   )
+
+   try {
+     // Poll inside the try block so the consumer and admin client are still closed if poll throws.
+     streams.poll(JDuration.ofMillis(500L))
+
+     // The group may not be listed on the first attempts; `exists` simply yields false in that
+     // case so the wait keeps retrying instead of dereferencing a missing listing.
+     TestUtils.waitUntilTrue(() => {
+       client.listGroups().all().get().asScala.exists { group =>
+         group.groupId() == streamsGroupId && group.groupState() == Optional.of(GroupState.STABLE)
+       }
+     }, "Stream group not stable yet")
+
+     // Verify the describe call works correctly
+     val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+     val group = describedGroups.get(streamsGroupId)
+     assertNotNull(group)
+     assertEquals(streamsGroupId, group.groupId())
+     assertFalse(group.members().isEmpty)
+     assertNotNull(group.subtopologies())
+     assertFalse(group.subtopologies().isEmpty)
+
+     // Verify that at least one subtopology lists the input topic as a source topic
+     val subtopologies = group.subtopologies().asScala
+     assertTrue(subtopologies.exists(subtopology =>
+       subtopology.sourceTopics().contains(testTopicName)))
+
+     // Test describing a non-existing group
+     val nonExistingGroup = "non_existing_stream_group"
+     val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
+     assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
+   } finally {
+     Utils.closeQuietly(streams, "streams")
+     Utils.closeQuietly(client, "adminClient")
+   }
+ }
+
+ /**
+  * Verifies `Admin#deleteStreamsGroups` in three situations: groups that still have members
+  * (expects `GroupNotEmptyException`), the same groups after their consumers were closed
+  * (expects them to disappear from the group listing), and an unknown group id
+  * (expects `GroupIdNotFoundException`).
+  */
+ @Test
+ def testDeleteStreamsGroups(): Unit = {
+   val testTopicName = "test_topic"
+   val testNumPartitions = 3
+   val testNumStreamsGroup = 3
+   val groupIdPrefix = "stream_group_id_"
+
+   val targetDeletedGroups = util.List.of(s"${groupIdPrefix}2", s"${groupIdPrefix}3")
+   val targetRemainingGroups = util.List.of(s"${groupIdPrefix}1")
+
+   val config = createConfig
+   client = Admin.create(config)
+
+   prepareTopics(List(testTopicName), testNumPartitions)
+   prepareRecords(testTopicName)
+
+   // Mutable on purpose: consumers are registered one by one so that the finally block can
+   // close whatever was created even if a later creation or poll fails.
+   val streamsList = scala.collection.mutable.ListBuffer[(String, AsyncKafkaConsumer[_, _])]()
+
+   try {
+     for (i <- 1 to testNumStreamsGroup) {
+       val streamsGroupId = s"$groupIdPrefix$i"
+
+       val streams = createStreamsGroup(
+         inputTopic = testTopicName,
+         streamsGroupId = streamsGroupId
+       )
+       // Register before polling so the consumer is cleaned up even when poll throws.
+       streamsList += ((streamsGroupId, streams))
+       streams.poll(JDuration.ofMillis(500L))
+     }
+
+     // Wait until exactly the expected number of groups is listed and all of them are ours.
+     TestUtils.waitUntilTrue(() => {
+       val groups = client.listGroups().all().get()
+       groups.size() == testNumStreamsGroup &&
+         groups.asScala.forall(_.groupId().startsWith(groupIdPrefix))
+     }, "Streams groups not ready to delete yet")
+
+     // Test deletion of non-empty existing groups
+     val nonEmptyDeleteResult = client.deleteStreamsGroups(targetDeletedGroups)
+     assertFutureThrows(classOf[GroupNotEmptyException], nonEmptyDeleteResult.all())
+     assertEquals(2, nonEmptyDeleteResult.deletedGroups().size())
+
+     // Stop and clean up the streams for the groups that are going to be deleted
+     streamsList
+       .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) }
+       .foreach { case (_, streams) => streams.close() }
+
+     // NOTE(review): presumably the input topic plus the changelog topic declared by
+     // createStreamsGroup - confirm against the harness.
+     val listTopicResult = client.listTopics()
+     assertEquals(2, listTopicResult.names().get().size())
+
+     // Test deletion of emptied existing streams groups
+     val emptiedDeleteResult = client.deleteStreamsGroups(targetDeletedGroups)
+     assertEquals(2, emptiedDeleteResult.deletedGroups().size())
+
+     // Wait for the deleted groups to be removed
+     TestUtils.waitUntilTrue(() => {
+       val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet
+       targetDeletedGroups.asScala.forall(id => !groupIds.contains(id))
+     }, "Deleted groups not yet deleted")
+
+     // Verify that only the groups that were not targeted are still present
+     val remainingGroups = client.listGroups().all().get()
+     assertEquals(targetRemainingGroups.size(), remainingGroups.size())
+     assertEquals(targetRemainingGroups.asScala.toSet, remainingGroups.asScala.map(_.groupId()).toSet)
+
+     // Test deletion of a non-existing group
+     val nonExistingGroup = "non_existing_stream_group"
+     val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup))
+     assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all())
+     assertEquals(1, deleteNonExistingGroupResult.deletedGroups().size())
+   } finally {
+     // closeQuietly keeps one failing close from skipping the remaining consumers and the
+     // admin client; consumers already closed above are simply closed again.
+     streamsList.foreach { case (groupId, streams) =>
+       Utils.closeQuietly(streams, s"streams for $groupId")
+     }
+     Utils.closeQuietly(client, "adminClient")
+   }
+ }
}
object PlaintextAdminIntegrationTest {