dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1411799674


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, 
List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final 
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ConsumerGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                        .setErrorMessage(Errors.INVALID_GROUP_ID.message())
+                )));
+            } else {
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartitionFor(groupId), __ -> new 
ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
+                runtime.scheduleReadOperation(
+                    "consumer-group-describe",
+                    topicPartition,
+                    (coordinator, lastCommittedOffset) -> 
coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
+                ).exceptionally(exception -> {
+                    if (!(exception instanceof KafkaException)) {
+                        log.error("ConsumerGroupDescribe request {} hit an 
unexpected exception: {}.",
+                            groupList, exception.getMessage());
+                    }
+
+                    return 
ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                        groupList,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(v -> {
+            final List<ConsumerGroupDescribeResponseData.DescribedGroup> res = 
new ArrayList<>();
+            futures.forEach(future -> res.addAll(future.join()));
+            return res;
+        });

Review Comment:
   I think that we have more or less the same code elsewhere in this class. If 
you are interested, we could try to refactor this into an helper method as a 
follow-up. We can keep it as-is in this pull request though.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -468,6 +468,33 @@ public List<ListGroupsResponseData.ListedGroup> 
listGroups(List<String> statesFi
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    /**
+     * Handles a ConsumerGroupDescribe request.
+     * @param groupIds          The IDs of the groups to describe.

Review Comment:
   nit: We usually put an empty line between the description and the params.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, 
List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final 
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ConsumerGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                        .setErrorMessage(Errors.INVALID_GROUP_ID.message())

Review Comment:
   Let's remove the default error message here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,34 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(Assignment targetAssignment) {

Review Comment:
   nit: Javadoc.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##########
@@ -304,4 +312,75 @@ public void 
testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
         assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), 
member.partitionsPendingRevocation());
         assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), 
member.partitionsPendingAssignment());
     }
+
+    @Test
+    public void testAsConsumerGroupDescribeMember() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+        Uuid topicId3 = Uuid.randomUuid();
+        List<Integer> assignedPartitions = Arrays.asList(0, 1, 2);
+        ConsumerGroupCurrentMemberAssignmentValue record = new 
ConsumerGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(11)
+            .setAssignedPartitions(Collections.singletonList(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(topicId1)
+                .setPartitions(assignedPartitions)))
+            .setPartitionsPendingRevocation(Collections.singletonList(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(topicId2)
+                .setPartitions(Arrays.asList(3, 4, 5))))
+            .setPartitionsPendingAssignment(Collections.singletonList(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(topicId3)
+                .setPartitions(Arrays.asList(6, 7, 8))));
+        String memberId = Uuid.randomUuid().toString();
+        String clientId = "clientId";
+        String instanceId = "instanceId";
+        String rackId = "rackId";
+        String clientHost = "clientHost";
+        List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2");
+        String subscribedTopicRegex = "topic.*";
+        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
+        assignmentMap.put(Uuid.randomUuid(), new HashSet<>());
+        Assignment targetAssignment = new Assignment(assignmentMap);
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .updateWith(record)
+            .setClientId(clientId)
+            .setInstanceId(instanceId)
+            .setRackId(rackId)
+            .setClientHost(clientHost)
+            .setSubscribedTopicNames(subscribedTopicNames)
+            .setSubscribedTopicRegex(subscribedTopicRegex)
+            .build();
+
+        ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = 
member.asConsumerGroupDescribeMember(targetAssignment);
+
+        assertEquals(memberId, 
consumerGroupDescribeMember.memberId().toString());
+        assertEquals(clientId, consumerGroupDescribeMember.clientId());
+        assertEquals(instanceId, consumerGroupDescribeMember.instanceId());
+        assertEquals(rackId, consumerGroupDescribeMember.rackId());
+        assertEquals(clientHost, consumerGroupDescribeMember.clientHost());
+        assertEquals(subscribedTopicNames, 
consumerGroupDescribeMember.subscribedTopicNames());
+        assertEquals(subscribedTopicRegex, 
consumerGroupDescribeMember.subscribedTopicRegex());
+        assertEquals(
+            new ConsumerGroupDescribeResponseData.Assignment()
+                
.setTopicPartitions(targetAssignment.partitions().entrySet().stream().map(
+                    item -> new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                        .setTopicId(item.getKey())
+                        .setPartitions(new ArrayList<>(item.getValue()))
+                ).collect(Collectors.toList())),
+            consumerGroupDescribeMember.targetAssignment()
+        );
+
+        assertEquals(assignedPartitions, 
consumerGroupDescribeMember.assignment().topicPartitions().get(0).partitions());

Review Comment:
   nit: We usually prefer to construct the full expected data structure, here 
`ConsumerGroupDescribeResponseData.Member`, and to use `assertEquals` because 
it ensures that the all the fields are tested (also future fields).



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3743,8 +3743,48 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleConsumerGroupDescribe(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val consumerGroupDescribeRequest = 
request.body[ConsumerGroupDescribeRequest]

Review Comment:
   I just realized that we don't handle the `IncludeAuthorizedOperations` case 
in the request. When set, we must populate the `AuthorizedOperations` field in 
the response. You can check how we did this in `handleDescribeGroupsRequest`.
   
   I suggest to do this in a separate pull request though.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -623,6 +650,22 @@ public GenericGroup genericGroup(
         }
     }
 
+    public ConsumerGroup consumerGroup(

Review Comment:
   nit: Could we add the javadoc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to