This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96c68096a26 KAFKA-15462: Add Group Type Filter for List Group to the 
Admin Client  (#15150)
96c68096a26 is described below

commit 96c68096a26ea5e7c2333308dfbaef47cb1eac72
Author: Ritika Reddy <98577846+rreddy...@users.noreply.github.com>
AuthorDate: Thu Feb 29 00:38:42 2024 -0800

    KAFKA-15462: Add Group Type Filter for List Group to the Admin Client  
(#15150)
    
    In KIP-848, we introduce the notion of Group Types based on the protocol 
type that the members in the consumer group use. As of now we support two types 
of groups:
    * Classic: Members use the classic consumer group protocol (the existing one).
    * Consumer: Members use the consumer group protocol introduced in KIP-848.
    Currently List Groups allows users to list all the consumer groups 
available. KIP-518 introduced filtering the consumer groups by the state that 
they are in. We now want to allow users to filter consumer groups by type.
    
    This patch includes the changes to the admin client and related files. It 
also parameterizes the tests to cover permutations of the old and the new group 
coordinator (GC) with the different group protocol types.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 checkstyle/suppressions.xml                        |   1 +
 .../kafka/clients/admin/ConsumerGroupListing.java  |  64 ++--
 .../kafka/clients/admin/KafkaAdminClient.java      |  20 +-
 .../clients/admin/ListConsumerGroupsOptions.java   |  25 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 124 ++++++-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 107 ++++--
 .../integration/kafka/api/BaseConsumerTest.scala   |  19 +-
 .../kafka/admin/ConsumerGroupCommandTest.scala     |   7 +-
 .../org/apache/kafka/tools/ToolsTestUtils.java     |   2 +
 .../consumer/group/ConsumerGroupCommandTest.java   |  13 +-
 .../consumer/group/ListConsumerGroupTest.java      | 386 +++++++++++++++++++--
 11 files changed, 669 insertions(+), 99 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7486ef9a80d..c65cd675a9e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -46,6 +46,7 @@
 
     <!-- server tests -->
     <suppress checks="MethodLength|JavaNCSS|NPath" 
files="DescribeTopicPartitionsRequestHandlerTest.java"/>
+    <suppress checks="CyclomaticComplexity" 
files="ListConsumerGroupTest.java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
index 0abc3e01ca9..01c23796d41 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 
 /**
  * A listing of a consumer group in the cluster.
@@ -29,6 +30,7 @@ public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
     private final Optional<ConsumerGroupState> state;
+    private final Optional<GroupType> type;
 
     /**
      * Create an instance with the specified parameters.
@@ -37,7 +39,7 @@ public class ConsumerGroupListing {
      * @param isSimpleConsumerGroup If consumer group is simple or not.
      */
     public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
-        this(groupId, isSimpleConsumerGroup, Optional.empty());
+        this(groupId, isSimpleConsumerGroup, Optional.empty(), 
Optional.empty());
     }
 
     /**
@@ -48,9 +50,27 @@ public class ConsumerGroupListing {
      * @param state The state of the consumer group
      */
     public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, 
Optional<ConsumerGroupState> state) {
+        this(groupId, isSimpleConsumerGroup, state, Optional.empty());
+    }
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId                   Group Id.
+     * @param isSimpleConsumerGroup     If consumer group is simple or not.
+     * @param state                     The state of the consumer group.
+     * @param type                      The type of the consumer group.
+     */
+    public ConsumerGroupListing(
+        String groupId,
+        boolean isSimpleConsumerGroup,
+        Optional<ConsumerGroupState> state,
+        Optional<GroupType> type
+    ) {
         this.groupId = groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
         this.state = Objects.requireNonNull(state);
+        this.type = Objects.requireNonNull(type);
     }
 
     /**
@@ -74,42 +94,38 @@ public class ConsumerGroupListing {
         return state;
     }
 
+    /**
+     * The type of the consumer group.
+     *
+     * @return An Optional containing the type, if available.
+     */
+    public Optional<GroupType> type() {
+        return type;
+    }
+
     @Override
     public String toString() {
         return "(" +
             "groupId='" + groupId + '\'' +
             ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
             ", state=" + state +
+            ", type=" + type +
             ')';
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groupId, isSimpleConsumerGroup, state);
+        return Objects.hash(groupId, isSimpleConsumerGroup(), state, type);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        ConsumerGroupListing other = (ConsumerGroupListing) obj;
-        if (groupId == null) {
-            if (other.groupId != null)
-                return false;
-        } else if (!groupId.equals(other.groupId))
-            return false;
-        if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
-            return false;
-        if (state == null) {
-            if (other.state != null)
-                return false;
-        } else if (!state.equals(other.state))
-            return false;
-        return true;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof ConsumerGroupListing)) return false;
+        ConsumerGroupListing that = (ConsumerGroupListing) o;
+        return isSimpleConsumerGroup() == that.isSimpleConsumerGroup() &&
+            Objects.equals(groupId, that.groupId) &&
+            Objects.equals(state, that.state) &&
+            Objects.equals(type, that.type);
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 85c82e25144..d98ad8ac04e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -58,6 +58,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -3382,7 +3383,14 @@ public class KafkaAdminClient extends AdminClient {
                                     .stream()
                                     .map(ConsumerGroupState::toString)
                                     .collect(Collectors.toList());
-                            return new ListGroupsRequest.Builder(new 
ListGroupsRequestData().setStatesFilter(states));
+                            List<String> groupTypes = options.types()
+                                    .stream()
+                                    .map(GroupType::toString)
+                                    .collect(Collectors.toList());
+                            return new ListGroupsRequest.Builder(new 
ListGroupsRequestData()
+                                .setStatesFilter(states)
+                                .setTypesFilter(groupTypes)
+                            );
                         }
 
                         private void 
maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
@@ -3392,7 +3400,15 @@ public class KafkaAdminClient extends AdminClient {
                                 final Optional<ConsumerGroupState> state = 
group.groupState().equals("")
                                         ? Optional.empty()
                                         : 
Optional.of(ConsumerGroupState.parse(group.groupState()));
-                                final ConsumerGroupListing groupListing = new 
ConsumerGroupListing(groupId, protocolType.isEmpty(), state);
+                                final Optional<GroupType> type = 
group.groupType().equals("")
+                                        ? Optional.empty()
+                                        : 
Optional.of(GroupType.parse(group.groupType()));
+                                final ConsumerGroupListing groupListing = new 
ConsumerGroupListing(
+                                        groupId,
+                                        protocolType.isEmpty(),
+                                        state,
+                                        type
+                                    );
                                 results.addListing(groupListing);
                             }
                         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
index 9f1f38dd4a8..c240da159ff 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
@@ -34,20 +35,38 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroup
 
     private Set<ConsumerGroupState> states = Collections.emptySet();
 
+    private Set<GroupType> types = Collections.emptySet();
+
     /**
-     * If states is set, only groups in these states will be returned by 
listConsumerGroups()
+     * If states is set, only groups in these states will be returned by 
listConsumerGroups().
      * Otherwise, all groups are returned.
      * This operation is supported by brokers with version 2.6.0 or later.
      */
     public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
-        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        this.states = (states == null || states.isEmpty()) ? 
Collections.emptySet() : new HashSet<>(states);
         return this;
     }
 
     /**
-     * Returns the list of States that are requested or empty if no states 
have been specified
+     * If types is set, only groups of these types will be returned by 
listConsumerGroups().
+     * Otherwise, all groups are returned.
+     */
+    public ListConsumerGroupsOptions withTypes(Set<GroupType> types) {
+        this.types = (types == null || types.isEmpty()) ? 
Collections.emptySet() : new HashSet<>(types);
+        return this;
+    }
+
+    /**
+     * Returns the list of States that are requested or empty if no states 
have been specified.
      */
     public Set<ConsumerGroupState> states() {
         return states;
     }
