AndrewJSchofield commented on code in PR #17712:
URL: https://github.com/apache/kafka/pull/17712#discussion_r1837630024


##########
tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java:
##########
@@ -357,6 +382,122 @@ public void testListGroupsFailsWithException() {
         )));
     }
 
+    @SuppressWarnings("NPathComplexity")
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,share"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+        }
+    )
+    public void testGroupCommand(ClusterInstance clusterInstance) throws 
Exception {
+        String topic = "topic";
+        String classicGroupId = "classic_group";
+        String consumerGroupId = "consumer_group";
+        String shareGroupId = "share_group";
+        String simpleGroupId = "simple_group";
+        clusterInstance.createTopic("topic", 1, (short) 1);
+        TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers());
+
+        try (KafkaConsumer<String, String> classicGroup = 
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
+             KafkaConsumer<String, String> consumerGroup = 
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
+             KafkaShareConsumer<String, String> shareGroup = 
createKafkaShareConsumer(clusterInstance, shareGroupId);
+             Admin admin = clusterInstance.createAdminClient();
+             GroupsCommand.GroupsService groupsCommand = new 
GroupsCommand.GroupsService(props)
+        ) {
+            classicGroup.subscribe(List.of(topic));
+            classicGroup.poll(Duration.ofMillis(1000));
+            consumerGroup.subscribe(List.of(topic));
+            consumerGroup.poll(Duration.ofMillis(1000));
+            shareGroup.subscribe(List.of(topic));
+            shareGroup.poll(Duration.ofMillis(1000));
+
+            AlterConsumerGroupOffsetsResult result = 
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new 
OffsetAndMetadata(0L)));
+            assertNull(result.all().get());
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 5 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{consumerGroupId, "Consumer", "consumer"},
+                        new String[]{simpleGroupId, "Classic"},
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return all groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new 
String[0])))));
+                if (res.getKey().split("\n").length == 4 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{consumerGroupId, "Consumer", "consumer"},
+                        new String[]{simpleGroupId, "Classic"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return consumer protocol 
groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"classic").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 3 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{classicGroupId, "Classic", "consumer"},
+                        new String[]{simpleGroupId, "Classic"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return classic type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"consumer").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{consumerGroupId, "Consumer", "consumer"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return consumer type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"share").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return share type groups");
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--share").toArray(new 
String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{shareGroupId, "Share", "share"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return share groups");

Review Comment:
   nit: To match the other cases, `"Waiting for listing groups to return share 
type groups"`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to