dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1302279096


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   This seems inefficient because the coordinator may not be responsible for
all the partitions. I thought that we could use `CoordinatorRuntime#partitions`
to get the list of registered partitions and only schedule reads for those.
Have you considered this?
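
   A minimal sketch of the idea, not tied to the real runtime: the fan-out
iterates over the set of partitions registered with this coordinator rather
than over `0..numPartitions-1`. `ListGroupsFanOut`, `scheduleReads` and
`merge` are hypothetical names, partitions are modelled as plain integers, and
the response is reduced to a list of group ids. The actual return type of
`CoordinatorRuntime#partitions` is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical stand-in for the fan-out in listGroups; none of these names are Kafka APIs.
final class ListGroupsFanOut {
    // Schedules one read per partition that this coordinator actually hosts,
    // instead of looping over every partition of the topic.
    static List<CompletableFuture<List<String>>> scheduleReads(
        Set<Integer> registeredPartitions,
        IntFunction<CompletableFuture<List<String>>> readOperation
    ) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        // Sort the partitions so that the scheduling order is deterministic.
        for (int partition : new TreeSet<>(registeredPartitions)) {
            futures.add(readOperation.apply(partition));
        }
        return futures;
    }

    // Waits for every per-partition read and concatenates the group ids.
    static List<String> merge(List<CompletableFuture<List<String>>> futures) {
        List<String> groups = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            groups.addAll(future.join());
        }
        return groups;
    }
}
```

   With this shape, a coordinator that hosts two partitions schedules two
reads regardless of how many partitions the topic has.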



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -272,6 +274,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param context The request context.
+     * @param request The ListGroups request.
+     *
+     * @return A Result containing the ListGroupsResponseData response
+     */
+    public ListGroupsResponseData listGroups(
+            RequestContext context,
+            ListGroupsRequestData request

Review Comment:
   nit: This should be indented with four spaces.



##########
clients/src/main/resources/common/message/ListGroupsRequest.json:
##########
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "StatesFilter", "type": "[]string", "versions": "4+",
       "about": "The states of the groups we want to list. If empty all groups are returned with their state."
-    }
+    },
+    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+      "about": "The types of the groups we want to list. If empty all groups are returned" }

Review Comment:
   I would prefer to do this in a separate PR because this change affects both
the new and the old group coordinators. Would that be possible?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {
+        return new ListGroupsResponseData.ListedGroup()
+                .setGroupId(groupId)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setGroupState(state.toString());

Review Comment:
   nit: This should be indented with four spaces. I have seen this in other
places in the code, but I am not going to comment on all of them. I will let
you take a look.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {

Review Comment:
   Don't we need to also implement this for `GenericGroup`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (int i = 0; i < numPartitions; i++) {
+            futures.add(runtime.scheduleReadOperation("list_groups",
+                    new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+                    (coordinator, __) -> coordinator.listGroups(context, request)

Review Comment:
   We need to use the second parameter `__` and pass it down to `listGroups`.
For context, the second parameter is the last committed offset. We should
list the groups based on it. Otherwise, we would return uncommitted changes.
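
   To illustrate why the offset matters, here is a small self-contained
sketch. `ShardSketch` is a hypothetical in-memory stand-in, not the real
`GroupCoordinatorShard`, and it assumes that a group is visible once the
record that created it is at or below the committed offset. The real shard
would read its own state as of that offset instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory shard; a stand-in, not Kafka's GroupCoordinatorShard.
final class ShardSketch {
    // Maps a group id to the log offset of the record that created the group.
    private final Map<String, Long> createdAtOffset = new TreeMap<>();

    // Applies a write at the given log offset; the write may not be committed yet.
    void addGroup(String groupId, long offset) {
        createdAtOffset.put(groupId, offset);
    }

    // Lists only the groups whose creating record is at or below the committed offset,
    // so that uncommitted changes are never exposed to the caller.
    List<String> listGroups(long committedOffset) {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, Long> entry : createdAtOffset.entrySet()) {
            if (entry.getValue() <= committedOffset) {
                visible.add(entry.getKey());
            }
        }
        return visible;
    }
}
```

   In the lambda above, this would correspond to forwarding the second
argument to `listGroups` instead of ignoring it.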