+
+    /**
+     * Returns the list of group types that are requested or empty if no types 
have been specified.
+     */
+    public Set<GroupType> types() {
+        return types;
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b8b3d54ef43..43d391a220e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -243,6 +243,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -2811,6 +2812,68 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithTypes() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Test with a specific state filter but no type filter in list 
consumer group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()),
 Collections.emptySet()),
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            .setGroupState("Stable")
+                            .setGroupType(GroupType.CLASSIC.toString())))),
+                env.cluster().nodeById(0));
+
+            final ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
+            final ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+
+            assertEquals(1, listings.size());
+            List<ConsumerGroupListing> expected = new ArrayList<>();
+            expected.add(new ConsumerGroupListing("group-1", false, 
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+
+            // Test with list consumer group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                expectListGroupsRequestWithFilters(Collections.emptySet(), 
singleton(GroupType.CONSUMER.toString())),
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            .setGroupState("Stable")
+                            .setGroupType(GroupType.CONSUMER.toString()),
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-2")
+                            .setGroupState("Empty")
+                            .setGroupType(GroupType.CONSUMER.toString())))),
+                env.cluster().nodeById(0));
+
+            final ListConsumerGroupsOptions options2 = new 
ListConsumerGroupsOptions().withTypes(singleton(GroupType.CONSUMER));
+            final ListConsumerGroupsResult result2 = 
env.adminClient().listConsumerGroups(options2);
+            Collection<ConsumerGroupListing> listings2 = result2.valid().get();
+
+            assertEquals(2, listings2.size());
+            List<ConsumerGroupListing> expected2 = new ArrayList<>();
+            expected2.add(new ConsumerGroupListing("group-2", true, 
Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CONSUMER)));
+            expected2.add(new ConsumerGroupListing("group-1", false, 
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CONSUMER)));
+            assertEquals(expected2, listings2);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
     @Test
     public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws 
Exception {
         ApiVersion listGroupV3 = new ApiVersion()
@@ -2835,7 +2898,7 @@ public class KafkaAdminClientTest {
             ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
             Collection<ConsumerGroupListing> listing = result.all().get();
             assertEquals(1, listing.size());
-            List<ConsumerGroupListing> expected = 
Collections.singletonList(new ConsumerGroupListing("group-1", false, 
Optional.empty()));
+            List<ConsumerGroupListing> expected = 
Collections.singletonList(new ConsumerGroupListing("group-1", false));
             assertEquals(expected, listing);
 
             // But we cannot set a state filter with older broker
@@ -2849,6 +2912,65 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws 
Exception {
+        ApiVersion listGroupV4 = new ApiVersion()
+            .setApiKey(ApiKeys.LIST_GROUPS.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 4);
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Check if we can list groups with older broker if we specify 
states and don't specify types.
+            env.kafkaClient().prepareResponseFrom(
+                
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()),
 Collections.emptySet()),
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Collections.singletonList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            
.setGroupState(ConsumerGroupState.STABLE.toString())))),
+                env.cluster().nodeById(0));
+
+            ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
+            ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+
+            Collection<ConsumerGroupListing> listing = result.all().get();
+            assertEquals(1, listing.size());
+            List<ConsumerGroupListing> expected = Collections.singletonList(
+                new ConsumerGroupListing("group-1", false, 
Optional.of(ConsumerGroupState.STABLE))
+            );
+            assertEquals(expected, listing);
+
+            // Check that we cannot set a type filter with an older broker.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+            env.kafkaClient().prepareUnsupportedVersionResponse(request ->
+                request instanceof ListGroupsRequest && !((ListGroupsRequest) 
request).data().typesFilter().isEmpty()
+            );
+
+            options = new 
ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
+            result = env.adminClient().listConsumerGroups(options);
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
+    private MockClient.RequestMatcher expectListGroupsRequestWithFilters(
+        Set<String> expectedStates,
+        Set<String> expectedTypes
+    ) {
+        return body -> {
+            if (body instanceof ListGroupsRequest) {
+                ListGroupsRequest request = (ListGroupsRequest) body;
+                return Objects.equals(new 
HashSet<>(request.data().statesFilter()), expectedStates)
+                    && Objects.equals(new 
HashSet<>(request.data().typesFilter()), expectedTypes);
+            }
+            return false;
+        };
+    }
+
     @Test
     public void testOffsetCommitNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4187274a22d..160b9a70aae 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaException, 
Node, TopicPartition}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
@@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.immutable.TreeMap
 import scala.reflect.ClassTag
