dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1457158606


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -452,19 +453,36 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     *                     If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to 
this shard
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their 
state.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their 
type.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        Set<String> statesFilter,
+        Set<String> typesFilter,
+        long committedOffset
+    ) {
+        Predicate<Group> combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+
+            // The type check is case-insensitive.
+            boolean typeCheck = typesFilter.isEmpty() ||
+                typesFilter.stream()
+                    .map(String::toLowerCase)

Review Comment:
   It would be better to lower-case the types filter once, outside of the
   predicate, to avoid redoing it for every group.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9616,47 +9618,76 @@ public void testListGroups() {
             .build()));
         context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
 
+        // Test list group response without a group state or group type filter.
         Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
-            context.sendListGroups(Collections.emptyList())
-                
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+            context.sendListGroups(Collections.emptyList(), 
Collections.emptyList()).stream()
+                
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
         Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
             Stream.of(
                 new ListGroupsResponseData.ListedGroup()
                     .setGroupId(classicGroup.groupId())
-                    .setProtocolType(classicGroupType)
-                    .setGroupState(EMPTY.toString()),
+                    .setProtocolType("classic")
+                    .setGroupState(EMPTY.toString())
+                    .setGroupType(Group.GroupType.CLASSIC.toString()),
                 new ListGroupsResponseData.ListedGroup()
                     .setGroupId(consumerGroupId)
                     .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                     
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+                    .setGroupType(Group.GroupType.CONSUMER.toString())
             
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
 
         assertEquals(expectAllGroupMap, actualAllGroupMap);
 
         context.commit();
-        actualAllGroupMap = 
context.sendListGroups(Collections.emptyList()).stream()
+
+        // Test list group response to check assigning state in the consumer 
group.
+        actualAllGroupMap = 
context.sendListGroups(Collections.singletonList("assigning"), 
Collections.emptyList()).stream()
             
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
         expectAllGroupMap =
             Stream.of(
-                new ListGroupsResponseData.ListedGroup()
-                    .setGroupId(classicGroup.groupId())
-                    .setProtocolType(classicGroupType)
-                    .setGroupState(EMPTY.toString()),
                 new ListGroupsResponseData.ListedGroup()
                     .setGroupId(consumerGroupId)
                     .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                     
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
+                    .setGroupType(Group.GroupType.CONSUMER.toString())
             
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
 
         assertEquals(expectAllGroupMap, actualAllGroupMap);
 
-        actualAllGroupMap = 
context.sendListGroups(Collections.singletonList("Empty")).stream()
+        // Test list group response with group state filter and no group type 
filter.
+        actualAllGroupMap = 
context.sendListGroups(Collections.singletonList("Empty"), 
Collections.emptyList()).stream()
+            
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+        expectAllGroupMap = Stream.of(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId(classicGroup.groupId())
+                .setProtocolType("classic")
+                .setGroupState(EMPTY.toString())
+                .setGroupType(Group.GroupType.CLASSIC.toString())
+        
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+        assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+        // Test list group response with no group state filter and with group 
type filter.
+        actualAllGroupMap = context.sendListGroups(Collections.emptyList(), 
Collections.singletonList(Group.GroupType.CLASSIC.toString())).stream()

Review Comment:
   Should we also test with the other group type (`Group.GroupType.CONSUMER`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to