Copilot commented on code in PR #20244:
URL: https://github.com/apache/kafka/pull/20244#discussion_r2317179080
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2589,13 +2590,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
+ val streamsGroup = createStreamsGroup(
+ inputTopic = testTopicName,
+ streamsGroupId = streamsGroupId
+ )
+
val config = createConfig
client = Admin.create(config)
try {
Review Comment:
The topic creation code was removed but the test still references
`testTopicName`. The topic needs to be created before it can be used by the
groups.
```suggestion
try {
// Create the topic before using it
val newTopic = new NewTopic(testTopicName, 1, 1.toShort)
client.createTopics(util.Collections.singleton(newTopic)).all().get()
```
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4363,6 +4384,136 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
}
+
+ @Test
+ def testDescribeStreamsGroups(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopic = testTopicName,
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
Review Comment:
This logic assumes the first group in the list is the streams group, but
there's no guarantee of ordering. It should filter for the specific
`streamsGroupId` instead of using `findFirst()`.
```suggestion
val streamsGroupOpt = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId)
.findFirst()
.orElse(null)
streamsGroupOpt != null && streamsGroupOpt.groupState().orElse(null)
== GroupState.STABLE
```
##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
val superuserClientConfig = new Properties
val serverConfig = new Properties
val controllerConfig = new Properties
+ var streamsGroupConfig = new Properties
Review Comment:
The `streamsGroupConfig` variable is declared but never used in the code.
Consider removing it or documenting its intended purpose.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]