-import org.apache.kafka.common.ConsumerGroupState
 import org.apache.kafka.common.requests.ListOffsetsResponse
 
 object ConsumerGroupCommand extends Logging {
@@ -104,6 +103,15 @@ object ConsumerGroupCommand extends Logging {
     parsedStates
   }
 
+  def consumerGroupTypesFromString(input: String): Set[GroupType] = {
+    val parsedTypes = input.toLowerCase.split(',').map(s => 
GroupType.parse(s.trim)).toSet
+    if (parsedTypes.contains(GroupType.UNKNOWN)) {
+      val validTypes = GroupType.values().filter(_ != GroupType.UNKNOWN)
+      throw new IllegalArgumentException(s"Invalid types list '$input'. Valid 
types are: ${validTypes.mkString(", ")}")
+    }
+    parsedTypes
+  }
+
   val MISSING_COLUMN_VALUE = "-"
 
   private def printError(msg: String, e: Option[Throwable] = None): Unit = {
@@ -135,7 +143,7 @@ object ConsumerGroupCommand extends Logging {
   private[admin] case class MemberAssignmentState(group: String, consumerId: 
String, host: String, clientId: String, groupInstanceId: String,
                                              numPartitions: Int, assignment: 
List[TopicPartition])
 
-  case class GroupState(group: String, coordinator: Node, assignmentStrategy: 
String, state: String, numMembers: Int)
+  private[admin] case class GroupState(group: String, coordinator: Node, 
assignmentStrategy: String, state: String, numMembers: Int)
 
   private[admin] sealed trait CsvRecord
   private[admin] case class CsvRecordWithGroup(group: String, topic: String, 
partition: Int, offset: Long) extends CsvRecord
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      if (includeType || includeState) {
+        val types = typeValues()
+        val states = stateValues()
+        val listings = listConsumerGroupsWithFilters(types, states)
+
+        printGroupInfo(listings, includeType, includeState)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def stateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def typeValues(): Set[GroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[GroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groups: List[ConsumerGroupListing], 
includeType: Boolean, includeState: Boolean): Unit = {
+      def groupId(groupListing: ConsumerGroupListing): String = 
groupListing.groupId
+      def groupType(groupListing: ConsumerGroupListing): String = 
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+      def groupState(groupListing: ConsumerGroupListing): String = 
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+      val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => 
Math.max(maxLen, groupId(groupListing).length)) + 10
+      var format = s"%-${maxGroupLen}s"
+      var header = List("GROUP")
+      var extractors: List[ConsumerGroupListing => String] = List(groupId)
+
+      if (includeType) {
+        header = header :+ "TYPE"
+        extractors = extractors :+ groupType _
+        format += " %-20s"
+      }
+
+      if (includeState) {
+        header = header :+ "STATE"
+        extractors = extractors :+ groupState _
+        format += " %-20s"
+      }
+
+      println(format.format(header: _*))
+
+      groups.foreach { groupListing =>
+        val info = extractors.map(extractor => extractor(groupListing))
+        println(format.format(info: _*))
+      }
     }
 
     def listConsumerGroups(): List[String] = {
@@ -207,26 +264,15 @@ object ConsumerGroupCommand extends Logging {
       listings.map(_.groupId).toList
     }
 
-    def listConsumerGroupsWithState(states: Set[ConsumerGroupState]): 
List[ConsumerGroupListing] = {
+    def listConsumerGroupsWithFilters(types: Set[GroupType], states: 
Set[ConsumerGroupState]): List[ConsumerGroupListing] = {
       val listConsumerGroupsOptions = withTimeoutMs(new 
ListConsumerGroupsOptions())
-      listConsumerGroupsOptions.inStates(states.asJava)
+      listConsumerGroupsOptions
+        .inStates(states.asJava)
+        .withTypes(types.asJava)
       val result = adminClient.listConsumerGroups(listConsumerGroupsOptions)
       result.all.get.asScala.toList
     }
 
-    private def printGroupStates(groupsAndStates: List[(String, String)]): 
Unit = {
-      // find proper columns width
-      var maxGroupLen = 15
-      for ((groupId, _) <- groupsAndStates) {
-        maxGroupLen = Math.max(maxGroupLen, groupId.length)
-      }
-      val format = s"%${-maxGroupLen}s %s"
-      println(format.format("GROUP", "STATE"))
-      for ((groupId, state) <- groupsAndStates) {
-        println(format.format(groupId, state))
-      }
-    }
-
     private def shouldPrintMemberState(group: String, state: Option[String], 
numRows: Option[Int]): Boolean = {
       // numRows contains the number of data rows, if any, compiled from the 
API call in the caller method.
       // if it's undefined or 0, there is no relevant group information to 
display.
@@ -1024,6 +1070,9 @@ object ConsumerGroupCommand extends Logging {
       "When specified with '--list', it displays the state of all groups. It 
can also be used to list groups with specific states." + nl +
       "Example: --bootstrap-server localhost:9092 --list --state stable,empty" 
+ nl +
       "This option may be used with '--describe', '--list' and 
'--bootstrap-server' options only."
+    private val TypeDoc = "When specified with '--list', it displays the types 
of all the groups. It can also be used to list groups with specific types." + 
nl +
+      "Example: --bootstrap-server localhost:9092 --list --type 
classic,consumer" + nl +
+      "This option may be used with the '--list' option only."
     private val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports 
one consumer group at the time, and multiple topics."
 
     val bootstrapServerOpt: OptionSpec[String] = 
parser.accepts("bootstrap-server", BootstrapServerDoc)
@@ -1090,6 +1139,10 @@ object ConsumerGroupCommand extends Logging {
                          .availableIf(describeOpt, listOpt)
                          .withOptionalArg()
                          .ofType(classOf[String])
+    val typeOpt: OptionSpec[String] = parser.accepts("type", TypeDoc)
+                        .availableIf(listOpt)
+                        .withOptionalArg()
+                        .ofType(classOf[String])
 
     options = parser.parse(args : _*)
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index bb3259baf98..20159830943 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
+import java.util
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 import scala.jdk.CollectionConverters._
@@ -117,11 +118,12 @@ object BaseConsumerTest {
   // * KRaft with the new group coordinator enabled and the classic group 
protocol
   // * KRaft with the new group coordinator enabled and the consumer group 
protocol
   def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
-    java.util.stream.Stream.of(
+    util.Arrays.stream(Array(
         Arguments.of("zk", "classic"),
         Arguments.of("kraft", "classic"),
         Arguments.of("kraft+kip848", "classic"),
-        Arguments.of("kraft+kip848", "consumer"))
+        Arguments.of("kraft+kip848", "consumer")
+    ))
   }
 
   // In Scala 2.12, it is necessary to disambiguate the 
java.util.stream.Stream.of() method call
@@ -138,10 +140,19 @@ object BaseConsumerTest {
   // * KRaft and the classic group protocol
   // * KRaft with the new group coordinator enabled and the classic group 
protocol
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : 
java.util.stream.Stream[Arguments] = {
-    java.util.stream.Stream.of(
+    util.Arrays.stream(Array(
         Arguments.of("zk", "classic"),
         Arguments.of("kraft", "classic"),
-        Arguments.of("kraft+kip848", "classic"))
+        Arguments.of("kraft+kip848", "classic")
+    ))
+  }
+
+  // For tests that only work with the consumer group protocol, we want to 
test the following combination:
+  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): 
java.util.stream.Stream[Arguments] = {
+    util.Arrays.stream(Array(
+        Arguments.of("kraft+kip848", "consumer")
+    ))
   }
 
   val updateProducerCount = new AtomicInteger()
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 18c7a0a8f81..f682df1f1dc 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -19,7 +19,7 @@ package kafka.admin
 
 import java.time.Duration
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.{Collections, Properties, stream}
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, 
ConsumerGroupService}
 import kafka.api.BaseConsumerTest
 import kafka.integration.KafkaServerTestHarness
@@ -31,6 +31,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.provider.Arguments
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -122,7 +123,9 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
 }
 
 object ConsumerGroupCommandTest {
-   def getTestQuorumAndGroupProtocolParametersAll() = 
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+   def getTestQuorumAndGroupProtocolParametersAll(): stream.Stream[Arguments] 
= BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly(): 
stream.Stream[Arguments] = 
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly()
+   def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): 
stream.Stream[Arguments] = 
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly()
 
   abstract class AbstractConsumerRunnable(broker: String, groupId: String, 
customPropsOpt: Option[Properties] = None,
                                           syncCommit: Boolean = false) extends 
Runnable {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java 
b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
index fdc732ea29a..83fa31bf5e7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
@@ -45,6 +45,8 @@ import java.util.stream.Collectors;
 public class ToolsTestUtils {
     /** @see TestInfoUtils#TestWithParameterizedQuorumName()  */
     public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = 
"{displayName}.{argumentsWithNames}";
+    /** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames()  */
+    public static final String 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES = 
"{displayName}.quorum={0}.groupProtocol={1}";
 
     private static int randomPort = 0;
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
index b78054cb4ad..bde3af37a1d 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
 public class ConsumerGroupCommandTest extends 
kafka.integration.KafkaServerTestHarness {
     public static final String TOPIC = "foo";
     public static final String GROUP = "test.group";
+    public static final String PROTOCOL_GROUP = "protocol-group";
 
     List<ConsumerGroupCommand.ConsumerGroupService> consumerGroupService = new 
ArrayList<>();
     List<AbstractConsumerGroupExecutor> consumerGroupExecutors = new 
ArrayList<>();
@@ -154,8 +155,8 @@ public class ConsumerGroupCommandTest extends 
kafka.integration.KafkaServerTestH
         return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, 
RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false, 
groupProtocol);
     }
 
-    ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String 
topic, String group) {
-        return addConsumerGroupExecutor(numConsumers, topic, group, 
RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, 
GroupProtocol.CLASSIC.name);
+    ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String 
group, String groupProtocol) {
+        return addConsumerGroupExecutor(numConsumers, TOPIC, group, 
RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, 
groupProtocol);
     }
 
     ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String 
topic, String group, String groupProtocol) {
@@ -342,6 +343,14 @@ public class ConsumerGroupCommandTest extends 
kafka.integration.KafkaServerTestH
         return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll();
     }
 
+    public static Stream<Arguments> 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() {
+        return 
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly();
+    }
+
+    public static Stream<Arguments> 
getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly() {
+        return 
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly();
+    }
+
     @SuppressWarnings({"deprecation"})
     static <T> Seq<T> seq(Collection<T> seq) {
         return 
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 894f00df5e7..ba5ebd254fc 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -20,83 +20,258 @@ import joptsimple.OptionException;
 import kafka.admin.ConsumerGroupCommand;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroups(String quorum) throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
         addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
         ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup));
+
+        scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, 
simpleGroup, PROTOCOL_GROUP));
         final AtomicReference<scala.collection.Set> foundGroups = new 
AtomicReference<>();
+
         TestUtils.waitForCondition(() -> {
             foundGroups.set(service.listConsumerGroups().toSet());
             return Objects.equals(expectedGroups, foundGroups.get());
         }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups.get() + ".");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
+    @Test
     public void testListWithUnrecognizedNewConsumerOption() {
         String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
         assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListConsumerGroupsWithStates() throws Exception {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+    public void testListConsumerGroupsWithStates(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
-        addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, groupProtocol);
 
         String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"};
         ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
 
-        scala.collection.Set<ConsumerGroupListing> expectedListing = 
set(Arrays.asList(
-            new ConsumerGroupListing(simpleGroup, true, 
Optional.of(ConsumerGroupState.EMPTY)),
-            new ConsumerGroupListing(GROUP, false, 
Optional.of(ConsumerGroupState.STABLE))));
+        Set<ConsumerGroupListing> expectedListing = mkSet(
+            new ConsumerGroupListing(
+                simpleGroup,
+                true,
+                Optional.of(ConsumerGroupState.EMPTY),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.parse(groupProtocol))
+            )
+        );
 
-        final AtomicReference<scala.collection.Set> foundListing = new 
AtomicReference<>();
-        TestUtils.waitForCondition(() -> {
-            
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
-            return Objects.equals(expectedListing, foundListing.get());
-        }, "Expected to show groups " + expectedListing + ", but found " + 
foundListing.get());
+        assertGroupListing(
+            service,
+            Collections.emptySet(),
+            EnumSet.allOf(ConsumerGroupState.class),
+            expectedListing
+        );
 
-        scala.collection.Set<ConsumerGroupListing> expectedListingStable = 
set(Collections.singleton(
-            new ConsumerGroupListing(GROUP, false, 
Optional.of(ConsumerGroupState.STABLE))));
+        expectedListing = mkSet(
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.parse(groupProtocol))
+            )
+        );
 
-        foundListing.set(null);
+        assertGroupListing(
+            service,
+            Collections.emptySet(),
+            mkSet(ConsumerGroupState.STABLE),
+            expectedListing
+        );
 
-        TestUtils.waitForCondition(() -> {
-            
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
-            return Objects.equals(expectedListingStable, foundListing.get());
-        }, "Expected to show groups " + expectedListingStable + ", but found " 
+ foundListing.get());
+        assertGroupListing(
+            service,
+            Collections.emptySet(),
+            mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+            Collections.emptySet()
+        );
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testConsumerGroupStatesFromString(String quorum) {
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+    public void testListConsumerGroupsWithTypesClassicProtocol(String quorum, 
String groupProtocol) throws Exception {
+        String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
+        addSimpleGroupExecutor(simpleGroup);
+        addConsumerGroupExecutor(1);
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
+        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+
+        Set<ConsumerGroupListing> expectedListing = mkSet(
+            new ConsumerGroupListing(
+                simpleGroup,
+                true,
+                Optional.of(ConsumerGroupState.EMPTY),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.CLASSIC)
+            )
+        );
+
+        // No filters explicitly mentioned. Expectation is that all groups are returned.
+        assertGroupListing(
+            service,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            expectedListing
+        );
+
+        // When group type is mentioned:
+        // Old Group Coordinator returns empty listings if the type is not Classic.
+        // New Group Coordinator returns groups according to the filter.
+        assertGroupListing(
+            service,
+            mkSet(GroupType.CONSUMER),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        assertGroupListing(
+            service,
+            mkSet(GroupType.CLASSIC),
+            Collections.emptySet(),
+            expectedListing
+        );
+    }
+
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+    public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum, 
String groupProtocol) throws Exception {
+        String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
+        addSimpleGroupExecutor(simpleGroup);
+        addConsumerGroupExecutor(1);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
+        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+
+        // No filters explicitly mentioned. Expectation is that all groups are returned.
+        Set<ConsumerGroupListing> expectedListing = mkSet(
+            new ConsumerGroupListing(
+                simpleGroup,
+                true,
+                Optional.of(ConsumerGroupState.EMPTY),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                PROTOCOL_GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.CONSUMER)
+            )
+        );
+
+        assertGroupListing(
+            service,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            expectedListing
+        );
+
+        // When group type is mentioned:
+        // New Group Coordinator returns groups according to the filter.
+        expectedListing = mkSet(
+            new ConsumerGroupListing(
+                PROTOCOL_GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.CONSUMER)
+            )
+        );
+
+        assertGroupListing(
+            service,
+            mkSet(GroupType.CONSUMER),
+            Collections.emptySet(),
+            expectedListing
+        );
+
+        expectedListing = mkSet(
+            new ConsumerGroupListing(
+                simpleGroup,
+                true,
+                Optional.of(ConsumerGroupState.EMPTY),
+                Optional.of(GroupType.CLASSIC)
+            ),
+            new ConsumerGroupListing(
+                GROUP,
+                false,
+                Optional.of(ConsumerGroupState.STABLE),
+                Optional.of(GroupType.CLASSIC)
+            )
+        );
+
+        assertGroupListing(
+            service,
+            mkSet(GroupType.CLASSIC),
+            Collections.emptySet(),
+            expectedListing
+        );
+    }
+
+    @Test
+    public void testConsumerGroupStatesFromString() {
         scala.collection.Set<ConsumerGroupState> result = 
ConsumerGroupCommand.consumerGroupStatesFromString("Stable");
         assertEquals(set(Collections.singleton(ConsumerGroupState.STABLE)), 
result);
 
@@ -107,7 +282,7 @@ public class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
         assertEquals(set(Arrays.asList(ConsumerGroupState.DEAD, 
ConsumerGroupState.COMPLETING_REBALANCE)), result);
 
         result = ConsumerGroupCommand.consumerGroupStatesFromString("stable");
-        assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE)), result);
+        
assertEquals(set(Collections.singletonList(ConsumerGroupState.STABLE)), result);
 
         result = ConsumerGroupCommand.consumerGroupStatesFromString("stable, 
assigning");
         assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE, 
ConsumerGroupState.ASSIGNING)), result);
@@ -122,10 +297,31 @@ public class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
         assertThrows(IllegalArgumentException.class, () -> 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListGroupCommand(String quorum) throws Exception {
+    @Test
+    public void testConsumerGroupTypesFromString() {
+        scala.collection.Set<GroupType> result = 
ConsumerGroupCommand.consumerGroupTypesFromString("consumer");
+        assertEquals(set(Collections.singleton(GroupType.CONSUMER)), result);
+
+        result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic");
+        assertEquals(set(Arrays.asList(GroupType.CONSUMER, 
GroupType.CLASSIC)), result);
+
+        result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic");
+        assertEquals(set(Arrays.asList(GroupType.CONSUMER, 
GroupType.CLASSIC)), result);
+
+        assertThrows(IllegalArgumentException.class, () -> 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"));
+    }
+
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+    public void testListGroupCommandClassicProtocol(String quorum, String 
groupProtocol) throws Exception {
         String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
         addSimpleGroupExecutor(simpleGroup);
         addConsumerGroupExecutor(1);
 
@@ -147,6 +343,24 @@ public class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
             )
         );
 
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"),
+            Arrays.asList("GROUP", "TYPE"),
+            mkSet(
+                Arrays.asList(GROUP, "Classic"),
+                Arrays.asList(simpleGroup, "Classic")
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "--state"),
+            Arrays.asList("GROUP", "TYPE", "STATE"),
+            mkSet(
+                Arrays.asList(GROUP, "Classic", "Stable"),
+                Arrays.asList(simpleGroup, "Classic", "Empty")
+            )
+        );
+
         validateListOutput(
             Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
             Arrays.asList("GROUP", "STATE"),
@@ -155,6 +369,7 @@ public class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
             )
         );
 
