Copilot commented on code in PR #20099: URL: https://github.com/apache/kafka/pull/20099#discussion_r2185196020
########## tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java: ########## @@ -170,21 +199,87 @@ public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except final List<Integer> dontCares = List.of(3, 6, 7); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } + + @Test + public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { + KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); + startApplicationAndWaitUntilRunning(streams2); + + final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); + final Set<List<String>> expectedRows1 = Set.of( + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"), + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];")); Review Comment: The first expected row has 12 elements but the header defines 9 columns; this mismatch will cause the test to fail. Combine multiple assignment values into the single ASSIGNMENTS column or adjust the header to match. 
```suggestion List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE: 0:[0,1]; TARGET-ACTIVE: 0:[0,1];"), List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE: 1:[0,1]; TARGET-ACTIVE: 1:[0,1];")); ``` ########## tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java: ########## @@ -170,21 +199,87 @@ public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except final List<Integer> dontCares = List.of(3, 6, 7); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } + + @Test + public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { + KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); + startApplicationAndWaitUntilRunning(streams2); + + final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); + final Set<List<String>> expectedRows1 = Set.of( + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"), + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];")); + final Set<List<String>> expectedRows2 = Set.of( + List.of(APP_ID_2, "2", "0", "dont-care", 
"streams", "2", "", ""), Review Comment: The second expected row has only 8 elements but the header lists 9 columns; the missing ASSIGNMENTS value should be added or the row adjusted to match the header. ```suggestion List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", "", ""), ``` ########## tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java: ########## @@ -90,6 +97,28 @@ public static void closeCluster() { cluster = null; } + @Test + public void testDescribeWithUnrecognizedOption() { + String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID}; + assertThrows(OptionException.class, () -> getStreamsGroupService(args)); + } + + @Test + public void testDescribeWithoutGroupOption() { + final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--describe"}; + AtomicBoolean exited = new AtomicBoolean(false); Review Comment: [nitpick] The test overrides the global Exit procedure but does not restore the original. Consider resetting the default exit procedure at the end of the test to avoid side effects on other tests. 
```suggestion AtomicBoolean exited = new AtomicBoolean(false); Exit.Procedure originalExitProcedure = Exit.getExitProcedure(); // Save the original Exit procedure ``` ########## tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java: ########## @@ -170,21 +199,87 @@ public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except final List<Integer> dontCares = List.of(3, 6, 7); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); validateDescribeOutput( - Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } + + @Test + public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { + KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); + startApplicationAndWaitUntilRunning(streams2); + + final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); + final Set<List<String>> expectedRows1 = Set.of( + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"), + List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];")); + final Set<List<String>> expectedRows2 = Set.of( + List.of(APP_ID_2, "2", "0", "dont-care", "streams", "2", "", ""), + List.of(APP_ID_2, "2", "0", 
"", "streams", "2", "", "")); + final Map<String, Set<List<String>>> expectedRowsMap = new HashMap<>(); + expectedRowsMap.put(APP_ID, expectedRows1); + expectedRowsMap.put(APP_ID_2, expectedRows2); + + // The member and process names as well as client-id are not deterministic, so we don't care about them. + final List<Integer> dontCares = List.of(3, 6, 7); + + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"), + expectedHeader, expectedRowsMap, dontCares); + + streams2.close(); + streams2.cleanUp(); + } + + @Test + public void testDescribeNonExistingStreamsGroup() { + final String nonExistingGroup = "non-existing-group"; + final String errorMessage = String.format( + "Error: Executing streams group command failed due to org.apache.kafka.common.errors.GroupIdNotFoundException: Group %s not found.", + nonExistingGroup); + + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", nonExistingGroup), errorMessage); + validateDescribeOutput( + Arrays.asList("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", nonExistingGroup), errorMessage); Review Comment: This call is duplicated immediately below with identical arguments. Remove the redundant invocation or replace it with a test of `--all-groups` to cover that code path. 
```suggestion ``` ########## tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java: ########## @@ -227,4 +327,69 @@ private static void validateDescribeOutput( ); }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get())); } + + private static void validateDescribeOutput( + List<String> args, + List<String> expectedHeader, + Map<String, Set<List<String>>> expectedRows, + List<Integer> dontCareIndices + ) throws InterruptedException { + final AtomicReference<String> out = new AtomicReference<>(""); + TestUtils.waitForCondition(() -> { + String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0]))); + out.set(output); + + String[] lines = output.split("\n"); + if (lines.length == 1 && lines[0].isEmpty()) lines = new String[]{}; + + if (lines.length == 0) return false; + List<String> header = Arrays.asList(lines[0].split("\\s+")); + if (!expectedHeader.equals(header)) return false; + + Map<String, Set<List<String>>> groupdescMap = splitOutputByGroup(lines); + + if (groupdescMap.size() != expectedRows.size()) return false; + + // clear the dontCare fields and then compare two sets + boolean compareResult = true; + for (Map.Entry<String, Set<List<String>>> entry : groupdescMap.entrySet()) { + String group = entry.getKey(); + Set<List<String>> groupDesc = entry.getValue(); + if (!expectedRows.containsKey(group)) return false; + Set<List<String>> expectedGroupDesc = expectedRows.get(group); + if (expectedGroupDesc.size() != groupDesc.size()) + compareResult = false; + for (List<String> list : groupDesc) { + List<String> listCloned = new ArrayList<>(list); + dontCareIndices.forEach(index -> listCloned.set(index, "")); + if (!expectedGroupDesc.contains(listCloned)) { + compareResult = false; + } + } + } + + return compareResult; + }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, 
out.get())); + } + + private static Map<String, Set<List<String>>> splitOutputByGroup(String[] lines) { + Map<String, Set<List<String>>> groupdescMap = new HashMap<>(); + String headerLine = lines[0].replaceAll(" ", ""); Review Comment: [nitpick] Replacing only space characters may miss other whitespace (e.g., tabs). Use `replaceAll("\\s+", "")` to normalize all whitespace. ```suggestion String headerLine = lines[0].replaceAll("\\s+", ""); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org