This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 96c68096a26 KAFKA-15462: Add Group Type Filter for List Group to the Admin Client (#15150) 96c68096a26 is described below commit 96c68096a26ea5e7c2333308dfbaef47cb1eac72 Author: Ritika Reddy <98577846+rreddy...@users.noreply.github.com> AuthorDate: Thu Feb 29 00:38:42 2024 -0800 KAFKA-15462: Add Group Type Filter for List Group to the Admin Client (#15150) In KIP-848, we introduce the notion of Group Types based on the protocol type that the members in the consumer group use. As of now we support two types of groups: * Classic : Members use the classic consumer group protocol ( existing one ) * Consumer : Members use the consumer group protocol introduced in KIP-848. Currently List Groups allows users to list all the consumer groups available. KIP-518 introduced filtering the consumer groups by the state that they are in. We now want to allow users to filter consumer groups by type. This patch includes the changes to the admin client and related files. It also includes changes to parameterize the tests to include permutations of the old GC and the new GC with the different protocol types. Reviewers: David Jacot <dja...@confluent.io> --- checkstyle/suppressions.xml | 1 + .../kafka/clients/admin/ConsumerGroupListing.java | 64 ++-- .../kafka/clients/admin/KafkaAdminClient.java | 20 +- .../clients/admin/ListConsumerGroupsOptions.java | 25 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 124 ++++++- .../scala/kafka/admin/ConsumerGroupCommand.scala | 107 ++++-- .../integration/kafka/api/BaseConsumerTest.scala | 19 +- .../kafka/admin/ConsumerGroupCommandTest.scala | 7 +- .../org/apache/kafka/tools/ToolsTestUtils.java | 2 + .../consumer/group/ConsumerGroupCommandTest.java | 13 +- .../consumer/group/ListConsumerGroupTest.java | 386 +++++++++++++++++++-- 11 files changed, 669 insertions(+), 99 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7486ef9a80d..c65cd675a9e 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -46,6 +46,7 @@ <!-- server tests --> <suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/> + <suppress checks="CyclomaticComplexity" files="ListConsumerGroupTest.java"/> <!-- Clients --> <suppress id="dontUseSystemExit" diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java index 0abc3e01ca9..01c23796d41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.Optional; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; /** * A listing of a consumer group in the cluster. @@ -29,6 +30,7 @@ public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; private final Optional<ConsumerGroupState> state; + private final Optional<GroupType> type; /** * Create an instance with the specified parameters. @@ -37,7 +39,7 @@ public class ConsumerGroupListing { * @param isSimpleConsumerGroup If consumer group is simple or not. */ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) { - this(groupId, isSimpleConsumerGroup, Optional.empty()); + this(groupId, isSimpleConsumerGroup, Optional.empty(), Optional.empty()); } /** @@ -48,9 +50,27 @@ public class ConsumerGroupListing { * @param state The state of the consumer group */ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional<ConsumerGroupState> state) { + this(groupId, isSimpleConsumerGroup, state, Optional.empty()); + } + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id. + * @param isSimpleConsumerGroup If consumer group is simple or not. + * @param state The state of the consumer group. + * @param type The type of the consumer group. + */ + public ConsumerGroupListing( + String groupId, + boolean isSimpleConsumerGroup, + Optional<ConsumerGroupState> state, + Optional<GroupType> type + ) { this.groupId = groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.state = Objects.requireNonNull(state); + this.type = Objects.requireNonNull(type); } /** @@ -74,42 +94,38 @@ public class ConsumerGroupListing { return state; } + /** + * The type of the consumer group. + * + * @return An Optional containing the type, if available. + */ + public Optional<GroupType> type() { + return type; + } + @Override public String toString() { return "(" + "groupId='" + groupId + '\'' + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", state=" + state + + ", type=" + type + ')'; } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup, state); + return Objects.hash(groupId, isSimpleConsumerGroup(), state, type); } @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - ConsumerGroupListing other = (ConsumerGroupListing) obj; - if (groupId == null) { - if (other.groupId != null) - return false; - } else if (!groupId.equals(other.groupId)) - return false; - if (isSimpleConsumerGroup != other.isSimpleConsumerGroup) - return false; - if (state == null) { - if (other.state != null) - return false; - } else if (!state.equals(other.state)) - return false; - return true; + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ConsumerGroupListing)) return false; + ConsumerGroupListing that = (ConsumerGroupListing) o; + return isSimpleConsumerGroup() == that.isSimpleConsumerGroup() && + Objects.equals(groupId, that.groupId) && + Objects.equals(state, that.state) && + Objects.equals(type, that.type); } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 85c82e25144..d98ad8ac04e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -58,6 +58,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -3382,7 +3383,14 @@ public class KafkaAdminClient extends AdminClient { .stream() .map(ConsumerGroupState::toString) .collect(Collectors.toList()); - return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states)); + List<String> groupTypes = options.types() + .stream() + .map(GroupType::toString) + .collect(Collectors.toList()); + return new ListGroupsRequest.Builder(new ListGroupsRequestData() + .setStatesFilter(states) + .setTypesFilter(groupTypes) + ); } private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) { @@ -3392,7 +3400,15 @@ public class KafkaAdminClient extends AdminClient { final Optional<ConsumerGroupState> state = group.groupState().equals("") ? Optional.empty() : Optional.of(ConsumerGroupState.parse(group.groupState())); - final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty(), state); + final Optional<GroupType> type = group.groupType().equals("") + ? Optional.empty() + : Optional.of(GroupType.parse(group.groupType())); + final ConsumerGroupListing groupListing = new ConsumerGroupListing( + groupId, + protocolType.isEmpty(), + state, + type + ); results.addListing(groupListing); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java index 9f1f38dd4a8..c240da159ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Set; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.annotation.InterfaceStability; /** @@ -34,20 +35,38 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup private Set<ConsumerGroupState> states = Collections.emptySet(); + private Set<GroupType> types = Collections.emptySet(); + /** - * If states is set, only groups in these states will be returned by listConsumerGroups() + * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. */ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) { - this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); + this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : new HashSet<>(states); return this; } /** - * Returns the list of States that are requested or empty if no states have been specified + * If types is set, only groups of these types will be returned by listConsumerGroups(). + * Otherwise, all groups are returned. + */ + public ListConsumerGroupsOptions withTypes(Set<GroupType> types) { + this.types = (types == null || types.isEmpty()) ? Collections.emptySet() : new HashSet<>(types); + return this; + } + + /** + * Returns the list of States that are requested or empty if no states have been specified. */ public Set<ConsumerGroupState> states() { return states; } + + /** + * Returns the list of group types that are requested or empty if no types have been specified. + */ + public Set<GroupType> types() { + return types; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b8b3d54ef43..43d391a220e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -243,6 +243,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -2811,6 +2812,68 @@ public class KafkaAdminClientTest { } } + @Test + public void testListConsumerGroupsWithTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with a specific state filter but no type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CLASSIC.toString())))), + env.cluster().nodeById(0)); + + final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE)); + final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); + Collection<ConsumerGroupListing> listings = result.valid().get(); + + assertEquals(1, listings.size()); + List<ConsumerGroupListing> expected = new ArrayList<>(); + expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + + // Test with list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListConsumerGroupsOptions options2 = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CONSUMER)); + final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups(options2); + Collection<ConsumerGroupListing> listings2 = result2.valid().get(); + + assertEquals(2, listings2.size()); + List<ConsumerGroupListing> expected2 = new ArrayList<>(); + expected2.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CONSUMER))); + expected2.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CONSUMER))); + assertEquals(expected2, listings2); + assertEquals(0, result.errors().get().size()); + } + } + @Test public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { ApiVersion listGroupV3 = new ApiVersion() @@ -2835,7 +2898,7 @@ public class KafkaAdminClientTest { ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection<ConsumerGroupListing> listing = result.all().get(); assertEquals(1, listing.size()); - List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false, Optional.empty())); + List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false)); assertEquals(expected, listing); // But we cannot set a state filter with older broker @@ -2849,6 +2912,65 @@ public class KafkaAdminClientTest { } } + @Test + public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV4 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 4); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check if we can list groups with older broker if we specify states and don't specify types. + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(Collections.singletonList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroupState.STABLE.toString())))), + env.cluster().nodeById(0)); + + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE)); + ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); + + Collection<ConsumerGroupListing> listing = result.all().get(); + assertEquals(1, listing.size()); + List<ConsumerGroupListing> expected = Collections.singletonList( + new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE)) + ); + assertEquals(expected, listing); + + // Check that we cannot set a type filter with an older broker. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + + options = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); + result = env.adminClient().listConsumerGroups(options); + TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + } + } + + private MockClient.RequestMatcher expectListGroupsRequestWithFilters( + Set<String> expectedStates, + Set<String> expectedTypes + ) { + return body -> { + if (body instanceof ListGroupsRequest) { + ListGroupsRequest request = (ListGroupsRequest) body; + return Objects.equals(new HashSet<>(request.data().statesFilter()), expectedStates) + && Objects.equals(new HashSet<>(request.data().typesFilter()), expectedTypes); + } + return false; + }; + } + @Test public void testOffsetCommitNumRetries() throws Exception { final Cluster cluster = mockCluster(3, 0); diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 4187274a22d..160b9a70aae 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{KafkaException, Node, TopicPartition} +import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaException, Node, TopicPartition} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ @@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.Errors import scala.collection.immutable.TreeMap import scala.reflect.ClassTag -import org.apache.kafka.common.ConsumerGroupState import org.apache.kafka.common.requests.ListOffsetsResponse object ConsumerGroupCommand extends Logging { @@ -104,6 +103,15 @@ object ConsumerGroupCommand extends Logging { parsedStates } + def consumerGroupTypesFromString(input: String): Set[GroupType] = { + val parsedTypes = input.toLowerCase.split(',').map(s => GroupType.parse(s.trim)).toSet + if (parsedTypes.contains(GroupType.UNKNOWN)) { + val validTypes = GroupType.values().filter(_ != GroupType.UNKNOWN) + throw new IllegalArgumentException(s"Invalid types list '$input'. Valid types are: ${validTypes.mkString(", ")}") + } + parsedTypes + } + val MISSING_COLUMN_VALUE = "-" private def printError(msg: String, e: Option[Throwable] = None): Unit = { @@ -135,7 +143,7 @@ object ConsumerGroupCommand extends Logging { private[admin] case class MemberAssignmentState(group: String, consumerId: String, host: String, clientId: String, groupInstanceId: String, numPartitions: Int, assignment: List[TopicPartition]) - case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int) + private[admin] case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int) private[admin] sealed trait CsvRecord private[admin] case class CsvRecordWithGroup(group: String, topic: String, partition: Int, offset: Long) extends CsvRecord @@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + if (includeType || includeState) { + val types = typeValues() + val states = stateValues() + val listings = listConsumerGroupsWithFilters(types, states) + + printGroupInfo(listings, includeType, includeState) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def stateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def typeValues(): Set[GroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[GroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def printGroupInfo(groups: List[ConsumerGroupListing], includeType: Boolean, includeState: Boolean): Unit = { + def groupId(groupListing: ConsumerGroupListing): String = groupListing.groupId + def groupType(groupListing: ConsumerGroupListing): String = groupListing.`type`().orElse(GroupType.UNKNOWN).toString + def groupState(groupListing: ConsumerGroupListing): String = groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString + + val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => Math.max(maxLen, groupId(groupListing).length)) + 10 + var format = s"%-${maxGroupLen}s" + var header = List("GROUP") + var extractors: List[ConsumerGroupListing => String] = List(groupId) + + if (includeType) { + header = header :+ "TYPE" + extractors = extractors :+ groupType _ + format += " %-20s" + } + + if (includeState) { + header = header :+ "STATE" + extractors = extractors :+ groupState _ + format += " %-20s" + } + + println(format.format(header: _*)) + + groups.foreach { groupListing => + val info = extractors.map(extractor => extractor(groupListing)) + println(format.format(info: _*)) + } } def listConsumerGroups(): List[String] = { @@ -207,26 +264,15 @@ object ConsumerGroupCommand extends Logging { listings.map(_.groupId).toList } - def listConsumerGroupsWithState(states: Set[ConsumerGroupState]): List[ConsumerGroupListing] = { + def listConsumerGroupsWithFilters(types: Set[GroupType], states: Set[ConsumerGroupState]): List[ConsumerGroupListing] = { val listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions()) - listConsumerGroupsOptions.inStates(states.asJava) + listConsumerGroupsOptions + .inStates(states.asJava) + .withTypes(types.asJava) val result = adminClient.listConsumerGroups(listConsumerGroupsOptions) result.all.get.asScala.toList } - private def printGroupStates(groupsAndStates: List[(String, String)]): Unit = { - // find proper columns width - var maxGroupLen = 15 - for ((groupId, _) <- groupsAndStates) { - maxGroupLen = Math.max(maxGroupLen, groupId.length) - } - val format = s"%${-maxGroupLen}s %s" - println(format.format("GROUP", "STATE")) - for ((groupId, state) <- groupsAndStates) { - println(format.format(groupId, state)) - } - } - private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = { // numRows contains the number of data rows, if any, compiled from the API call in the caller method. // if it's undefined or 0, there is no relevant group information to display. @@ -1024,6 +1070,9 @@ object ConsumerGroupCommand extends Logging { "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + nl + "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + nl + "This option may be used with '--describe', '--list' and '--bootstrap-server' options only." + private val TypeDoc = "When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types." + nl + + "Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + nl + + "This option may be used with the '--list' option only." private val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics." val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", BootstrapServerDoc) @@ -1090,6 +1139,10 @@ object ConsumerGroupCommand extends Logging { .availableIf(describeOpt, listOpt) .withOptionalArg() .ofType(classOf[String]) + val typeOpt: OptionSpec[String] = parser.accepts("type", TypeDoc) + .availableIf(listOpt) + .withOptionalArg() + .ofType(classOf[String]) options = parser.parse(args : _*) diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index bb3259baf98..20159830943 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import java.util import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ @@ -117,11 +118,12 @@ object BaseConsumerTest { // * KRaft with the new group coordinator enabled and the classic group protocol // * KRaft with the new group coordinator enabled and the consumer group protocol def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = { - java.util.stream.Stream.of( + util.Arrays.stream(Array( Arguments.of("zk", "classic"), Arguments.of("kraft", "classic"), Arguments.of("kraft+kip848", "classic"), - Arguments.of("kraft+kip848", "consumer")) + Arguments.of("kraft+kip848", "consumer") + )) } // In Scala 2.12, it is necessary to disambiguate the java.util.stream.Stream.of() method call @@ -138,10 +140,19 @@ object BaseConsumerTest { // * KRaft and the classic group protocol // * KRaft with the new group coordinator enabled and the classic group protocol def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = { - java.util.stream.Stream.of( + util.Arrays.stream(Array( Arguments.of("zk", "classic"), Arguments.of("kraft", "classic"), - Arguments.of("kraft+kip848", "classic")) + Arguments.of("kraft+kip848", "classic") + )) + } + + // For tests that only work with the consumer group protocol, we want to test the following combination: + // * KRaft with the new group coordinator enabled and the consumer group protocol + def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): java.util.stream.Stream[Arguments] = { + util.Arrays.stream(Array( + Arguments.of("kraft+kip848", "consumer") + )) } val updateProducerCount = new AtomicInteger() diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index 18c7a0a8f81..f682df1f1dc 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -19,7 +19,7 @@ package kafka.admin import java.time.Duration import java.util.concurrent.{ExecutorService, Executors, TimeUnit} -import java.util.{Collections, Properties} +import java.util.{Collections, Properties, stream} import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService} import kafka.api.BaseConsumerTest import kafka.integration.KafkaServerTestHarness @@ -31,6 +31,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.provider.Arguments import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer @@ -122,7 +123,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } object ConsumerGroupCommandTest { - def getTestQuorumAndGroupProtocolParametersAll() = BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll() + def getTestQuorumAndGroupProtocolParametersAll(): stream.Stream[Arguments] = BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll() + def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly(): stream.Stream[Arguments] = BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() + def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): stream.Stream[Arguments] = BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly() abstract class AbstractConsumerRunnable(broker: String, groupId: String, customPropsOpt: Option[Properties] = None, syncCommit: Boolean = false) extends Runnable { diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index fdc732ea29a..83fa31bf5e7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -45,6 +45,8 @@ import java.util.stream.Collectors; public class ToolsTestUtils { /** @see TestInfoUtils#TestWithParameterizedQuorumName() */ public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.{argumentsWithNames}"; + /** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames() */ + public static final String TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES = "{displayName}.quorum={0}.groupProtocol={1}"; private static int randomPort = 0; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java index b78054cb4ad..bde3af37a1d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java @@ -58,6 +58,7 @@ import java.util.stream.Stream; public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestHarness { public static final String TOPIC = "foo"; public static final String GROUP = "test.group"; + public static final String PROTOCOL_GROUP = "protocol-group"; List<ConsumerGroupCommand.ConsumerGroupService> consumerGroupService = new ArrayList<>(); List<AbstractConsumerGroupExecutor> consumerGroupExecutors = new ArrayList<>(); @@ -154,8 +155,8 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false, groupProtocol); } - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group) { - return addConsumerGroupExecutor(numConsumers, topic, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, GroupProtocol.CLASSIC.name); + ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String group, String groupProtocol) { + return addConsumerGroupExecutor(numConsumers, TOPIC, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); } ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String groupProtocol) { @@ -342,6 +343,14 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll(); } + public static Stream<Arguments> getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() { + return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly(); + } + + public static Stream<Arguments> getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly() { + return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(); + } + @SuppressWarnings({"deprecation"}) static <T> Seq<T> seq(Collection<T> seq) { return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index 894f00df5e7..ba5ebd254fc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -20,83 +20,258 @@ import joptsimple.OptionException; import kafka.admin.ConsumerGroupCommand; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; -import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Properties; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListConsumerGroups(String quorum) throws Exception { + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersAll") + public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); + addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup)); + + scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); final AtomicReference<scala.collection.Set> foundGroups = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { foundGroups.set(service.listConsumerGroups().toSet()); return Objects.equals(expectedGroups, foundGroups.get()); }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups.get() + "."); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) + @Test public void testListWithUnrecognizedNewConsumerOption() { String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--list"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListConsumerGroupsWithStates() throws Exception { + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersAll") + public void testListConsumerGroupsWithStates(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); - addConsumerGroupExecutor(1); + addConsumerGroupExecutor(1, groupProtocol); String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - scala.collection.Set<ConsumerGroupListing> expectedListing = set(Arrays.asList( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(GROUP, false, Optional.of(ConsumerGroupState.STABLE)))); + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol)) + ) + ); - final AtomicReference<scala.collection.Set> foundListing = new AtomicReference<>(); - TestUtils.waitForCondition(() -> { - foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet()); - return Objects.equals(expectedListing, foundListing.get()); - }, "Expected to show groups " + expectedListing + ", but found " + foundListing.get()); + assertGroupListing( + service, + Collections.emptySet(), + EnumSet.allOf(ConsumerGroupState.class), + expectedListing + ); - scala.collection.Set<ConsumerGroupListing> expectedListingStable = set(Collections.singleton( - new ConsumerGroupListing(GROUP, false, Optional.of(ConsumerGroupState.STABLE)))); + expectedListing = mkSet( + new ConsumerGroupListing( + GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol)) + ) + ); - foundListing.set(null); + assertGroupListing( + service, + Collections.emptySet(), + mkSet(ConsumerGroupState.STABLE), + expectedListing + ); - TestUtils.waitForCondition(() -> { - foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet()); - return Objects.equals(expectedListingStable, foundListing.get()); - }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing.get()); + assertGroupListing( + service, + Collections.emptySet(), + mkSet(ConsumerGroupState.PREPARING_REBALANCE), + Collections.emptySet() + ); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testConsumerGroupStatesFromString(String quorum) { + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") + public void testListConsumerGroupsWithTypesClassicProtocol(String quorum, String groupProtocol) throws Exception { + String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + + addSimpleGroupExecutor(simpleGroup); + addConsumerGroupExecutor(1); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ); + + // No filters explicitly mentioned. Expectation is that all groups are returned. + assertGroupListing( + service, + Collections.emptySet(), + Collections.emptySet(), + expectedListing + ); + + // When group type is mentioned: + // Old Group Coordinator returns empty listings if the type is not Classic. + // New Group Coordinator returns groups according to the filter. + assertGroupListing( + service, + mkSet(GroupType.CONSUMER), + Collections.emptySet(), + Collections.emptySet() + ); + + assertGroupListing( + service, + mkSet(GroupType.CLASSIC), + Collections.emptySet(), + expectedListing + ); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly") + public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum, String groupProtocol) throws Exception { + String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + + addSimpleGroupExecutor(simpleGroup); + addConsumerGroupExecutor(1); + addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + // No filters explicitly mentioned. Expectation is that all groups are returned. + Set<ConsumerGroupListing> expectedListing = mkSet( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CONSUMER) + ) + ); + + assertGroupListing( + service, + Collections.emptySet(), + Collections.emptySet(), + expectedListing + ); + + // When group type is mentioned: + // New Group Coordinator returns groups according to the filter. + expectedListing = mkSet( + new ConsumerGroupListing( + PROTOCOL_GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CONSUMER) + ) + ); + + assertGroupListing( + service, + mkSet(GroupType.CONSUMER), + Collections.emptySet(), + expectedListing + ); + + expectedListing = mkSet( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + GROUP, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ); + + assertGroupListing( + service, + mkSet(GroupType.CLASSIC), + Collections.emptySet(), + expectedListing + ); + } + + @Test + public void testConsumerGroupStatesFromString() { scala.collection.Set<ConsumerGroupState> result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable"); assertEquals(set(Collections.singleton(ConsumerGroupState.STABLE)), result); @@ -107,7 +282,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertEquals(set(Arrays.asList(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE)), result); result = ConsumerGroupCommand.consumerGroupStatesFromString("stable"); - assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE)), result); + assertEquals(set(Collections.singletonList(ConsumerGroupState.STABLE)), result); result = ConsumerGroupCommand.consumerGroupStatesFromString("stable, assigning"); assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE, ConsumerGroupState.ASSIGNING)), result); @@ -122,10 +297,31 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListGroupCommand(String quorum) throws Exception { + @Test + public void testConsumerGroupTypesFromString() { + scala.collection.Set<GroupType> result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer"); + assertEquals(set(Collections.singleton(GroupType.CONSUMER)), result); + + result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic"); + assertEquals(set(Arrays.asList(GroupType.CONSUMER, GroupType.CLASSIC)), result); + + result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic"); + assertEquals(set(Arrays.asList(GroupType.CONSUMER, GroupType.CLASSIC)), result); + + assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")); + + assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic")); + + assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString(" , ,")); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") + public void testListGroupCommandClassicProtocol(String quorum, String groupProtocol) throws Exception { String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + addSimpleGroupExecutor(simpleGroup); addConsumerGroupExecutor(1); @@ -147,6 +343,24 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest { ) ); + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(GROUP, "Classic"), + Arrays.asList(simpleGroup, "Classic") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "--state"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(GROUP, "Classic", "Stable"), + Arrays.asList(simpleGroup, "Classic", "Empty") + ) + ); + validateListOutput( Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "Stable"), Arrays.asList("GROUP", "STATE"), @@ -155,6 +369,7 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest { ) ); + // Check case-insensitivity in state filter. validateListOutput( Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state", "stable"), Arrays.asList("GROUP", "STATE"), @@ -162,6 +377,109 @@ public class ListConsumerGroupTest extends ConsumerGroupCommandTest { Arrays.asList(GROUP, "Stable") ) ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "Classic"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(GROUP, "Classic"), + Arrays.asList(simpleGroup, "Classic") + ) + ); + + // Check case-insensitivity in type filter. + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "classic"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(GROUP, "Classic"), + Arrays.asList(simpleGroup, "Classic") + ) + ); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly") + public void testListGroupCommandConsumerProtocol(String quorum, String groupProtocol) throws Exception { + String simpleGroup = "simple-group"; + + createOffsetsTopic(listenerName(), new Properties()); + + addSimpleGroupExecutor(simpleGroup); + addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list"), + Collections.emptyList(), + mkSet( + Collections.singletonList(PROTOCOL_GROUP), + Collections.singletonList(simpleGroup) + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--state"), + Arrays.asList("GROUP", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Stable"), + Arrays.asList(simpleGroup, "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer"), + Arrays.asList(simpleGroup, "Classic") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "--state"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable"), + Arrays.asList(simpleGroup, "Classic", "Empty") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "consumer"), + Arrays.asList("GROUP", "TYPE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer") + ) + ); + + validateListOutput( + Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--list", "--type", "consumer", "--state", "Stable"), + Arrays.asList("GROUP", "TYPE", "STATE"), + mkSet( + Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable") + ) + ); + } + + /** + * Validates the consumer group listings returned against expected values using specified filters. + * + * @param service The service to list consumer groups. + * @param typeFilterSet Filters for group types, empty for no filter. + * @param stateFilterSet Filters for group states, empty for no filter. + * @param expectedListing Expected consumer group listings. + */ + private static void assertGroupListing( + ConsumerGroupCommand.ConsumerGroupService service, + Set<GroupType> typeFilterSet, + Set<ConsumerGroupState> stateFilterSet, + Set<ConsumerGroupListing> expectedListing + ) throws Exception { + final AtomicReference<scala.collection.Set> foundListing = new AtomicReference<>(); + TestUtils.waitForCondition(() -> { + foundListing.set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(stateFilterSet)).toSet()); + return Objects.equals(set(expectedListing), foundListing.get()); + }, () -> "Expected to show groups " + expectedListing + ", but found " + foundListing.get() + "."); } /**