Copilot commented on code in PR #20244:
URL: https://github.com/apache/kafka/pull/20244#discussion_r2317179080


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2589,13 +2590,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
+    val streamsGroup = createStreamsGroup(
+      inputTopic = testTopicName,
+      streamsGroupId = streamsGroupId
+    )
+
     val config = createConfig
     client = Admin.create(config)
     try {

Review Comment:
   The topic creation code was removed but the test still references 
`testTopicName`. The topic needs to be created before it can be used by the 
groups.
   ```suggestion
       try {
         // Create the topic before using it
         val newTopic = new NewTopic(testTopicName, 1, 1.toShort)
         client.createTopics(util.Collections.singleton(newTopic)).all().get()
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4363,6 +4384,136 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }
     }
   }
+
+  @Test
+  def testDescribeStreamsGroups(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopic = testTopicName,
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId

Review Comment:
   This logic assumes the first group in the list is the streams group, but 
there's no guarantee of ordering. It should filter for the specific 
`streamsGroupId` instead of using `findFirst()`.
   ```suggestion
           val streamsGroupOpt = client.listGroups().all().get().stream()
             .filter(g => g.groupId() == streamsGroupId)
             .findFirst()
             .orElse(null)
           streamsGroupOpt != null && streamsGroupOpt.groupState().orElse(null) 
== GroupState.STABLE
   ```



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   val superuserClientConfig = new Properties
   val serverConfig = new Properties
   val controllerConfig = new Properties
+  var streamsGroupConfig = new Properties

Review Comment:
   The `streamsGroupConfig` variable is declared but never used in the code. 
Consider removing it or documenting its intended purpose.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to