lucasbru commented on code in PR #18979:
URL: https://github.com/apache/kafka/pull/18979#discussion_r1966098527


##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), 
config.groupCoordinatorRebalanceProtocols)
     assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+    // This is OK.

Review Comment:
   You are right, can be removed. I was just following the style in rest of the 
test.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -609,6 +612,34 @@ public List<ShareGroupDescribeResponseData.DescribedGroup> 
shareGroupDescribe(
         return describedGroups;
     }
 
+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.

Review Comment:
   Good idea. Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -682,6 +725,58 @@ public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
         return FutureUtils.combineFutures(futures, ArrayList::new, 
List::addAll);
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> 
streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final 
List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();

Review Comment:
   The group coordinator is sharded by topic partition of the consumer offset 
topic. So we group the group IDs by the topic partitions of the consumer 
offset, which acts as an "address" of the right group coordinator. We fetch the 
described groups by instance and merge the results.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8944,6 +8947,113 @@ public void 
testConsumerGroupDescribeBeforeAndAfterCommittingOffset() {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testStreamsGroupDescribeNoErrors() {
+        List<String> streamsGroupIds = Arrays.asList("group-id-1", 
"group-id-2");
+        int epoch = 10;
+        String memberId = "member-id";
+        StreamsGroupMember.Builder memberBuilder = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            .setClientTags(Collections.singletonMap("clientTag", 
"clientValue"))
+            .setProcessId("processId")
+            .setMemberEpoch(epoch)
+            .setPreviousMemberEpoch(epoch - 1);
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), 
epoch))
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), 
epoch)
+                .withMember(memberBuilder.build()))
+            .build();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> expected = 
Arrays.asList(
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(0))
+                .setGroupState(StreamsGroupState.EMPTY.toString())
+                .setAssignmentEpoch(0),
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(1))
+                .setMembers(Collections.singletonList(
+                    memberBuilder.build().asStreamsGroupDescribeMember(
+                        TasksTuple.EMPTY
+                    )
+                ))
+                .setGroupState(StreamsGroupState.NOT_READY.toString())
+        );
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(streamsGroupIds);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testStreamsGroupDescribeWithErrors() {
+        String groupId = "groupId";
+

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to