dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1492178089


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroups(String quorum) throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
         addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
         ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
         final AtomicReference<scala.collection.Set> foundGroups = new 
AtomicReference<>();
+
         TestUtils.waitForCondition(() -> {
             foundGroups.set(service.listConsumerGroups().toSet());
             return Objects.equals(expectedGroups, foundGroups.get());
         }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListWithUnrecognizedNewConsumerOption() {
+    @Test
+    public void testListWithUnrecognizedNewConsumerOption() throws Exception {
         String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
         assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroupsWithStates() throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {

Review Comment:
   `groupProtocol` is never used in this test and we run it with all consumer 
types. I suppose that we either want to use `groupProtocol` or only run it with 
the classic type. If we do the latter, do we have a test listing new consumer 
groups too?



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws 
Exception {
             return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
         }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
 
-        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
                 ConsumerGroupCommand.main(cgcArgs3);
                 return null;
             }));
+            return out.get().contains("TYPE") && !out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "--type"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs4);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs5);
+                return null;
+            }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
 
-        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+        String[] cgcArgs6 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-                ConsumerGroupCommand.main(cgcArgs4);
+                ConsumerGroupCommand.main(cgcArgs6);
                 return null;
             }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+
+        String[] cgcArgs7 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "Classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs7);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+
+        String[] cgcArgs8 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs8);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());

Review Comment:
   I understand that you follow the pattern in place but I find all those tests 
really hard to read/follow. I took a stab at improving them: 
https://github.com/apache/kafka/pull/15382. Please let me know what you think.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws 
Exception {
             return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
         }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
 
-        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
                 ConsumerGroupCommand.main(cgcArgs3);
                 return null;
             }));
+            return out.get().contains("TYPE") && !out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "--type"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs4);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs5);
+                return null;
+            }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
 
-        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+        String[] cgcArgs6 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-                ConsumerGroupCommand.main(cgcArgs4);
+                ConsumerGroupCommand.main(cgcArgs6);
                 return null;
             }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+
+        String[] cgcArgs7 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "Classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs7);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+
+        String[] cgcArgs8 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs8);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+    }
+
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+    public void testListGroupCommandConsumerProtocol(String quorum, String 
groupProtocol) throws Exception {
+        String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
+        addSimpleGroupExecutor(simpleGroup);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+        final AtomicReference<String> out = new AtomicReference<>("");
+
+        String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs1);
+                return null;
+            }));
+            return !out.get().contains("TYPE") && !out.get().contains("STATE") 
&&
+                out.get().contains(simpleGroup) && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
no header, but found " + out.get());
+
+        String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs2);
+                return null;
+            }));
+            return out.get().contains("STATE") && !out.get().contains("TYPE") 
&& out.get().contains(simpleGroup) && out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
state header, but found " + out.get());
+
+        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs3);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && !out.get().contains("STATE") &&
+                out.get().contains(PROTOCOL_GROUP) && 
out.get().contains(simpleGroup);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
type header, but found " + out.get());
+
+        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs4);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && !out.get().contains("STATE") && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + PROTOCOL_GROUP + " with type header and 
Consumer type, but found " + out.get());
+
+        String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer", "--state", 
"Stable"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs5);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && out.get().contains("STATE") &&
+                out.get().contains("Stable") && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + PROTOCOL_GROUP + " with type and state 
header, but found " + out.get());
+    }
+
+    private void assertGroupListing(

Review Comment:
   nit: This one could be static.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,78 +20,225 @@
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroups(String quorum) throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
         addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
         ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
         final AtomicReference<scala.collection.Set> foundGroups = new 
AtomicReference<>();
+
         TestUtils.waitForCondition(() -> {
             foundGroups.set(service.listConsumerGroups().toSet());
             return Objects.equals(expectedGroups, foundGroups.get());
         }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListWithUnrecognizedNewConsumerOption() {
+    @Test
+    public void testListWithUnrecognizedNewConsumerOption() throws Exception {
         String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
         assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroupsWithStates() throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());

Review Comment:
   Yes, I would keep them.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -143,22 +312,134 @@ public void testListGroupCommand(String quorum) throws 
Exception {
             return out.get().contains("STATE") && 
out.get().contains(simpleGroup) && out.get().contains(GROUP);
         }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
 
-        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
                 ConsumerGroupCommand.main(cgcArgs3);
                 return null;
             }));
+            return out.get().contains("TYPE") && !out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "--type"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs4);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("STATE") 
&& out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + simpleGroup + ", " + GROUP + " and the 
header, but found " + out.get());
+
+        String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs5);
+                return null;
+            }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
 
-        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
+        String[] cgcArgs6 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"};
         TestUtils.waitForCondition(() -> {
             out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
-                ConsumerGroupCommand.main(cgcArgs4);
+                ConsumerGroupCommand.main(cgcArgs6);
                 return null;
             }));
             return out.get().contains("STATE") && out.get().contains(GROUP) && 
out.get().contains("Stable");
         }, "Expected to find " + GROUP + " in state Stable and the header, but 
found " + out.get());
+
+        String[] cgcArgs7 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "Classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs7);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+
+        String[] cgcArgs8 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "classic"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs8);
+                return null;
+            }));
+            return out.get().contains("TYPE") && out.get().contains("Classic") 
&& !out.get().contains("STATE") &&
+                out.get().contains(simpleGroup) && out.get().contains(GROUP);
+        }, "Expected to find " + GROUP + " and the header, but found " + 
out.get());
+    }
+
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+    public void testListGroupCommandConsumerProtocol(String quorum, String 
groupProtocol) throws Exception {
+        String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
+        addSimpleGroupExecutor(simpleGroup);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+        final AtomicReference<String> out = new AtomicReference<>("");
+
+        String[] cgcArgs1 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs1);
+                return null;
+            }));
+            return !out.get().contains("TYPE") && !out.get().contains("STATE") 
&&
+                out.get().contains(simpleGroup) && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
no header, but found " + out.get());
+
+        String[] cgcArgs2 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs2);
+                return null;
+            }));
+            return out.get().contains("STATE") && !out.get().contains("TYPE") 
&& out.get().contains(simpleGroup) && out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
state header, but found " + out.get());
+
+        String[] cgcArgs3 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs3);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && !out.get().contains("STATE") &&
+                out.get().contains(PROTOCOL_GROUP) && 
out.get().contains(simpleGroup);
+        }, "Expected to find " + simpleGroup + ", " + PROTOCOL_GROUP + " and 
type header, but found " + out.get());
+
+        String[] cgcArgs4 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs4);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && !out.get().contains("STATE") && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + PROTOCOL_GROUP + " with type header and 
Consumer type, but found " + out.get());
+
+        String[] cgcArgs5 = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer", "--state", 
"Stable"};
+        TestUtils.waitForCondition(() -> {
+            out.set(kafka.utils.TestUtils.grabConsoleOutput(() -> {
+                ConsumerGroupCommand.main(cgcArgs5);
+                return null;
+            }));
+            return out.get().contains("TYPE") && 
out.get().contains("Consumer") && out.get().contains("STATE") &&
+                out.get().contains("Stable") && 
out.get().contains(PROTOCOL_GROUP);
+        }, "Expected to find " + PROTOCOL_GROUP + " with type and state 
header, but found " + out.get());
+    }
+
+    private void assertGroupListing(
+        Set<GroupType> typeFilterSet,
+        Set<ConsumerGroupState> stateFilterSet,
+        Set<ConsumerGroupListing> expectedListing,
+        ConsumerGroupCommand.ConsumerGroupService service

Review Comment:
   nit: Let's put the service first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to