dajac commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1411799674
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( + groupIds, + Errors.COORDINATOR_NOT_AVAILABLE + )); + } + + final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DescribeGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()) + ))); + } else { + groupsByTopicPartition + .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = + runtime.scheduleReadOperation( + "consumer-group-describe", + topicPartition, + (coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset) + ).exceptionally(exception -> { + if (!(exception instanceof KafkaException)) { + log.error("ConsumerGroupDescribe request {} hit an unexpected exception: {}.", + groupList, exception.getMessage()); + } + + return ConsumerGroupDescribeRequest.getErrorDescribedGroupList( + groupList, + Errors.forException(exception) + ); + }); + + futures.add(future); + }); + + final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return allFutures.thenApply(v -> { + final List<ConsumerGroupDescribeResponseData.DescribedGroup> res = new ArrayList<>(); + futures.forEach(future -> res.addAll(future.join())); + return res; + }); Review Comment: I think that we have more or less the same code elsewhere in this class. If you are interested, we could try to refactor this into an helper method as a follow-up. We can keep it as-is in this pull request though. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -468,6 +468,33 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + /** + * Handles a ConsumerGroupDescribe request. + * @param groupIds The IDs of the groups to describe. Review Comment: nit: We usually put an empty line between the description and the params. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( + groupIds, + Errors.COORDINATOR_NOT_AVAILABLE + )); + } + + final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DescribeGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()) Review Comment: Let's remove the default error message here. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,34 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) { Review Comment: nit: Javadoc. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ########## @@ -304,4 +312,75 @@ public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() { assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation()); assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), member.partitionsPendingAssignment()); } + + @Test + public void testAsConsumerGroupDescribeMember() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + List<Integer> assignedPartitions = Arrays.asList(0, 1, 2); + ConsumerGroupCurrentMemberAssignmentValue record = new ConsumerGroupCurrentMemberAssignmentValue() + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(11) + .setAssignedPartitions(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId1) + .setPartitions(assignedPartitions))) + .setPartitionsPendingRevocation(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(3, 4, 5)))) + .setPartitionsPendingAssignment(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId3) + .setPartitions(Arrays.asList(6, 7, 8)))); + String memberId = Uuid.randomUuid().toString(); + String clientId = "clientId"; + String instanceId = "instanceId"; + String rackId = "rackId"; + String clientHost = "clientHost"; + List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2"); + String subscribedTopicRegex = "topic.*"; + Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); + assignmentMap.put(Uuid.randomUuid(), new HashSet<>()); + Assignment targetAssignment = new Assignment(assignmentMap); + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .updateWith(record) + .setClientId(clientId) + .setInstanceId(instanceId) + .setRackId(rackId) + .setClientHost(clientHost) + .setSubscribedTopicNames(subscribedTopicNames) + .setSubscribedTopicRegex(subscribedTopicRegex) + .build(); + + ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = member.asConsumerGroupDescribeMember(targetAssignment); + + assertEquals(memberId, consumerGroupDescribeMember.memberId().toString()); + assertEquals(clientId, consumerGroupDescribeMember.clientId()); + assertEquals(instanceId, consumerGroupDescribeMember.instanceId()); + assertEquals(rackId, consumerGroupDescribeMember.rackId()); + assertEquals(clientHost, consumerGroupDescribeMember.clientHost()); + assertEquals(subscribedTopicNames, consumerGroupDescribeMember.subscribedTopicNames()); + assertEquals(subscribedTopicRegex, consumerGroupDescribeMember.subscribedTopicRegex()); + assertEquals( + new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(targetAssignment.partitions().entrySet().stream().map( + item -> new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(item.getKey()) + .setPartitions(new ArrayList<>(item.getValue())) + ).collect(Collectors.toList())), + consumerGroupDescribeMember.targetAssignment() + ); + + assertEquals(assignedPartitions, consumerGroupDescribeMember.assignment().topicPartitions().get(0).partitions()); Review Comment: nit: We usually prefer to construct the full expected data structure, here `ConsumerGroupDescribeResponseData.Member`, and to use `assertEquals` because it ensures that the all the fields are tested (also future fields). ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3743,8 +3743,48 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { - requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] Review Comment: I just realized that we don't handle the `IncludeAuthorizedOperations` case in the request. When set, we must populate the `AuthorizedOperations` field in the response. You can check how we did this in `handleDescribeGroupsRequest`. I suggest to do this in a separate pull request though. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -623,6 +650,22 @@ public GenericGroup genericGroup( } } + public ConsumerGroup consumerGroup( Review Comment: nit: Could we add the javadoc? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org