dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1312918434


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",

Review Comment:
   nit: Let's use `list-groups` to be consistent with the existing names.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());

Review Comment:
   nit: Let's remove `java.util` and `Collections.emptyList()` as they are not 
necessary.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                }
+                return new ListGroupsResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            }));

Review Comment:
   I think that we need to think a little more about the different errors that 
we could get here. My understanding is that we fail the entire request in two 
cases: 1) at least one partition is loading; and 2) there is an unexpected 
error (a non-KafkaException).
   
   Here, we could get the following errors:
   - NotCoordinatorException if `tp` is no longer active, failed, etc. In this 
case, we actually want to return an empty list of groups.
   - CoordinatorLoadingException if `tp` is being loaded. In this case, we want 
to fail the entire request.
   - An unexpected exception. In this case, we also want to fail the entire 
request.
   
   Knowing this, we should explicitly handle the NotCoordinatorException case 
here. For the other cases, would it be possible to re-throw the exception?
   
   It would be great if you could also add a few unit tests for this.
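   A rough sketch of the dispatch I have in mind. The exception classes below are stand-in stubs (not Kafka's real hierarchy) so the sketch is self-contained; the real code would match on the actual Kafka exception types:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ListGroupsErrorHandlingSketch {
    // Stand-in stubs for Kafka's exception types (sketch only).
    static class KafkaException extends RuntimeException {}
    static class NotCoordinatorException extends KafkaException {}
    static class CoordinatorLoadInProgressException extends KafkaException {}

    // Per-partition handler: a moved/failed partition contributes an empty
    // list; anything else is re-thrown so the whole request fails.
    static CompletableFuture<List<String>> handle(CompletableFuture<List<String>> future) {
        return future.exceptionally(exception -> {
            Throwable cause = exception instanceof CompletionException
                ? exception.getCause() : exception;
            if (cause instanceof NotCoordinatorException) {
                return Collections.emptyList();
            }
            // Loading or unexpected: propagate the failure.
            throw new CompletionException(cause);
        });
    }
}
```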



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setGroupState(state.toString());
+    }

Review Comment:
   Let's please add a unit test for this method.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -309,6 +311,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param context The request context.
+     * @param request The ListGroups request.
+     * @return A Result containing the ListGroupsResponseData response
+     */
+    public ListGroupsResponseData listGroups(
+        RequestContext context,
+        ListGroupsRequestData request,
+        long committedOffset
+    ) throws ApiException {

Review Comment:
   I wonder if we should simplify the signature of this method. How about 
taking a list of states and returning a list of ListedGroup? The full request 
and the context are not really necessary in my opinion in this case.
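   To illustrate the simplified shape, a minimal sketch; the `ListedGroup` record and the map of group states are stand-ins for the real types, and `committedOffset` is kept only because the real code threads it through to timeline reads:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ListGroupsSignatureSketch {
    // Hypothetical stand-in for ListGroupsResponseData.ListedGroup.
    record ListedGroup(String groupId, String state) {}

    // Suggested signature: a list of states in, a list of ListedGroup out.
    static List<ListedGroup> listGroups(Map<String, String> groupStates,
                                        List<String> states,
                                        long committedOffset) {
        // committedOffset would parameterize timeline reads in the real code.
        return groupStates.entrySet().stream()
            .filter(e -> states.isEmpty() || states.contains(e.getValue()))
            .map(e -> new ListedGroup(e.getKey(), e.getValue()))
            .collect(Collectors.toList());
    }
}
```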



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
 
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.
+     */
+    public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) {
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+        List<String> statesFilter = request.statesFilter();
+        if (!statesFilter.isEmpty()) {
+            groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString()));
+        }
+        return new ListGroupsResponseData().setGroups(groupStream.map(Group::asListedGroup).collect(Collectors.toList()));

Review Comment:
   In the ConsumerGroup case, the state is stored in a timeline data structure. 
Hence, we need to pass the `committedOffset` to `stateAsString` and 
`asListedGroup` in order to stay consistent.
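   For intuition, a toy model of a timeline-backed value (names are illustrative, not the real `TimelineHashMap` API): a read at a committed offset only sees writes at or below that offset, which is why `committedOffset` has to flow all the way down.

```java
import java.util.TreeMap;

public class TimelineStateSketch {
    // Each write is tagged with the offset it became durable at.
    private final TreeMap<Long, String> history = new TreeMap<>();

    void put(long offset, String state) {
        history.put(offset, state);
    }

    // Reads see the last value at or before the committed offset,
    // so uncommitted state changes stay invisible.
    String stateAsString(long committedOffset) {
        var entry = history.floorEntry(committedOffset);
        return entry == null ? null : entry.getValue();
    }
}
```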



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
 
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.

Review Comment:
   nit: Remove `typesFilter`. `statesFilter` -> `states`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                }
+                return new ListGroupsResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            }));
+        }
+        CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+        futures.forEach(CompletableFuture::join);
+        for (CompletableFuture<ListGroupsResponseData> future : futures) {
+            try {
+                ListGroupsResponseData data = future.get();
+                if (data.errorCode() != Errors.NONE.code()) {
+                    responseFuture.complete(data);
+                    return responseFuture;
+                }
+                listedGroups.addAll(future.get().groups());
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("ListGroups request {} hit an unexpected exception: {}",
+                    request, e.getMessage());
+                if (!responseFuture.isDone()) {
+                    responseFuture.complete(new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+                    return responseFuture;
+                }
+            }
+        }

Review Comment:
   Have you thought about writing a helper method to turn a list of 
CompletableFutures into a CompletableFuture containing the list of results? 
That would be a nice building block that we could put in `FutureUtils`. If any 
of the CompletableFutures failed, the resulting CompletableFuture would fail 
as well with the same error. That would simplify the code here.
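   Something along these lines; the helper name is hypothetical and the sketch uses only the JDK:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureUtilsSketch {
    // Collapse a list of futures into one future holding the list of results.
    // If any input future fails, the combined future fails with that error.
    public static <T> CompletableFuture<List<T>> combineFutures(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(nothing -> {
                // All inputs are complete here, so join() cannot block.
                List<T> results = new ArrayList<>(futures.size());
                futures.forEach(future -> results.add(future.join()));
                return results;
            });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = List.of(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture(2));
        System.out.println(combineFutures(futures).join()); // [1, 2]
    }
}
```

   The caller could then merge the per-partition responses with a single `thenApply` instead of the manual join/get loop.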



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
 
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.
+     */
+    public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) {

Review Comment:
   Let's also add a unit test for this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