+        // Check case-insensitivity in state filter.
         validateListOutput(
             Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state", "stable"),
             Arrays.asList("GROUP", "STATE"),
@@ -162,6 +377,109 @@ public class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
                 Arrays.asList(GROUP, "Stable")
             )
         );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "Classic"),
+            Arrays.asList("GROUP", "TYPE"),
+            mkSet(
+                Arrays.asList(GROUP, "Classic"),
+                Arrays.asList(simpleGroup, "Classic")
+            )
+        );
+
+        // Check case-insensitivity in type filter.
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "classic"),
+            Arrays.asList("GROUP", "TYPE"),
+            mkSet(
+                Arrays.asList(GROUP, "Classic"),
+                Arrays.asList(simpleGroup, "Classic")
+            )
+        );
+    }
+
+    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+    public void testListGroupCommandConsumerProtocol(String quorum, String 
groupProtocol) throws Exception {
+        String simpleGroup = "simple-group";
+
+        createOffsetsTopic(listenerName(), new Properties());
+
+        addSimpleGroupExecutor(simpleGroup);
+        addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list"),
+            Collections.emptyList(),
+            mkSet(
+                Collections.singletonList(PROTOCOL_GROUP),
+                Collections.singletonList(simpleGroup)
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--state"),
+            Arrays.asList("GROUP", "STATE"),
+            mkSet(
+                Arrays.asList(PROTOCOL_GROUP, "Stable"),
+                Arrays.asList(simpleGroup, "Empty")
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type"),
+            Arrays.asList("GROUP", "TYPE"),
+            mkSet(
+                Arrays.asList(PROTOCOL_GROUP, "Consumer"),
+                Arrays.asList(simpleGroup, "Classic")
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "--state"),
+            Arrays.asList("GROUP", "TYPE", "STATE"),
+            mkSet(
+                Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable"),
+                Arrays.asList(simpleGroup, "Classic", "Empty")
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer"),
+            Arrays.asList("GROUP", "TYPE"),
+            mkSet(
+                Arrays.asList(PROTOCOL_GROUP, "Consumer")
+            )
+        );
+
+        validateListOutput(
+            Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--list", "--type", "consumer", "--state", 
"Stable"),
+            Arrays.asList("GROUP", "TYPE", "STATE"),
+            mkSet(
+                Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable")
+            )
+        );
+    }
+
+    /**
+     * Validates the consumer group listings returned against expected values using specified filters.
+     *
+     * @param service           The service to list consumer groups.
+     * @param typeFilterSet     Filters for group types, empty for no filter.
+     * @param stateFilterSet    Filters for group states, empty for no filter.
+     * @param expectedListing   Expected consumer group listings.
+     */
+    private static void assertGroupListing(
+        ConsumerGroupCommand.ConsumerGroupService service,
+        Set<GroupType> typeFilterSet,
+        Set<ConsumerGroupState> stateFilterSet,
+        Set<ConsumerGroupListing> expectedListing
+    ) throws Exception {
+        final AtomicReference<scala.collection.Set> foundListing = new 
AtomicReference<>();
+        TestUtils.waitForCondition(() -> {
+            
foundListing.set(service.listConsumerGroupsWithFilters(set(typeFilterSet), 
set(stateFilterSet)).toSet());
+            return Objects.equals(set(expectedListing), foundListing.get());
+        }, () -> "Expected to show groups " + expectedListing + ", but found " 
+ foundListing.get() + ".");
     }
 
     /**

Reply via email to