dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1319725841


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -48,6 +49,16 @@ public String toString() {
      */
     String stateAsString();
 
+    /**
+     * @return The {{@link GroupType}}'s String representation with 
committedOffset.

Review Comment:
   nit: `based on the committed offset.`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        CompletableFuture<ListGroupsResponseData> future = new 
CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();

Review Comment:
   nit: Those two could be final as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        CompletableFuture<ListGroupsResponseData> future = new 
CompletableFuture<>();
+        List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
+        final AtomicInteger cnt = new 
AtomicInteger(runtime.partitions().size());
+
+        for (TopicPartition tp : runtime.partitions()) {

Review Comment:
   It seems to me that calling `partitions()` twice is not safe here because 
the number of partitions could change in between the two calls. I think that we 
should store it in order to avoid this race condition.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -306,6 +308,19 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a ListGroups request.
+     *
+     * @param statesFilter The states of the groups we want to list. If empty 
all groups are returned with their state.

Review Comment:
   nit: Add committedOffset as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,23 @@ public String stateAsString() {
         return state.get().toString();
     }
 
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response.

Review Comment:
   nit: ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
         );
     }
 
+    @Test
+    public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+        );
+        when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));

Review Comment:
   Could we add more partitions to ensure that the logic to handle them work as 
expected?



##########
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##########
@@ -103,4 +104,12 @@ public static <T> CompletableFuture<T> 
failedFuture(Throwable ex) {
         future.completeExceptionally(ex);
         return future;
     }
+
+    public static <T> void drainFutures(

Review Comment:
   I suppose that we could remove this now, isn't it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +422,17 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
         return group;
     }
 
+    /**
+     * @return The GenericGroup List filtered by statesFilter and 
committedOffset.
+     */

Review Comment:
   nit: Could we update the javadoc to full format? It would be great to 
document the arguments, etc. You have an example right below 
(`getOrMaybeCreateConsumerGroup`).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,23 @@ public String stateAsString() {
         return state.get().toString();
     }
 
+    /**
+     * @return The current state as a String with given committedOffset.

Review Comment:
   nit: `based on the committed offset.` here as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -48,6 +49,16 @@ public String toString() {
      */
     String stateAsString();
 
+    /**
+     * @return The {{@link GroupType}}'s String representation with 
committedOffset.
+     */
+    String stateAsString(long committedOffset);
+
+    /**
+     * @return the group formatted as a list group response.

Review Comment:
   nit: Should we also add `based on the committed offset.` here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
         );
     }
 
+    @Test
+    public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+        );
+        when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(expectedResults));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        assertEquals(new ListGroupsResponseData().setGroups(expectedResults), 
responseFuture.get(5, TimeUnit.SECONDS));
+    }
+
+    private void testListGroupsFailedWithException(Throwable t, 
ListGroupsResponseData expectResponseData)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+        when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(t));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+        assertEquals(expectResponseData, responseFuture.get(5, 
TimeUnit.SECONDS));
+

Review Comment:
   nit: Remove empty line.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -1169,7 +1179,7 @@ public List<JoinGroupResponseMember> 
currentGenericGroupMembers() {
     /**
      * @return the group formatted as a list group response.

Review Comment:
   nit: ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
         );
     }
 
+    @Test
+    public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+

Review Comment:
   nit: We can remove this empty line.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8999,4 +9052,4 @@ public SyncResult(
             this.appendFuture = coordinatorResult.appendFuture();
         }
     }
-}
+}

Review Comment:
   nit: Could we add an empty line at the end?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -242,6 +242,16 @@ public String stateAsString() {
         return this.state.toString();
     }
 
+    /**
+     * The state of this group with committedOffset.

Review Comment:
   nit: `based on the committed offset.`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
         );
     }
 
+    @Test
+    public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = 
Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+        );
+        when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(expectedResults));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        assertEquals(new ListGroupsResponseData().setGroups(expectedResults), 
responseFuture.get(5, TimeUnit.SECONDS));
+    }
+
+    private void testListGroupsFailedWithException(Throwable t, 
ListGroupsResponseData expectResponseData)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+        when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(t));
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = 
service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+        assertEquals(expectResponseData, responseFuture.get(5, 
TimeUnit.SECONDS));
+
+    }
+
+    @Test
+    public void testListGroupsFutureFailed() throws InterruptedException, 
ExecutionException, TimeoutException {
+        for (Errors errors : Errors.values()) {

Review Comment:
   Testing all errors does not seem necessary to me. I think that we should 
test the following cases:
   1. NOT_COORDINATOR is handled.
   2. Other errors fail the future immediately even if not all the future are 
resolved. It would be great if you can have add a unresolved future in this 
case.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8574,6 +8597,36 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
         assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
     }
 
+    @Test
+    public void testListGroups() {
+        String genericGroupId = "generic-group-id";
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+        context.updateLastWrittenOffset(context.lastWrittenOffset);
+        GenericGroup genericGroup = context.createGenericGroup(genericGroupId);
+        ConsumerGroup consumerGroup = 
context.createConsumerGroup(consumerGroupId);
+        context.updateLastWrittenOffset(context.lastWrittenOffset + 2);

Review Comment:
   In other tests, we use `replay()` to build the state. For instance, see 
`testConsumerGroupStates`. I wonder if we could reuse the same pattern here in 
order to keep the tests homogenous. What do you think?
   
   Should we also test the state filtering somehow? One way would be to add a 
new member to the consumer group at some point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to