Copilot commented on code in PR #20099:
URL: https://github.com/apache/kafka/pull/20099#discussion_r2185196020


##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -170,21 +199,87 @@ public void 
testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except
         final List<Integer> dontCares = List.of(3, 6, 7);
 
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+    }
+
+    @Test
+    public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() 
throws Exception {
+        KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, 
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
+        startApplicationAndWaitUntilRunning(streams2);
+
+        final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        final Set<List<String>> expectedRows1 = Set.of(
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));

Review Comment:
   The first expected row has 12 elements but the header defines 9 columns; 
this mismatch will cause the test to fail. Combine multiple assignment values 
into the single ASSIGNMENTS column or adjust the header to match.
   ```suggestion
               List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE: 
0:[0,1]; TARGET-ACTIVE: 0:[0,1];"),
               List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE: 
1:[0,1]; TARGET-ACTIVE: 1:[0,1];"));
   ```



##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -170,21 +199,87 @@ public void 
testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except
         final List<Integer> dontCares = List.of(3, 6, 7);
 
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+    }
+
+    @Test
+    public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() 
throws Exception {
+        KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, 
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
+        startApplicationAndWaitUntilRunning(streams2);
+
+        final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        final Set<List<String>> expectedRows1 = Set.of(
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+        final Set<List<String>> expectedRows2 = Set.of(
+            List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", ""),

Review Comment:
   The second expected row has only 8 elements but the header lists 9 columns; 
the missing ASSIGNMENTS value should be added or the row adjusted to match the 
header.
   ```suggestion
               List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", "", 
""),
   ```



##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -90,6 +97,28 @@ public static void closeCluster() {
         cluster = null;
     }
 
+    @Test
+    public void testDescribeWithUnrecognizedOption() {
+        String[] args = new String[]{"--unrecognized-option", 
"--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID};
+        assertThrows(OptionException.class, () -> 
getStreamsGroupService(args));
+    }
+
+    @Test
+    public void testDescribeWithoutGroupOption() {
+        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--describe"};
+        AtomicBoolean exited = new AtomicBoolean(false);

Review Comment:
   [nitpick] The test overrides the global Exit procedure but does not restore 
the original. Consider resetting the default exit procedure at the end of the 
test to avoid side effects on other tests.
   ```suggestion
           AtomicBoolean exited = new AtomicBoolean(false);
           Exit.Procedure originalExitProcedure = Exit.getExitProcedure(); // 
Save the original Exit procedure
   ```



##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -170,21 +199,87 @@ public void 
testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except
         final List<Integer> dontCares = List.of(3, 6, 7);
 
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
         validateDescribeOutput(
-            Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, 
expectedRows, dontCares);
+    }
+
+    @Test
+    public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() 
throws Exception {
+        KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, 
OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
+        startApplicationAndWaitUntilRunning(streams2);
+
+        final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
+        final Set<List<String>> expectedRows1 = Set.of(
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+        final Set<List<String>> expectedRows2 = Set.of(
+            List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", ""),
+            List.of(APP_ID_2, "2", "0", "", "streams", "2", "", ""));
+        final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>();
+        expectedRowsMap.put(APP_ID, expectedRows1);
+        expectedRowsMap.put(APP_ID_2, expectedRows2);
+
+        // The member and process names as well as client-id are not 
deterministic, so we don't care about them.
+        final List<Integer> dontCares = List.of(3, 6, 7);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
+            expectedHeader, expectedRowsMap, dontCares);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
+            expectedHeader, expectedRowsMap, dontCares);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--all-groups"),
+            expectedHeader, expectedRowsMap, dontCares);
+
+        streams2.close();
+        streams2.cleanUp();
+    }
+
+    @Test
+    public void testDescribeNonExistingStreamsGroup() {
+        final String nonExistingGroup = "non-existing-group";
+        final String errorMessage = String.format(
+            "Error: Executing streams group command failed due to 
org.apache.kafka.common.errors.GroupIdNotFoundException: Group %s not found.",
+            nonExistingGroup);
+
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--members", "--verbose", "--group", nonExistingGroup), 
errorMessage);
+        validateDescribeOutput(
+            Arrays.asList("--bootstrap-server", bootstrapServers, 
"--describe", "--verbose", "--members", "--group", nonExistingGroup), 
errorMessage);

Review Comment:
   This call is duplicated immediately below with identical arguments. Remove 
the redundant invocation or replace it with a test of `--all-groups` to cover 
that code path.
   ```suggestion
   
   ```



##########
tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java:
##########
@@ -227,4 +327,69 @@ private static void validateDescribeOutput(
                 );
         }, () -> String.format("Expected header=%s and groups=%s, but 
found:%n%s", expectedHeader, expectedRows, out.get()));
     }
+
+    private static void validateDescribeOutput(
+        List<String> args,
+        List<String> expectedHeader,
+        Map<String, Set<List<String>>> expectedRows,
+        List<Integer> dontCareIndices
+    ) throws InterruptedException {
+        final AtomicReference<String> out = new AtomicReference<>("");
+        TestUtils.waitForCondition(() -> {
+            String output = ToolsTestUtils.grabConsoleOutput(() -> 
StreamsGroupCommand.main(args.toArray(new String[0])));
+            out.set(output);
+
+            String[] lines = output.split("\n");
+            if (lines.length == 1 && lines[0].isEmpty()) lines = new 
String[]{};
+
+            if (lines.length == 0) return false;
+            List<String> header = Arrays.asList(lines[0].split("\\s+"));
+            if (!expectedHeader.equals(header)) return false;
+
+            Map<String, Set<List<String>>> groupdescMap = 
splitOutputByGroup(lines);
+
+            if (groupdescMap.size() != expectedRows.size()) return false;
+
+            // clear the dontCare fields and then compare two sets
+            boolean compareResult = true;
+            for (Map.Entry<String, Set<List<String>>> entry : 
groupdescMap.entrySet()) {
+                String group = entry.getKey();
+                Set<List<String>> groupDesc = entry.getValue();
+                if (!expectedRows.containsKey(group)) return false;
+                Set<List<String>> expectedGroupDesc = expectedRows.get(group);
+                if (expectedGroupDesc.size() != groupDesc.size())
+                    compareResult = false;
+                for (List<String> list : groupDesc) {
+                    List<String> listCloned = new ArrayList<>(list);
+                    dontCareIndices.forEach(index -> listCloned.set(index, 
""));
+                    if (!expectedGroupDesc.contains(listCloned)) {
+                        compareResult = false;
+                    }
+                }
+            }
+
+            return compareResult;
+        }, () -> String.format("Expected header=%s and groups=%s, but 
found:%n%s", expectedHeader, expectedRows, out.get()));
+    }
+
+    private static Map<String, Set<List<String>>> splitOutputByGroup(String[] 
lines) {
+        Map<String, Set<List<String>>> groupdescMap = new HashMap<>();
+        String headerLine = lines[0].replaceAll(" ", "");

Review Comment:
   [nitpick] Replacing only space characters may miss other whitespace (e.g., 
tabs). Use `replaceAll("\s+", "")` to normalize all whitespace.
   ```suggestion
           String headerLine = lines[0].replaceAll("\\s+", "");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to