lucasbru commented on code in PR #20244:
URL: https://github.com/apache/kafka/pull/20244#discussion_r2306898719
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4363,6 +4401,162 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
}
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly"))
+ def testDescribeStreamsGroups(groupProtocol: String): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testOutputTopicName = "test_output_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streamsConfig = new Properties()
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000)
+ val streams = createStreamsGroup(
+ configOverrides = streamsConfig,
+ inputTopic = testTopicName,
+ outputTopic = testOutputTopicName,
+ streamsGroupId = streamsGroupId,
+ groupProtocol = groupProtocol
+ )
+
+ try {
+ streams.cleanUp()
+ streams.start()
+
+ TestUtils.waitUntilTrue(() => streams.state() ==
KafkaStreams.State.RUNNING, "Streams not in RUNNING state")
+
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ }, "Stream group not stable yet")
+
+ // Verify the describe call works correctly
+ val describedGroups =
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+ val group = describedGroups.get(streamsGroupId)
+ assertNotNull(group)
+ assertEquals(streamsGroupId, group.groupId())
+ assertFalse(group.members().isEmpty)
+ assertNotNull(group.subtopologies())
+ assertFalse(group.subtopologies().isEmpty)
+
+ // Verify the topology contains the expected source and sink topics
+ val subtopologies = group.subtopologies().asScala
+ assertTrue(subtopologies.exists(subtopology =>
+ subtopology.sourceTopics().contains(testTopicName)))
+
+ // Test describing a non-existing group
+ val nonExistingGroup = "non_existing_stream_group"
+ val describedNonExistingGroupResponse =
client.describeStreamsGroups(util.List.of(nonExistingGroup))
+ assertFutureThrows(classOf[GroupIdNotFoundException],
describedNonExistingGroupResponse.all())
+
+ } finally {
+ Utils.closeQuietly(streams, "streams")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly"))
Review Comment:
Why parametrize the test if there is only one variation? Could we just
remove the parameter?
##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -235,6 +241,41 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
streamsConsumer
}
+ def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
Review Comment:
I wonder if we could create the streams groups without adding a dependency on
the streams package - this creates an extra dependency that seems quite
circular. It's not too bad since it's a testDependency only.
Can you check AuthorizerIntegrationTest.createStreamsGroupToDescribe and
see how I created a streams group with input topic / output topic and also a
changelog topic? This should work here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]