[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1325484973 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8603,6 +8614,86 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) +.setSubscribedTopicNames(Collections.singletonList(fooTopicName)) +.build())); +context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); + +Map actualAllGroupMap = +context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +Map expectAllGroupMap = +Stream.of( +new ListGroupsResponseData.ListedGroup() +.setGroupId(genericGroup.groupId()) +.setProtocolType(genericGroupType) +.setGroupState(EMPTY.toString()), +new ListGroupsResponseData.ListedGroup() +.setGroupId(consumerGroupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()) + ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + +assertEquals(expectAllGroupMap, actualAllGroupMap); + + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) +.setSubscribedTopicNames(Collections.singletonList(fooTopicName)) +.build())); +context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); Review Comment: I think that we can remove those two lines now that they are already executed earlier, no? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324188593 ## checkstyle/suppressions.xml: ## @@ -327,7 +327,7 @@ files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/> - commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. + * @param committedOffset A specified committed offset corresponding to this shard Review Comment: nit: Could you please align the description of the params like we did for the other methods? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +422,22 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * Get the Group List. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. + * @param committedOffset A specified committed offset corresponding to this shard Review Comment: nit: Same here. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < partitionCount; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); +assertEquals(expectedResults, actualResults); +assertEquals(expectResultMap.size(), actualResults.size()); +for (ListGroupsResponseData.ListedGroup result : actualResults) { +assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result)); +} Review Comment: It seems to me that those assertions are not necessary as `assertEquals(expectedResults, actualResults);` already verifies everything, no? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1323263745 ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -103,4 +103,5 @@ public static CompletableFuture failedFuture(Throwable ex) { future.completeExceptionally(ex); return future; } + Review Comment: nit: Remove this line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1323262962 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,163 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < partitionCount; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); +assertEquals(expectedResults, actualResults); +assertEquals(expectResultMap.size(), actualResults.size()); +for (ListGroupsResponseData.ListedGroup result : actualResults) { +assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result)); +} +} + +@Test +public void testListGroupsFailedWithNotCoordinatorException() +throws InterruptedException, ExecutionException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} + +ListGroupsRequestData request = new ListGroupsRequestData(); +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < 2; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), +ArgumentMatchers.any() +)).thenReturn(FutureUtils.failedFuture(new
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1319725841 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -48,6 +49,16 @@ public String toString() { */ String stateAsString(); +/** + * @return The {{@link GroupType}}'s String representation with committedOffset. Review Comment: nit: `based on the committed offset.`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +431,32 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture future = new CompletableFuture<>(); +List results = new ArrayList<>(); Review Comment: nit: Those two could be final as well. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +431,32 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture future = new CompletableFuture<>(); +List results = new ArrayList<>(); +final AtomicInteger cnt = new AtomicInteger(runtime.partitions().size()); + +for (TopicPartition tp : runtime.partitions()) { Review Comment: It seems to me that calling `partitions()` twice is not safe here because the number of partitions could change in between the two calls. I think that we should store it in order to avoid this race condition. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -306,6 +308,19 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. Review Comment: nit: Add committedOffset as well. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -179,6 +181,23 @@ public String stateAsString() { return state.get().toString(); } +/** + * @return The current state as a String with given committedOffset. + */ +public String stateAsString(long committedOffset) { +return state.get(committedOffset).toString(); +} + +/** + * @return the group formatted as a list group response. Review Comment: nit: ditto. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + +); +when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); Review Comment: Could we add more partitions to ensure that the logic to handle them work as expected? ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -103,4 +104,12 @@ public static CompletableFuture failedFuture(Throwable ex) { future.completeExceptionally(ex); return future; } + +public static void drainFutures( Review Comment: I suppose that we could remove this now, isn't it? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +422,17 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * @return The GenericGroup List
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -306,6 +308,19 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +List statesFilter, +long committedOffset +) throws ApiException { +return new ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter, committedOffset)); +} Review Comment: I sill think that we should rather return the list of groups here and create `ListGroupsResponseData` one level up. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +432,51 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new ArrayList<>(); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list-groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +throw new RuntimeException(exception); +} +if (exception instanceof CoordinatorLoadInProgressException) { +throw new RuntimeException(exception); +} else if (exception instanceof NotCoordinatorException) { +log.warn("ListGroups request {} hit a NotCoordinatorException exception: {}", +request, exception.getMessage()); +return new ListGroupsResponseData().setGroups(Collections.emptyList()); +} else { +return new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()); +} +})); +} +CompletableFuture responseFuture = new CompletableFuture<>(); +List listedGroups = new ArrayList<>(); +AtomicInteger succeedFutureCount = new AtomicInteger(); +FutureUtils.drainFutures(futures, (data, t) -> { +synchronized (runtime) { +if (t != null) { +responseFuture.completeExceptionally(new UnknownServerException(t.getMessage())); +} else { +if (data.errorCode() != Errors.NONE.code()) { +if (!responseFuture.isDone()) { +responseFuture.complete(data); +} +} else { +listedGroups.addAll(data.groups()); +if (succeedFutureCount.addAndGet(1) == runtime.partitions().size()) { +responseFuture.complete(new ListGroupsResponseData().setGroups(listedGroups)); +} +} +} +} +}); +return responseFuture; Review Comment: There are a few issues with this code. 1. Synchronising on `runtime` will create lock contention across all the callers of `listGroups`. We should rather use a local variable. 2. The error handling seems error prone to me. For instance, `NotCoordinatorException` exceptions are turned into `RuntimeException` exceptions and then turned into `UnknownServerException` if I understood it correctly. We lose the semantic along the way. I think that we could take your idea further and combine the two main steps into one. I am thinking about something like this: ``` final List partitions = new ArrayList<>(runtime.partitions()); final CompletableFuture future = new CompletableFuture<>(); final List results = new ArrayList<>(); final AtomicInteger cnt = new AtomicInteger(partitions.size()); for (TopicPartition partition : partitions) { runtime.scheduleReadOperation( "list-group", partition, (coordinator, lastCommittedOffset) ->
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1312918434 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", Review Comment: nit: Let's use `list-groups` to be consistent with the existing names. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); Review Comment: nit: Let's remove `java.util` and `Collections.emptyList()` as they are not necessary. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +} +return new ListGroupsResponseData() +.setErrorCode(Errors.forException(exception).code()); +})); Review Comment: I think that we need to think a little more about the different errors that we could get here. My understanding is that we fail the entire requests in two cases: 1) at least one partition is loading; and 2) there is an unexpected error (non KafkaException). Here, we could get the following errors: - NotCoordinatorException if `tp` is no longer active, failed, etc. In this case, we actually want to return an empty lit of groups. - CoordinatorLoadingException if `tp` is being loaded. In this case, we want to fail the entire request. - Unexpected Exception. In this case, we also want to fail the entire request. Knowing this, we should explicitly handle the NotCoordinatorException case here. For the other cases, would it be possible to re-throw the exception? It would be great if you could also add a few unit tests for this. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); +} Review Comment: Let's please add a unit test for this method. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -309,6 +311,21 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param context The request context. + * @param request The ListGroups request. + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +RequestContext context, +ListGroupsRequestData request, +long committedOffset +) throws ApiException { Review Comment: I
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1307484913 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: Basically [here](https://github.com/apache/kafka/pull/14271/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R418), you need to use `lastCommittedOffset` to ensure that you only read committed state. I don't know if we support the Stream API but the other APIs should accept an argument called `epoch`. You can use `lastCommittedOffset` as the `epoch.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1307484913 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: Basically [here](https://github.com/apache/kafka/pull/14271/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R418), you need to use `lastCommittedOffset` to ensure that you only read committed state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1307483188 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: Right. Then you must pass `lastCommittedOffset` to `listGroups` as well and use it to query the timeline data structures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1307482348 ## clients/src/main/resources/common/message/ListGroupsRequest.json: ## @@ -23,11 +23,15 @@ // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). - "validVersions": "0-4", + // + // Version 5 adds the TypesFilter field (KIP-848). + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty all groups are returned with their state." -} +}, +{ "name": "TypesFilter", "type": "[]string", "versions": "5+", + "about": "The types of the groups we want to list. If empty all groups are returned" } Review Comment: If the tests are related to the new filter, they should go to the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1302279096 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { Review Comment: This seems to be inefficient because the coordinator may not be responsible for all the partitions. I thought that we could use `CoordinatorRuntime#partitions` to get the list of registered partitions. Have you considered this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -272,6 +274,21 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param context The request context. + * @param request The ListGroups request. + * + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +RequestContext context, +ListGroupsRequestData request Review Comment: nit: This should be indented with four spaces. ## clients/src/main/resources/common/message/ListGroupsRequest.json: ## @@ -23,11 +23,15 @@ // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). - "validVersions": "0-4", + // + // Version 5 adds the TypesFilter field (KIP-848). + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty all groups are returned with their state." -} +}, +{ "name": "TypesFilter", "type": "[]string", "versions": "5+", + "about": "The types of the groups we want to list. If empty all groups are returned" } Review Comment: I would rather prefer to do this in a second PR because this change impacts both the new and the old group coordinators. Would it be possible? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); Review Comment: nit: This should be indented with four spaces. I have seen this in other places in the code but I am not going to comment them all. I let you have a look. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { Review Comment: Don't we need to also implement this for `GenericGroup`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: We need to use the second parameters `__` as pass it down to `listGroups`. For the context, the second parameter is the last committed offsets. We should list the groups based on it. Otherwise, we would return uncommitted changes. -- This is an automated message from the