dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1445918279


##########
clients/src/main/resources/common/message/ListGroupsResponse.json:
##########
@@ -19,26 +19,33 @@
   "name": "ListGroupsResponse",
   // Version 1 adds the throttle time.
   //
-  // Starting in version 2, on quota violation, brokers send out responses 
before throttling.
+  // Starting in version 2, on quota violation, brokers send out
+  // responses before throttling.
   //
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the GroupState field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the GroupType field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
-    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",

Review Comment:
   nit: Let's revert this change.



##########
clients/src/main/resources/common/message/ListGroupsResponse.json:
##########
@@ -19,26 +19,33 @@
   "name": "ListGroupsResponse",
   // Version 1 adds the throttle time.
   //
-  // Starting in version 2, on quota violation, brokers send out responses 
before throttling.
+  // Starting in version 2, on quota violation, brokers send out
+  // responses before throttling.
   //
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the GroupState field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the GroupType field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
-    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
+      "ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
     { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
       "about": "Each group in the response.", "fields": [
-      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      { "name": "GroupId", "type": "string", "versions": "0+",

Review Comment:
   nit: Let's revert this change.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -19,7 +19,6 @@ package kafka.coordinator.group
 import java.nio.ByteBuffer
 import java.util.UUID
 import java.util.concurrent.locks.ReentrantLock
-

Review Comment:
   Let's revert this change.



##########
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##########
@@ -0,0 +1,50 @@
+/*

Review Comment:
   This seems to be client side related. Do we need it in this PR?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     *                     If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to 
this shard
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their 
state.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their 
type.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        List<String> statesFilter,
+        List<String> typesFilter,
+        long committedOffset
+    ) {
+        Predicate<Group> combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+            boolean typeCheck = typesFilter.isEmpty() || 
typesFilter.contains(group.type().toString());
+            return stateCheck && typeCheck;
+        };
 
-    public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> 
statesFilter, long committedOffset) {
-        Stream<Group> groupStream = groups.values(committedOffset).stream();
-        if (!statesFilter.isEmpty()) {
-            groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString(committedOffset)));
-        }
-        return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
+        Stream<Group> groupStream = 
groups.values(committedOffset).parallelStream();

Review Comment:
   nit: `parallelStream()` is interesting here. I would start without it and 
measure the impact when we run benchmarks. 



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -327,23 +327,26 @@ class GroupCoordinatorAdapterTest {
 
   @Test
   def testListGroups(): Unit = {
-    testListGroups(null, Set.empty)
-    testListGroups(List(), Set.empty)
-    testListGroups(List("Stable"), Set("Stable"))
+    testListGroups(null, null, Set.empty, Set.empty)
+    testListGroups(List(), List(), Set.empty, Set.empty)
+    testListGroups(List("Stable, Empty"), List(), Set("Stable, Empty"), 
Set.empty)

Review Comment:
   Should we also add variants with group types?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -1105,16 +1105,17 @@ private[group] class GroupCoordinator(
     }
   }
 
-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, 
List[GroupOverview]) = {
     if (!isActive.get) {
       (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
       val errorCode = if (groupManager.isLoading) 
Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      // if states is empty, return all groups
-      val groups = if (states.isEmpty)
-        groupManager.currentGroups
-      else
-        groupManager.currentGroups.filter(g => 
states.contains(g.summary.state))
+      // Filter groups based on states and groupTypes. If either is empty, it 
won't filter on that criterion.
+      // If groupType is mentioned then no group is returned since the notion 
of groupTypes doesn't exist in the
+      // old group coordinator.
+      val groups = groupManager.currentGroups.filter { g =>
+        (states.isEmpty || states.contains(g.summary.state)) && 
groupTypes.isEmpty
+      }

Review Comment:
   In my opinion, we should rather consider all groups as `classic` groups. 
Therefore, we would only return groups if the type is empty or contains 
`classic`.



##########
clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java:
##########
@@ -50,6 +50,10 @@ public ListGroupsRequest build(short version) {
                 throw new UnsupportedVersionException("The broker only 
supports ListGroups " +
                         "v" + version + ", but we need v4 or newer to request 
groups by states.");
             }
+            if (!data.typesFilter().isEmpty() && version < 5) {
+                throw new UnsupportedVersionException("The broker only 
supports ListGroups " +
+                    "v" + version + ", but we need v5 or newer to request 
groups by type.");
+            }

Review Comment:
   Should we add a unit test to cover this change?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -633,8 +633,8 @@ class KafkaApisTest extends Logging {
     val requestData = 
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)

Review Comment:
   Most of the changes in this file are not related to this PR. Could we revert 
all the unnecessary changes please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to