hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775



##########
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##########
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "States", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",

Review comment:
       Sorry I missed this from the discussion, but why are we bumping the 
version if we are only adding tagged fields? Is it so that we can detect 
whether the capability is supported? If so, then I wonder why we don't make 
this a regular field.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()

Review comment:
       Probably worth adding a comment about broker compatibility with this API.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
       Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the 
group coordinator adds a new state that the client isn't aware of. Currently 
we're going to pass this through the request, which is a bit odd. Furthermore, 
if the coordinator _does_ add new states, we will be unable to see them using 
this API. I think it might be better to use a `null` list of states in the 
request to indicate that any state is needed.
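
To make the suggestion above concrete, here is a minimal sketch of encoding "any state" as a `null` list. It is not the PR's code: `StatesFilterSketch`, `GroupState`, `toRequestStates`, and `matches` are hypothetical names, and `GroupState` is only a stand-in for `ConsumerGroupState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class StatesFilterSketch {
    // Hypothetical stand-in for ConsumerGroupState; not the real Kafka enum.
    public enum GroupState { STABLE, EMPTY, DEAD, UNKNOWN }

    // Client side: an absent filter becomes a null list ("any state"),
    // so the client never has to enumerate states it knows about.
    public static List<String> toRequestStates(Optional<Set<GroupState>> states) {
        if (!states.isPresent()) {
            return null;
        }
        List<String> names = new ArrayList<>();
        for (GroupState state : states.get()) {
            names.add(state.name());
        }
        return names;
    }

    // Coordinator side (assumed behaviour): a null filter matches every state,
    // including states added after this client was built.
    public static boolean matches(List<String> requestedStates, String groupState) {
        return requestedStates == null || requestedStates.contains(groupState);
    }
}
```

With this shape, `inAnyState()` would not need `EnumSet.allOf(...)`, so `UNKNOWN` is never sent and a state added later by the coordinator still matches.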

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1397,29 +1398,32 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-    val (error, groups) = groupCoordinator.handleListGroups()
+    val listGroupsRequest = request.body[ListGroupsRequest]
+    val states = listGroupsRequest.data.states.asScala.toList
+
+    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
+       new ListGroupsResponse(new ListGroupsResponseData()
+            .setErrorCode(error.code)
+            .setGroups(groups.map { group =>
+                val listedGroup = new ListGroupsResponseData.ListedGroup()
+                  .setGroupId(group.groupId)
+                  .setProtocolType(group.protocolType)
+                if (!states.isEmpty)

Review comment:
       Why don't we always return the state? I don't think overhead is a huge 
concern for an API like this.
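
As an illustration of the question above, here is a rough sketch of a response mapping that sets the state unconditionally. It is written in Java rather than the Scala of `KafkaApis`, and `Overview`, `Listed`, and `toListed` are hypothetical stand-ins, not the generated `ListGroupsResponseData` classes.

```java
import java.util.ArrayList;
import java.util.List;

public class AlwaysReturnStateSketch {
    // Hypothetical stand-in for the coordinator's group overview.
    public static final class Overview {
        final String groupId;
        final String protocolType;
        final String state;

        public Overview(String groupId, String protocolType, String state) {
            this.groupId = groupId;
            this.protocolType = protocolType;
            this.state = state;
        }
    }

    // Hypothetical stand-in for one listed group in the response.
    public static final class Listed {
        public String groupId;
        public String protocolType;
        public String groupState;
    }

    // The state is copied for every group, whether or not the request
    // carried a states filter, so there is no conditional branch.
    public static List<Listed> toListed(List<Overview> groups) {
        List<Listed> listed = new ArrayList<>();
        for (Overview group : groups) {
            Listed entry = new Listed();
            entry.groupId = group.groupId;
            entry.protocolType = group.protocolType;
            entry.groupState = group.state;
            listed.add(entry);
        }
        return listed;
    }
}
```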




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org