dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1312918434
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",

Review Comment:
   nit: Let's use `list-groups` to be consistent with the existing names.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());

Review Comment:
   nit: Let's remove `java.util` and `Collections.emptyList()` as they are not necessary.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                }
+                return new ListGroupsResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            }));

Review Comment:
   I think that we need to think a little more about the different errors that we could get here. My understanding is that we fail the entire request in two cases: 1) at least one partition is loading; and 2) there is an unexpected error (a non-KafkaException).

   Here, we could get the following errors:
   - NotCoordinatorException if `tp` is no longer active, has failed, etc. In this case, we actually want to return an empty list of groups.
   - CoordinatorLoadingException if `tp` is being loaded. In this case, we want to fail the entire request.
   - An unexpected exception. In this case, we also want to fail the entire request.

   Knowing this, we should explicitly handle the NotCoordinatorException case here. For the other cases, would it be possible to re-throw the exception? It would be great if you could also add a few unit tests for this.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setGroupState(state.toString());
+    }

Review Comment:
   Let's please add a unit test for this method.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -309,6 +311,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param context The request context.
+     * @param request The ListGroups request.
+     * @return A Result containing the ListGroupsResponseData response
+     */
+    public ListGroupsResponseData listGroups(
+        RequestContext context,
+        ListGroupsRequestData request,
+        long committedOffset
+    ) throws ApiException {

Review Comment:
   I wonder if we should simplify the signature of this method. How about taking a list of states and returning a list of ListedGroup? The full request and the context are not really necessary in my opinion in this case.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.
+     */
+    public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) {
+        Stream<Group> groupStream = groups.values(committedOffset).stream();
+        List<String> statesFilter = request.statesFilter();
+        if (!statesFilter.isEmpty()) {
+            groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString()));
+        }
+        return new ListGroupsResponseData().setGroups(groupStream.map(Group::asListedGroup).collect(Collectors.toList()));

Review Comment:
   In the ConsumerGroup case, the state is stored in a timeline data structure. Hence, we need to pass the `committedOffset` to `stateAsString` and `asListedGroup` in order to stay consistent.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.

Review Comment:
   nit: Remove `typesFilter`. `statesFilter` -> `states`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (TopicPartition tp : runtime.partitions()) {
+            futures.add(runtime.scheduleReadOperation(
+                "list_groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+            ).exceptionally(exception -> {
+                if (!(exception instanceof KafkaException)) {
+                    log.error("ListGroups request {} hit an unexpected exception: {}",
+                        request, exception.getMessage());
+                }
+                return new ListGroupsResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            }));
+        }
+        CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+        futures.forEach(CompletableFuture::join);
+        for (CompletableFuture<ListGroupsResponseData> future : futures) {
+            try {
+                ListGroupsResponseData data = future.get();
+                if (data.errorCode() != Errors.NONE.code()) {
+                    responseFuture.complete(data);
+                    return responseFuture;
+                }
+                listedGroups.addAll(future.get().groups());
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("ListGroups request {} hit an unexpected exception: {}",
+                    request, e.getMessage());
+                if (!responseFuture.isDone()) {
+                    responseFuture.complete(new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+                    return responseFuture;
+                }
+            }
+        }

Review Comment:
   Have you thought about writing a helper method that turns a list of CompletableFutures into a single CompletableFuture containing the list of results? That would be a nice building block that we could put in `FutureUtils`. If any of the CompletableFutures fails, the resulting CompletableFuture would fail as well, with the same error. Wouldn't this simplify the code here?
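The helper suggested in the comment above could look roughly like the following. This is only a sketch built on the standard `java.util.concurrent.CompletableFuture` API; the class name `FutureCombiner` and the method name `combine` are hypothetical placeholders and not an existing `FutureUtils` method. It assumes that "failed with the same error" means the combined future is completed with the original exception of the first input future that fails, and that results keep the order of the input list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; the names are illustrative and not part of Kafka's FutureUtils.
final class FutureCombiner {
    private FutureCombiner() { }

    /**
     * Turns a list of futures into a single future holding the list of results,
     * in the same order as the input. If any input future fails, the returned
     * future fails with that same exception.
     */
    static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();

        // Propagate the first failure as-is. Later failures are ignored because
        // a CompletableFuture can only be completed once.
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }

        // Once every input has completed normally, collect the values in order.
        // thenRun is skipped if any input failed, so join() cannot throw here.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<T> values = new ArrayList<>(futures.size());
                for (CompletableFuture<T> future : futures) {
                    values.add(future.join());
                }
                result.complete(values);
            });

        return result;
    }
}
```

With a building block of this shape, `listGroups` could presumably combine the per-partition futures and merge their groups in a single `thenApply` step, without the blocking `join()` and `get()` calls in the hunk above. The partition-specific error handling would still have to be applied to each future before combining.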
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
         return group;
     }
+    /**
+     * @return The GenericGroup List filtered by statesFilter or typesFilter.
+     */
+    public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) {

Review Comment:
   Let's also add a unit test for this one.