[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -431,14 +434,17 @@ public void rollback() { snapshotRegistry.revertToSnapshot(lastCommittedOffset); } +public void getOrCreateSnapshot() { +snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); +} Review Comment: This will cause testListGroups to fail because groupMetadataManager.listGroups always uses lastCommittedOffset to query. Putting getOrCreateSnapshot(lastWrittenOffset) at the beginning of the replay function will lead to the following result: the largest epoch in the snapshot is always 1 less than lastCommittedOffset. In this way, listGroups cannot find the expected results. Putting getOrCreateSnapshot(lastWrittenOffset) at the end of the function, especially after lastWrittenOffset+1, all cases can pass, which is consistent with the semantics of CoordinatorRuntime, that is, always update the memory first, then update lastWrittenOffset, and use the latest lastWrittenOffset to update snapshot. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -431,14 +434,17 @@ public void rollback() { snapshotRegistry.revertToSnapshot(lastCommittedOffset); } +public void getOrCreateSnapshot() { +snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); +} Review Comment: This will cause testListGroups to fail because groupMetadataManager.listGroups always uses lastCommittedOffset to query. Putting getOrCreateSnapshot(lastWrittenOffset) at the beginning of the replay function will lead to the following result: the largest epoch in the snapshot is always 1 less than lastCommittedOffset. In this way, listGroups cannot find the expected results. Put getOrCreateSnapshot(lastWrittenOffset) at the end of the function, especially after lastWrittenOffset+1, all cases can pass, which is consistent with the semantics of CoordinatorRuntime, that is, always update the memory first, then update lastWrittenOffset, and use the latest lastWrittenOffset Update snapshot. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324870944 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < partitionCount; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); +assertEquals(expectedResults, actualResults); +assertEquals(expectResultMap.size(), actualResults.size()); +for (ListGroupsResponseData.ListedGroup result : actualResults) { +assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result)); +} +} + +@Test +public void testListGroupsFailedWithNotCoordinatorException() +throws InterruptedException, ExecutionException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} + +ListGroupsRequestData request = new ListGroupsRequestData(); +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < 2; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), +ArgumentMatchers.any() +)).thenReturn(FutureUtils.failedFuture(new
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869392 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map actualAllGroupMap = +context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +Map expectAllGroupMap = +Stream.of(new ListGroupsResponseData.ListedGroup() +.setGroupId(genericGroup.groupId()) +.setProtocolType(genericGroupType) +.setGroupState(EMPTY.toString()), +new ListGroupsResponseData.ListedGroup() +.setGroupId(consumerGroupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())) + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869129 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map actualAllGroupMap = +context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +Map expectAllGroupMap = +Stream.of(new ListGroupsResponseData.ListedGroup() +.setGroupId(genericGroup.groupId()) +.setProtocolType(genericGroupType) +.setGroupState(EMPTY.toString()), +new ListGroupsResponseData.ListedGroup() +.setGroupId(consumerGroupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())) + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + +assertEquals(expectAllGroupMap.size(), actualAllGroupMap.size()); +for (Map.Entry entry : expectAllGroupMap.entrySet()) { +assertEquals(entry.getValue(), actualAllGroupMap.get(entry.getKey())); +} + + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) +.setSubscribedTopicNames(Collections.singletonList(fooTopicName)) +.build())); +context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); +context.commit(); Review Comment: Can you be more detailed? I don’t quite understand this comment. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324791489 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < partitionCount; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); +assertEquals(expectedResults, actualResults); +assertEquals(expectResultMap.size(), actualResults.size()); +for (ListGroupsResponseData.ListedGroup result : actualResults) { +assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result)); +} Review Comment: Map is used here because the order of the list returned by the function may be different from the expected one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1323254383 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + +); +when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() +)).thenReturn(CompletableFuture.completedFuture(expectedResults)); + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +assertEquals(new ListGroupsResponseData().setGroups(expectedResults), responseFuture.get(5, TimeUnit.SECONDS)); +} + +private void testListGroupsFailedWithException(Throwable t, ListGroupsResponseData expectResponseData) +throws InterruptedException, ExecutionException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +ListGroupsRequestData request = new ListGroupsRequestData(); +when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0))); +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() +)).thenReturn(FutureUtils.failedFuture(t)); + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); +assertEquals(expectResponseData, responseFuture.get(5, TimeUnit.SECONDS)); + +} + +@Test +public void testListGroupsFutureFailed() throws InterruptedException, ExecutionException, TimeoutException { +for (Errors errors : Errors.values()) { Review Comment: I updated this test case based on my own understanding. Please help to see if it meets expectations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1323252420 ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -103,4 +104,12 @@ public static CompletableFuture failedFuture(Throwable ex) { future.completeExceptionally(ex); return future; } + +public static void drainFutures( Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1323252098 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8597,36 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String genericGroupId = "generic-group-id"; +String consumerGroupId = "consumer-group-id"; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); +context.updateLastWrittenOffset(context.lastWrittenOffset); +GenericGroup genericGroup = context.createGenericGroup(genericGroupId); +ConsumerGroup consumerGroup = context.createConsumerGroup(consumerGroupId); +context.updateLastWrittenOffset(context.lastWrittenOffset + 2); Review Comment: I updated this test case based on my own understanding. Please help to see if it meets expectations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1318751801 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +422,23 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * @return The GenericGroup List filtered by statesFilter and committedOffset. + */ +public List listGroups(List statesFilter, long committedOffset) { +Stream groupStream = groups.values(committedOffset).stream(); +if (!statesFilter.isEmpty()) { +groupStream = groupStream.filter(group -> { +if (group instanceof ConsumerGroup) { +return statesFilter.contains(((ConsumerGroup) group).stateAsString(committedOffset)); +} else { +return statesFilter.contains(group.stateAsString()); +} Review Comment: make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1318745164 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +432,51 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new ArrayList<>(); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list-groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +throw new RuntimeException(exception); +} +if (exception instanceof CoordinatorLoadInProgressException) { +throw new RuntimeException(exception); +} else if (exception instanceof NotCoordinatorException) { +log.warn("ListGroups request {} hit a NotCoordinatorException exception: {}", +request, exception.getMessage()); +return new ListGroupsResponseData().setGroups(Collections.emptyList()); +} else { +return new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()); +} +})); +} +CompletableFuture responseFuture = new CompletableFuture<>(); +List listedGroups = new ArrayList<>(); +AtomicInteger succeedFutureCount = new AtomicInteger(); +FutureUtils.drainFutures(futures, (data, t) -> { +synchronized (runtime) { +if (t != null) { +responseFuture.completeExceptionally(new UnknownServerException(t.getMessage())); +} else { +if (data.errorCode() != Errors.NONE.code()) { +if (!responseFuture.isDone()) { +responseFuture.complete(data); +} +} else { +listedGroups.addAll(data.groups()); +if (succeedFutureCount.addAndGet(1) == runtime.partitions().size()) { +responseFuture.complete(new ListGroupsResponseData().setGroups(listedGroups)); +} +} +} +} +}); +return responseFuture; Review Comment: Thank you for your suggestions. The code is much simpler and more elegant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314218928 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -309,6 +311,21 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param context The request context. + * @param request The ListGroups request. + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +RequestContext context, +ListGroupsRequestData request, +long committedOffset +) throws ApiException { Review Comment: I tried to simplify the function signature based on my own understanding. The reason why the function return value here is consistent with the current one is that when an exception is thrown, the final result with an error code can be directly returned. Otherwise, the Runtime exception can only be thrown upward in completeExceptionaly. (a list of ListedGroup cannot pass in an error code). Then when handling the exception, you need to parse the cause of the RuntimeException, which I'm not sure is appropriate. Do you have any suggestions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314219391 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +} +return new ListGroupsResponseData() +.setErrorCode(Errors.forException(exception).code()); +})); Review Comment: done. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", Review Comment: done. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314219340 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +} +return new ListGroupsResponseData() +.setErrorCode(Errors.forException(exception).code()); +})); +} +CompletableFuture responseFuture = new CompletableFuture<>(); +List listedGroups = new ArrayList<>(); +futures.forEach(CompletableFuture::join); +for (CompletableFuture future : futures) { +try { +ListGroupsResponseData data = future.get(); +if (data.errorCode() != Errors.NONE.code()) { +responseFuture.complete(data); +return responseFuture; +} +listedGroups.addAll(future.get().groups()); +} catch (InterruptedException | ExecutionException e) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, e.getMessage()); +if (!responseFuture.isDone()) { +responseFuture.complete(new ListGroupsResponseData() +.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())); +return responseFuture; +} +} +} Review Comment: I tried to simplify and improve a version based on my own understanding, but I’m not sure if it’s consistent with what you expected. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +430,44 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list_groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +} +return new ListGroupsResponseData() +.setErrorCode(Errors.forException(exception).code()); +})); +} +CompletableFuture responseFuture = new CompletableFuture<>(); +List listedGroups = new ArrayList<>(); +futures.forEach(CompletableFuture::join); +for (CompletableFuture future : futures) { +try { +ListGroupsResponseData data = future.get(); +if (data.errorCode() != Errors.NONE.code()) { +responseFuture.complete(data); +return responseFuture; +} +listedGroups.addAll(future.get().groups()); +} catch (InterruptedException | ExecutionException e) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, e.getMessage()); +if (!responseFuture.isDone()) { +responseFuture.complete(new ListGroupsResponseData() +.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())); +return responseFuture; +} +} +} Review Comment: I tried to simplify and improve a version based on my own understanding, but I’m not sure if it’s consistent with what you expected. -- This is an automated message from the Apache Git Service. To
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314218928 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -309,6 +311,21 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param context The request context. + * @param request The ListGroups request. + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +RequestContext context, +ListGroupsRequestData request, +long committedOffset +) throws ApiException { Review Comment: I tried to simplify the function signature based on my own understanding. The reason why the function return value here is consistent with the current one is that when an exception is thrown, the final result with an error code can be directly returned. Otherwise, the Runtime exception can only be thrown upward in completeExceptionaly. (List cannot pass in an error code). Then when handling the exception, you need to parse the cause of the RuntimeException, which I'm not sure is appropriate. Do you have any suggestions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217880 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * @return The GenericGroup List filtered by statesFilter or typesFilter. + */ +public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) { +Stream groupStream = groups.values(committedOffset).stream(); +List statesFilter = request.statesFilter(); +if (!statesFilter.isEmpty()) { +groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString())); +} +return new ListGroupsResponseData().setGroups(groupStream.map(Group::asListedGroup).collect(Collectors.toList())); Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217786 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * @return The GenericGroup List filtered by statesFilter or typesFilter. Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217697 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); +} Review Comment: done. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE return group; } +/** + * @return The GenericGroup List filtered by statesFilter or typesFilter. + */ +public ListGroupsResponseData listGroups(ListGroupsRequestData request, long committedOffset) { Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304575969 ## clients/src/main/resources/common/message/ListGroupsRequest.json: ## @@ -23,11 +23,15 @@ // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). - "validVersions": "0-4", + // + // Version 5 adds the TypesFilter field (KIP-848). + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty all groups are returned with their state." -} +}, +{ "name": "TypesFilter", "type": "[]string", "versions": "5+", + "about": "The types of the groups we want to list. If empty all groups are returned" } Review Comment: The test will use the code generated by the new protocol. If the protocol change is to be placed in the second pr, the corresponding test code also needs to be removed in this pr. I don't know if my understanding is correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304540159 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); Review Comment: Thank you for the suggestion. I have fixed all the indented issues introduced by this pr. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304529821 ## clients/src/main/resources/common/message/ListGroupsRequest.json: ## @@ -23,11 +23,15 @@ // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). - "validVersions": "0-4", + // + // Version 5 adds the TypesFilter field (KIP-848). + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty all groups are returned with their state." -} +}, +{ "name": "TypesFilter", "type": "[]string", "versions": "5+", + "about": "The types of the groups we want to list. If empty all groups are returned" } Review Comment: TODO: Add a second pr to update ListGroups protocol -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304506722 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); Review Comment: done ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { +return new ListGroupsResponseData.ListedGroup() +.setGroupId(groupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.setGroupState(state.toString()); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304504593 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: I don’t know if I understand it right, here we need to replace __ with lastCommittedOffset, right? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { +futures.add(runtime.scheduleReadOperation("list_groups", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i), +(coordinator, __) -> coordinator.listGroups(context, request) Review Comment: I don’t know if I understand it right, here we need to replace __ with lastCommittedOffset, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304488801 ## clients/src/main/resources/common/message/ListGroupsRequest.json: ## @@ -23,11 +23,15 @@ // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). - "validVersions": "0-4", + // + // Version 5 adds the TypesFilter field (KIP-848). + "validVersions": "0-5", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty all groups are returned with their state." -} +}, +{ "name": "TypesFilter", "type": "[]string", "versions": "5+", + "about": "The types of the groups we want to list. If empty all groups are returned" } Review Comment: makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304474195 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -426,9 +429,43 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +List> futures = new java.util.ArrayList<>(Collections.emptyList()); +for (int i = 0; i < numPartitions; i++) { Review Comment: Thanks for the reminder, I tried to find it but didn't know it, so I chose an inefficient implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304456965 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { Review Comment: > Don't we need to also implement this for `GenericGroup`? GenericGroup has implemented the asListedGroup method. In fact, GenericGroup has this method first. In order to implement ListGroups, I abstracted it into an interface and implemented it in ConsumerGroup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1304456965 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) { private static Integer incValue(String key, Integer value) { return value == null ? 1 : value + 1; } + +/** + * @return the group formatted as a list group response. + */ +public ListGroupsResponseData.ListedGroup asListedGroup() { Review Comment: > Don't we need to also implement this for `GenericGroup`? GenericGroup has implemented the asListedGroup method. In fact, ConsumerGroup has this method first. In order to implement ListGroups, I abstracted it into an interface and implemented it in GenericGroup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org