hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775

##########
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##########
@@ -20,8 +20,14 @@
 // Version 1 and 2 are the same as version 0.
 //
 // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "States", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",

Review comment:
   Sorry I missed this from the discussion, but why are we bumping the version if we are only adding tagged fields? Is it so that we can detect whether the capability is supported? If so, then I wonder why we don't make this a regular field.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()

Review comment:
   Probably worth adding a comment about broker compatibility with this API.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
   Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the group coordinator adds a new state that the client isn't aware of. Currently we're going to pass this through the request, which is a bit odd. Furthermore, if the coordinator _does_ add new states, we will be unable to see them using this API. I think it might be better to use a `null` list of states in the request to indicate that any state is needed.
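
   For illustration only, here is a rough sketch of the `null`-means-any-state idea from the comment above. It is not the PR's code: `GroupState`, `StateFilterSketch`, `wireStates()` and `matches()` are hypothetical stand-ins, and the enum constant names are used as the wire strings purely for simplicity.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ConsumerGroupState; not the real Kafka enum.
enum GroupState { UNKNOWN, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY }

final class StateFilterSketch {
    // null means "any state"; a non-null set is always non-empty.
    private final Set<GroupState> states;

    private StateFilterSketch(Set<GroupState> states) {
        this.states = states;
    }

    // No enumeration of client-known states, so UNKNOWN is never sent.
    static StateFilterSketch anyState() {
        return new StateFilterSketch(null);
    }

    static StateFilterSketch inStates(Set<GroupState> states) {
        if (states == null || states.isEmpty()) {
            throw new IllegalArgumentException("states should not be null or empty");
        }
        return new StateFilterSketch(EnumSet.copyOf(states));
    }

    // State names to place in the request; null signals "any state".
    List<String> wireStates() {
        if (states == null) {
            return null;
        }
        List<String> names = new ArrayList<>();
        for (GroupState state : states) {
            names.add(state.name());
        }
        return names;
    }

    // Coordinator-side check (assumed shape): a null filter matches every
    // state, including states this client has never heard of.
    static boolean matches(List<String> wireStates, String groupState) {
        return wireStates == null || wireStates.contains(groupState);
    }
}
```

   With this shape, a state added to the coordinator later would still pass the filter when the client asks for any state, since the client never has to name it.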

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1397,29 +1398,32 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-    val (error, groups) = groupCoordinator.handleListGroups()
+    val listGroupsRequest = request.body[ListGroupsRequest]
+    val states = listGroupsRequest.data.states.asScala.toList
+
+    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
+      new ListGroupsResponse(new ListGroupsResponseData()
+        .setErrorCode(error.code)
+        .setGroups(groups.map { group =>
+          val listedGroup = new ListGroupsResponseData.ListedGroup()
+            .setGroupId(group.groupId)
+            .setProtocolType(group.protocolType)
+          if (!states.isEmpty)

Review comment:
   Why don't we always return the state? I don't think overhead is a huge concern for an api like this.
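
   As a rough sketch of what "always return the state" could look like, written in Java rather than the Scala of `KafkaApis.scala`: every class below (`GroupOverviewSketch`, `ListedGroupSketch`, `ListGroupsResponseSketch`) is a hypothetical, simplified stand-in for the real types, and the `state` field on the overview is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the coordinator's GroupOverview.
final class GroupOverviewSketch {
    final String groupId;
    final String protocolType;
    final String state;

    GroupOverviewSketch(String groupId, String protocolType, String state) {
        this.groupId = groupId;
        this.protocolType = protocolType;
        this.state = state;
    }
}

// Hypothetical stand-in for ListGroupsResponseData.ListedGroup.
final class ListedGroupSketch {
    String groupId;
    String protocolType;
    String groupState;
}

final class ListGroupsResponseSketch {
    // Copies the state for every group; there is no branch on whether the
    // request carried a state filter.
    static List<ListedGroupSketch> toListedGroups(List<GroupOverviewSketch> groups) {
        List<ListedGroupSketch> listed = new ArrayList<>();
        for (GroupOverviewSketch group : groups) {
            ListedGroupSketch entry = new ListedGroupSketch();
            entry.groupId = group.groupId;
            entry.protocolType = group.protocolType;
            entry.groupState = group.state;
            listed.add(entry);
        }
        return listed;
    }
}
```

   Dropping the conditional also removes the "with states" versus "without states" distinction from the response path, so the filter only decides which groups are listed.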