dajac commented on code in PR #14462: URL: https://github.com/apache/kafka/pull/14462#discussion_r1342814124
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -444,6 +445,66 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + /** + * Handles a DescribeGroup request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the DescribeGroupsResponseData.DescribedGroup. + */ + public List<DescribeGroupsResponseData.DescribedGroup> describeGroups( + List<String> groupIds, + long committedOffset + ) { + final List<DescribeGroupsResponseData.DescribedGroup> describedGroups = new ArrayList<>(); + groupIds.forEach(groupId -> { + try { + Group group = group(groupId, committedOffset); + if (group.type() != GENERIC) { + // We don't support upgrading/downgrading between protocols at the moment, so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a generic group.", + groupId)); + } + GenericGroup genericGroup = (GenericGroup) group; Review Comment: When the group does not exist, in the current code, we return `GroupCoordinator.DeadGroup` instead of returning a `GroupIdNotFoundException` exception. Do you confirm? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -751,6 +752,112 @@ public void testListGroupsFailedImmediately() assertEquals(Collections.emptyList(), listGroupsResponseData.groups()); } + @Test + public void testDescribeGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 2; + service.startup(() -> partitionCount); + + DescribeGroupsResponseData.DescribedGroup describedGroup1 = new DescribeGroupsResponseData.DescribedGroup() + .setGroupId("group-id-1"); + DescribeGroupsResponseData.DescribedGroup describedGroup2 = new DescribeGroupsResponseData.DescribedGroup() + .setGroupId("group-id-2"); + List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( + describedGroup1, + describedGroup2 + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("describe-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); + + CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>(); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("describe-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(describedGroupFuture); + + CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> future = + service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), Arrays.asList("group-id-1", "group-id-2")); + + assertFalse(future.isDone()); + describedGroupFuture.complete(Collections.singletonList(describedGroup2)); + + assertTrue(future.get().containsAll(expectedDescribedGroups)); + assertTrue(expectedDescribedGroups.containsAll(future.get())); Review Comment: nit: Could we use assertEquals here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -444,6 +445,66 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + /** + * Handles a DescribeGroup request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the DescribeGroupsResponseData.DescribedGroup. + */ + public List<DescribeGroupsResponseData.DescribedGroup> describeGroups( + List<String> groupIds, + long committedOffset + ) { + final List<DescribeGroupsResponseData.DescribedGroup> describedGroups = new ArrayList<>(); + groupIds.forEach(groupId -> { + try { + Group group = group(groupId, committedOffset); + if (group.type() != GENERIC) { + // We don't support upgrading/downgrading between protocols at the moment, so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a generic group.", + groupId)); + } + GenericGroup genericGroup = (GenericGroup) group; Review Comment: Could we define a method similar to `getOrMaybeCreateGenericGroup(groupId, false)`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ########## @@ -349,6 +350,25 @@ public boolean isNew() { return isNew; } + /** + * @return The described group member without metadata. + */ + public DescribeGroupsResponseData.DescribedGroupMember describeNoMetadata() { Review Comment: Should we add simple unit tests for those two as well? ########## clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java: ########## @@ -75,4 +77,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static DescribeGroupsRequest parse(ByteBuffer buffer, short version) { return new DescribeGroupsRequest(new DescribeGroupsRequestData(new ByteBufferAccessor(buffer), version), version); } + + public static List<DescribeGroupsResponseData.DescribedGroup> getErrorDescribedGroupList( Review Comment: Could we add a unit test for this one please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org