dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1302279096
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   This seems to be inefficient because the coordinator may not be responsible for all the partitions. I thought that we could use `CoordinatorRuntime#partitions` to get the list of registered partitions. Have you considered this?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -272,6 +274,21 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param context The request context.
+     * @param request The ListGroups request.
+     *
+     * @return A Result containing the ListGroupsResponseData response
+     */
+    public ListGroupsResponseData listGroups(
+            RequestContext context,
+            ListGroupsRequestData request

Review Comment:
   nit: This should be indented with four spaces.

##########
clients/src/main/resources/common/message/ListGroupsRequest.json:
##########
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "StatesFilter", "type": "[]string", "versions": "4+",
       "about": "The states of the groups we want to list. If empty all groups are returned with their state."
-    }
+    },
+    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+      "about": "The types of the groups we want to list. If empty all groups are returned" }

Review Comment:
   I would prefer to do this in a second PR because this change impacts both the new and the old group coordinators. Would it be possible?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {
+        return new ListGroupsResponseData.ListedGroup()
+                .setGroupId(groupId)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setGroupState(state.toString());

Review Comment:
   nit: This should be indented with four spaces. I have seen this in other places in the code, but I am not going to comment on them all. I'll let you have a look.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
     private static Integer incValue(String key, Integer value) {
         return value == null ? 1 : value + 1;
     }
+
+    /**
+     * @return the group formatted as a list group response.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup() {

Review Comment:
   Don't we need to also implement this for `GenericGroup`?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+        for (int i = 0; i < numPartitions; i++) {
+            futures.add(runtime.scheduleReadOperation("list_groups",
+                    new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+                    (coordinator, __) -> coordinator.listGroups(context, request)

Review Comment:
   We need to use the second parameter `__` and pass it down to `listGroups`. For context, the second parameter is the last committed offset. We should list the groups based on it; otherwise, we would return uncommitted changes.
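
   The point about the committed offset can be sketched as follows. This is a minimal, self-contained illustration and not the Kafka implementation: the class `CommittedListingSketch`, its `addGroup` method, and the rule that a record counts as committed when its offset is at or below the offset passed in are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a coordinator shard; names and structure are assumptions.
final class CommittedListingSketch {
    // Group id -> offset of the log record that created the group.
    private final Map<String, Long> createdAtOffset = new TreeMap<>();

    // Records that a group was created by the record written at the given offset.
    void addGroup(String groupId, long offset) {
        createdAtOffset.put(groupId, offset);
    }

    // Lists only the groups whose creating record is at or below the last
    // committed offset, so uncommitted groups are never returned.
    List<String> listGroups(long committedOffset) {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, Long> entry : createdAtOffset.entrySet()) {
            if (entry.getValue() <= committedOffset) {
                visible.add(entry.getKey());
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        CommittedListingSketch shard = new CommittedListingSketch();
        shard.addGroup("group-a", 3L); // written at offset 3
        shard.addGroup("group-b", 8L); // written at offset 8, not yet committed
        // With a last committed offset of 5, only group-a is listed.
        System.out.println(shard.listGroups(5L));
    }
}
```

   In the read operation from the diff, this would correspond to forwarding the second lambda argument to `listGroups` instead of ignoring it as `__`.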