dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1483062130


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2811,6 +2811,71 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithTypes() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Test with no specific list consumer group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            .setGroupState("Stable")
+                            .setGroupType(GroupType.CLASSIC.toString()),
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-2")
+                            .setGroupState("Empty")
+                            .setGroupType(GroupType.CLASSIC.toString())))),
+                env.cluster().nodeById(0));
+
+            final ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions();
+            final ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            List<ConsumerGroupListing> expected = new ArrayList<>();
+            expected.add(new ConsumerGroupListing("group-2", true, 
Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CLASSIC)));
+            expected.add(new ConsumerGroupListing("group-1", false, 
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+
+            // Test with list consumer group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(

Review Comment:
   It may be worth validating that the request also contains the expected 
filters. For this, you can pass a request matcher as the first argument here. 
There are examples in this file. We could also add it to the previous case.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      if (includeType || includeState) {
+        val types = typeValues()
+        val states = stateValues()
+        val listings = listConsumerGroupsWithFilters(types, states)
+
+        printGroupInfo(listings, includeType, includeState)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def stateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def typeValues(): Set[GroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[GroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groups: List[ConsumerGroupListing], 
includeType: Boolean, includeState: Boolean): Unit = {
+      def groupId(groupListing: ConsumerGroupListing): String = 
groupListing.groupId
+      def groupType(groupListing: ConsumerGroupListing): String = 
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+      def groupState(groupListing: ConsumerGroupListing): String = 
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+      val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => 
Math.max(maxLen, groupId(groupListing).length)) + 10
+      var format = s"%-${maxGroupLen}s"
+      var header = List("Group")

Review Comment:
   nit: `GROUP` as it was before.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2849,6 +2914,44 @@ public void 
testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws 
Exception {
+        ApiVersion listGroupV4 = new ApiVersion()
+            .setApiKey(ApiKeys.LIST_GROUPS.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 4);
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Check if we can list groups with older broker if we don't 
specify types.
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Collections.singletonList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
+                env.cluster().nodeById(0));
+            ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions();
+            ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listing = result.all().get();
+            assertEquals(1, listing.size());
+            List<ConsumerGroupListing> expected = 
Collections.singletonList(new ConsumerGroupListing("group-1", false));
+            assertEquals(expected, listing);

Review Comment:
   I wonder if this is really necessary. We already test this in the other test.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): 
Unit = {
+    var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+    assertEquals(Set(GroupType.CONSUMER), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
     var out = ""
 
     var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && !out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "Stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "Classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup  and the 
header, but found $out")
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @CsvSource(Array("kraft+kip848,consumer"))
+  def testListGroupCommandConsumerProtocol(quorum: String, groupProtocol: 
String): Unit = {
+    val simpleGroup = "simple-group"

Review Comment:
   btw, I was wondering why tests fail in this suite but not in 
DescribeConsumerGroupTest. I have noticed that there we call 
`createOffsetsTopic()` in each test. We could also try this here.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
ConsumerGroupState.values.toSet).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, 
groupProtocol: String): Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
+
+    val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+    val expectedListing = Set(
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        protocolGroup,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
+
+    // No filters explicitly mentioned. Expectation is that all groups are 
returned.
+    var foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set.empty).toSet
+      expectedListing == foundListing
+    }, s"Expected to show groups $expectedListing, but found $foundListing")

Review Comment:
   nit: This code is repeated many times. I wonder if it may be worth 
extracting it into a helper method. It would make the test cases smaller.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      if (includeType || includeState) {
+        val types = typeValues()
+        val states = stateValues()
+        val listings = listConsumerGroupsWithFilters(types, states)
+
+        printGroupInfo(listings, includeType, includeState)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def stateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def typeValues(): Set[GroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[GroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groups: List[ConsumerGroupListing], 
includeType: Boolean, includeState: Boolean): Unit = {
+      def groupId(groupListing: ConsumerGroupListing): String = 
groupListing.groupId
+      def groupType(groupListing: ConsumerGroupListing): String = 
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+      def groupState(groupListing: ConsumerGroupListing): String = 
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+      val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => 
Math.max(maxLen, groupId(groupListing).length)) + 10

Review Comment:
   nit: It looks like `+ 10` was not there before. Why do we need it?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
ConsumerGroupState.values.toSet).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, 
groupProtocol: String): Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)

Review Comment:
   nit: We probably don't need this one as we only test with the classic 
protocol in this test.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -51,6 +53,27 @@ public ConsumerGroupListing(String groupId, boolean 
isSimpleConsumerGroup, Optio
         this.groupId = groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
         this.state = Objects.requireNonNull(state);
+        this.type = Optional.empty();

Review Comment:
   nit: Let's call the other constructor here.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))

Review Comment:
   nit: `@Test`?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): 
Unit = {
+    var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+    assertEquals(Set(GroupType.CONSUMER), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
     var out = ""
 
     var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && !out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "Stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "Classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup  and the 
header, but found $out")
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @CsvSource(Array("kraft+kip848,consumer"))

Review Comment:
   It would be better to also have a method as the source (like the others) for 
this one. We can add one if it does not exist yet.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
ConsumerGroupState.values.toSet).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
       expectedListingStable == foundListing
     }, s"Expected to show groups $expectedListingStable, but found 
$foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, 
groupProtocol: String): Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
+
+    val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+    val expectedListing = Set(
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        protocolGroup,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
+
+    // No filters explicitly mentioned. Expectation is that all groups are 
returned.
+    var foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set.empty).toSet
+      expectedListing == foundListing
+    }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+    // When group type is mentioned:
+    // Old Group Coordinator returns empty listings if the type is not Classic.
+    // New Group Coordinator returns groups according to the filter.
+    foundListing = Set.empty[ConsumerGroupListing]
+    TestUtils.waitUntilTrue(() => {
+      foundListing = 
service.listConsumerGroupsWithFilters(Set(GroupType.CONSUMER), Set.empty).toSet
+      expectedListingStable == foundListing

Review Comment:
   nit: Should we remove `expectedListingStable` and directly use 
`Set.empty[ConsumerGroupListing]` here?



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     val service = getConsumerGroupService(cgcArgs)
 
     val expectedListing = Set(
-      new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
 
     var foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
ConsumerGroupState.values.toSet).toSet
       expectedListing == foundListing
     }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-    val expectedListingStable = Set(
-      new ConsumerGroupListing(group, false, 
Optional.of(ConsumerGroupState.STABLE)))
+    val expectedListingStable = Set.empty[ConsumerGroupListing]
 
     foundListing = Set.empty[ConsumerGroupListing]
     TestUtils.waitUntilTrue(() => {
-      foundListing = 
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+      foundListing = service.listConsumerGroupsWithFilters(Set.empty, 
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet

Review Comment:
   Why are we changing this? If you really want to test with a state that does 
not exist, it may be better to keep this one as it was and to add the new case after it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to