dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1319725841
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ########## @@ -48,6 +49,16 @@ public String toString() { */ String stateAsString(); + /** + * @return The {{@link GroupType}}'s String representation with committedOffset. Review Comment: nit: `based on the committed offset.`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>(); + List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>(); Review Comment: nit: Those two could be final as well. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>(); + List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>(); + final AtomicInteger cnt = new AtomicInteger(runtime.partitions().size()); + + for (TopicPartition tp : runtime.partitions()) { Review Comment: It seems to me that calling `partitions()` twice is not safe here because the number of partitions could change in between the two calls. I think that we should store it in order to avoid this race condition. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -306,6 +308,19 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return offsetMetadataManager.commitOffset(context, request); } + /** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. Review Comment: nit: Add committedOffset as well. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -179,6 +181,23 @@ public String stateAsString() { return state.get().toString(); } + /** + * @return The current state as a String with given committedOffset. + */ + public String stateAsString(long committedOffset) { + return state.get(committedOffset).toString(); + } + + /** + * @return the group formatted as a list group response. Review Comment: nit: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } + @Test + public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + + List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group2") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + + ); + when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); Review Comment: Could we add more partitions to ensure that the logic to handle them work as expected? ########## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ########## @@ -103,4 +104,12 @@ public static <T> CompletableFuture<T> failedFuture(Throwable ex) { future.completeExceptionally(ex); return future; } + + public static <T> void drainFutures( Review Comment: I suppose that we could remove this now, isn't it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -420,6 +422,17 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } + /** + * @return The GenericGroup List filtered by statesFilter and committedOffset. + */ Review Comment: nit: Could we update the javadoc to full format? It would be great to document the arguments, etc. You have an example right below (`getOrMaybeCreateConsumerGroup`). ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -179,6 +181,23 @@ public String stateAsString() { return state.get().toString(); } + /** + * @return The current state as a String with given committedOffset. Review Comment: nit: `based on the committed offset.` here as well. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ########## @@ -48,6 +49,16 @@ public String toString() { */ String stateAsString(); + /** + * @return The {{@link GroupType}}'s String representation with committedOffset. + */ + String stateAsString(long committedOffset); + + /** + * @return the group formatted as a list group response. Review Comment: nit: Should we also add `based on the committed offset.` here? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } + @Test + public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + + List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group2") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + + ); + when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(expectedResults)); + + CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + + assertEquals(new ListGroupsResponseData().setGroups(expectedResults), responseFuture.get(5, TimeUnit.SECONDS)); + } + + private void testListGroupsFailedWithException(Throwable t, ListGroupsResponseData expectResponseData) + throws InterruptedException, ExecutionException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(t)); + + CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + assertEquals(expectResponseData, responseFuture.get(5, TimeUnit.SECONDS)); + Review Comment: nit: Remove empty line. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -1169,7 +1179,7 @@ public List<JoinGroupResponseMember> currentGenericGroupMembers() { /** * @return the group formatted as a list group response. Review Comment: nit: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } + @Test + public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + + List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group2") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + Review Comment: nit: We can remove this empty line. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8999,4 +9052,4 @@ public SyncResult( this.appendFuture = coordinatorResult.appendFuture(); } } -} +} Review Comment: nit: Could we add an empty line at the end? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -242,6 +242,16 @@ public String stateAsString() { return this.state.toString(); } + /** + * The state of this group with committedOffset. Review Comment: nit: `based on the committed offset.`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } + @Test + public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + + List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group2") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + + ); + when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(expectedResults)); + + CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + + assertEquals(new ListGroupsResponseData().setGroups(expectedResults), responseFuture.get(5, TimeUnit.SECONDS)); + } + + private void testListGroupsFailedWithException(Throwable t, ListGroupsResponseData expectResponseData) + throws InterruptedException, ExecutionException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + ListGroupsRequestData request = new ListGroupsRequestData(); + when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(t)); + + CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + assertEquals(expectResponseData, responseFuture.get(5, TimeUnit.SECONDS)); + + } + + @Test + public void testListGroupsFutureFailed() throws InterruptedException, ExecutionException, TimeoutException { + for (Errors errors : Errors.values()) { Review Comment: Testing all errors does not seem necessary to me. I think that we should test the following cases: 1. NOT_COORDINATOR is handled. 2. Other errors fail the future immediately even if not all the future are resolved. It would be great if you can have add a unresolved future in this case. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8574,6 +8597,36 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } + @Test + public void testListGroups() { + String genericGroupId = "generic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); + context.updateLastWrittenOffset(context.lastWrittenOffset); + GenericGroup genericGroup = context.createGenericGroup(genericGroupId); + ConsumerGroup consumerGroup = context.createConsumerGroup(consumerGroupId); + context.updateLastWrittenOffset(context.lastWrittenOffset + 2); Review Comment: In other tests, we use `replay()` to build the state. For instance, see `testConsumerGroupStates`. I wonder if we could reuse the same pattern here in order to keep the tests homogenous. What do you think? Should we also test the state filtering somehow? One way would be to add a new member to the consumer group at some point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org