[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -431,14 +434,17 @@ public void rollback() {
 snapshotRegistry.revertToSnapshot(lastCommittedOffset);
 }
 
+public void getOrCreateSnapshot() {
+snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+}

Review Comment:
   This will cause testListGroups to fail because 
groupMetadataManager.listGroups always uses lastCommittedOffset to query. 
Putting getOrCreateSnapshot(lastWrittenOffset) at the beginning of the replay 
function will lead to the following result: the largest epoch in the snapshot 
is always 1 less than lastCommittedOffset. In this way, listGroups cannot find 
the expected results.
   Putting getOrCreateSnapshot(lastWrittenOffset) at the end of the function, 
especially after lastWrittenOffset+1, all cases can pass, which is consistent 
with the semantics of CoordinatorRuntime, that is, always update the memory 
first, then update lastWrittenOffset, and use the latest lastWrittenOffset to 
update snapshot.
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -431,14 +434,17 @@ public void rollback() {
 snapshotRegistry.revertToSnapshot(lastCommittedOffset);
 }
 
+public void getOrCreateSnapshot() {
+snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+}

Review Comment:
   This will cause testListGroups to fail because 
groupMetadataManager.listGroups always uses lastCommittedOffset to query. 
Putting getOrCreateSnapshot(lastWrittenOffset) at the beginning of the replay 
function will lead to the following result: the largest epoch in the snapshot 
is always 1 less than lastCommittedOffset. In this way, listGroups cannot find 
the expected results.
   Put getOrCreateSnapshot(lastWrittenOffset) at the end of the function, 
especially after lastWrittenOffset+1, all cases can pass, which is consistent 
with the semantics of CoordinatorRuntime, that is, always update the memory 
first, then update lastWrittenOffset, and use the latest lastWrittenOffset 
Update snapshot.
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324870944


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
 );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group2")
+.setGroupState("Dead")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < partitionCount; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+
+List actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+assertEquals(expectedResults, actualResults);
+assertEquals(expectResultMap.size(), actualResults.size());
+for (ListGroupsResponseData.ListedGroup result : actualResults) {
+assertEquals(expectResultMap.get(result.groupId()), 
Collections.singletonList(result));
+}
+}
+
+@Test
+public void testListGroupsFailedWithNotCoordinatorException()
+throws InterruptedException, ExecutionException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < 2; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ArgumentMatchers.any()
+)).thenReturn(FutureUtils.failedFuture(new 

[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869392


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map actualAllGroupMap =
+context.sendListGroups(Collections.emptyList())
+
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+Map expectAllGroupMap =
+Stream.of(new ListGroupsResponseData.ListedGroup()
+.setGroupId(genericGroup.groupId())
+.setProtocolType(genericGroupType)
+.setGroupState(EMPTY.toString()),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId(consumerGroupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()))
+
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869129


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map actualAllGroupMap =
+context.sendListGroups(Collections.emptyList())
+
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+Map expectAllGroupMap =
+Stream.of(new ListGroupsResponseData.ListedGroup()
+.setGroupId(genericGroup.groupId())
+.setProtocolType(genericGroupType)
+.setGroupState(EMPTY.toString()),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId(consumerGroupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()))
+
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+assertEquals(expectAllGroupMap.size(), actualAllGroupMap.size());
+for (Map.Entry entry : 
expectAllGroupMap.entrySet()) {
+assertEquals(entry.getValue(), 
actualAllGroupMap.get(entry.getKey()));
+}
+
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new 
ConsumerGroupMember.Builder(memberId1)
+.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+.build()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
+context.commit();

Review Comment:
   Can you be more detailed? I don’t quite understand this comment.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map 

[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324791489


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
 );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group2")
+.setGroupState("Dead")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < partitionCount; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+
+List actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+assertEquals(expectedResults, actualResults);
+assertEquals(expectResultMap.size(), actualResults.size());
+for (ListGroupsResponseData.ListedGroup result : actualResults) {
+assertEquals(expectResultMap.get(result.groupId()), 
Collections.singletonList(result));
+}

Review Comment:
   Map is used here because the order of the list returned by the function may 
be different from the expected one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-12 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1323254383


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
 );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+service.startup(() -> 1);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group2")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+);
+when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any()
+)).thenReturn(CompletableFuture.completedFuture(expectedResults));
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+
+assertEquals(new ListGroupsResponseData().setGroups(expectedResults), 
responseFuture.get(5, TimeUnit.SECONDS));
+}
+
+private void testListGroupsFailedWithException(Throwable t, 
ListGroupsResponseData expectResponseData)
+throws InterruptedException, ExecutionException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+service.startup(() -> 1);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+when(runtime.partitions()).thenReturn(Sets.newSet(new 
TopicPartition("__consumer_offsets", 0)));
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any()
+)).thenReturn(FutureUtils.failedFuture(t));
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+assertEquals(expectResponseData, responseFuture.get(5, 
TimeUnit.SECONDS));
+
+}
+
+@Test
+public void testListGroupsFutureFailed() throws InterruptedException, 
ExecutionException, TimeoutException {
+for (Errors errors : Errors.values()) {

Review Comment:
   I updated this test case based on my own understanding. Please help to see 
if it meets expectations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-12 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1323252420


##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -103,4 +104,12 @@ public static  CompletableFuture 
failedFuture(Throwable ex) {
 future.completeExceptionally(ex);
 return future;
 }
+
+public static  void drainFutures(

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-12 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1323252098


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8597,36 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String genericGroupId = "generic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+context.updateLastWrittenOffset(context.lastWrittenOffset);
+GenericGroup genericGroup = context.createGenericGroup(genericGroupId);
+ConsumerGroup consumerGroup = 
context.createConsumerGroup(consumerGroupId);
+context.updateLastWrittenOffset(context.lastWrittenOffset + 2);

Review Comment:
   I updated this test case based on my own understanding. Please help to see 
if it meets expectations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-07 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1318751801


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +422,23 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 return group;
 }
 
+/**
+ * @return The GenericGroup List filtered by statesFilter and 
committedOffset.
+ */
+public List listGroups(List 
statesFilter, long committedOffset) {
+Stream groupStream = groups.values(committedOffset).stream();
+if (!statesFilter.isEmpty()) {
+groupStream = groupStream.filter(group -> {
+if (group instanceof ConsumerGroup) {
+return statesFilter.contains(((ConsumerGroup) 
group).stateAsString(committedOffset));
+} else {
+return statesFilter.contains(group.stateAsString());
+}

Review Comment:
   make sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-07 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1318745164


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +432,51 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
ArrayList<>();
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list-groups",
+tp,
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+).exceptionally(exception -> {
+if (!(exception instanceof KafkaException)) {
+log.error("ListGroups request {} hit an unexpected 
exception: {}",
+request, exception.getMessage());
+throw new RuntimeException(exception);
+}
+if (exception instanceof CoordinatorLoadInProgressException) {
+throw new RuntimeException(exception);
+} else if (exception instanceof NotCoordinatorException) {
+log.warn("ListGroups request {} hit a 
NotCoordinatorException exception: {}",
+request, exception.getMessage());
+return new 
ListGroupsResponseData().setGroups(Collections.emptyList());
+} else {
+return new 
ListGroupsResponseData().setErrorCode(Errors.forException(exception).code());
+}
+}));
+}
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+List listedGroups = new 
ArrayList<>();
+AtomicInteger succeedFutureCount = new AtomicInteger();
+FutureUtils.drainFutures(futures, (data, t) -> {
+synchronized (runtime) {
+if (t != null) {
+responseFuture.completeExceptionally(new 
UnknownServerException(t.getMessage()));
+} else {
+if (data.errorCode() != Errors.NONE.code()) {
+if (!responseFuture.isDone()) {
+responseFuture.complete(data);
+}
+} else {
+listedGroups.addAll(data.groups());
+if (succeedFutureCount.addAndGet(1) == 
runtime.partitions().size()) {
+responseFuture.complete(new 
ListGroupsResponseData().setGroups(listedGroups));
+}
+}
+}
+}
+});
+return responseFuture;

Review Comment:
   Thank you for your suggestions. The code is much simpler and more elegant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314218928


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -309,6 +311,21 @@ public CoordinatorResult 
commitOffset(
 return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+RequestContext context,
+ListGroupsRequestData request,
+long committedOffset
+) throws ApiException {

Review Comment:
   I tried to simplify the function signature based on my own understanding. 
The reason why the function return value here is consistent with the current 
one is that when an exception is thrown, the final result with an error code 
can be directly returned. Otherwise, the Runtime exception can only be thrown 
upward in completeExceptionaly. (a list of ListedGroup cannot pass in an error 
code). Then when handling the exception, you need to parse the cause of the 
RuntimeException, which I'm not sure is appropriate. Do you have any 
suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314219391


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list_groups",
+tp,
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(context, request, lastCommittedOffset)
+).exceptionally(exception -> {
+if (!(exception instanceof KafkaException)) {
+log.error("ListGroups request {} hit an unexpected 
exception: {}",
+request, exception.getMessage());
+}
+return new ListGroupsResponseData()
+.setErrorCode(Errors.forException(exception).code());
+}));

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list_groups",

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314219340


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list_groups",
+tp,
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(context, request, lastCommittedOffset)
+).exceptionally(exception -> {
+if (!(exception instanceof KafkaException)) {
+log.error("ListGroups request {} hit an unexpected 
exception: {}",
+request, exception.getMessage());
+}
+return new ListGroupsResponseData()
+.setErrorCode(Errors.forException(exception).code());
+}));
+}
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+List listedGroups = new 
ArrayList<>();
+futures.forEach(CompletableFuture::join);
+for (CompletableFuture future : futures) {
+try {
+ListGroupsResponseData data = future.get();
+if (data.errorCode() != Errors.NONE.code()) {
+responseFuture.complete(data);
+return responseFuture;
+}
+listedGroups.addAll(future.get().groups());
+} catch (InterruptedException | ExecutionException e) {
+log.error("ListGroups request {} hit an unexpected exception: 
{}",
+request, e.getMessage());
+if (!responseFuture.isDone()) {
+responseFuture.complete(new ListGroupsResponseData()
+.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+return responseFuture;
+}
+}
+}

Review Comment:
   I tried to simplify and improve a version based on my own understanding, but 
I’m not sure if it’s consistent with what you expected.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list_groups",
+tp,
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(context, request, lastCommittedOffset)
+).exceptionally(exception -> {
+if (!(exception instanceof KafkaException)) {
+log.error("ListGroups request {} hit an unexpected 
exception: {}",
+request, exception.getMessage());
+}
+return new ListGroupsResponseData()
+.setErrorCode(Errors.forException(exception).code());
+}));
+}
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+List listedGroups = new 
ArrayList<>();
+futures.forEach(CompletableFuture::join);
+for (CompletableFuture future : futures) {
+try {
+ListGroupsResponseData data = future.get();
+if (data.errorCode() != Errors.NONE.code()) {
+responseFuture.complete(data);
+return responseFuture;
+}
+listedGroups.addAll(future.get().groups());
+} catch (InterruptedException | ExecutionException e) {
+log.error("ListGroups request {} hit an unexpected exception: 
{}",
+request, e.getMessage());
+if (!responseFuture.isDone()) {
+responseFuture.complete(new ListGroupsResponseData()
+.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
+return responseFuture;
+}
+}
+}

Review Comment:
   I tried to simplify and improve a version based on my own understanding, but 
I’m not sure if it’s consistent with what you expected.



-- 
This is an automated message from the Apache Git Service.
To 

[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314218928


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -309,6 +311,21 @@ public CoordinatorResult 
commitOffset(
 return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+RequestContext context,
+ListGroupsRequestData request,
+long committedOffset
+) throws ApiException {

Review Comment:
   I tried to simplify the function signature based on my own understanding. 
The reason why the function return value here is consistent with the current 
one is that when an exception is thrown, the final result with an error code 
can be directly returned. Otherwise, the Runtime exception can only be thrown 
upward in completeExceptionaly. (List cannot pass in an error 
code). Then when handling the exception, you need to parse the cause of the 
RuntimeException, which I'm not sure is appropriate. Do you have any 
suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217880


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 return group;
 }
 
+/**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.
+ */
+public ListGroupsResponseData listGroups(ListGroupsRequestData request, 
long committedOffset) {
+Stream groupStream = groups.values(committedOffset).stream();
+List statesFilter = request.statesFilter();
+if (!statesFilter.isEmpty()) {
+groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString()));
+}
+return new 
ListGroupsResponseData().setGroups(groupStream.map(Group::asListedGroup).collect(Collectors.toList()));

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217786


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 return group;
 }
 
+/**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-03 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314217697


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+return new ListGroupsResponseData.ListedGroup()
+.setGroupId(groupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.setGroupState(state.toString());
+}

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +423,18 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 return group;
 }
 
+/**
+ * @return The GenericGroup List filtered by statesFilter or typesFilter.
+ */
+public ListGroupsResponseData listGroups(ListGroupsRequestData request, 
long committedOffset) {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304575969


##
clients/src/main/resources/common/message/ListGroupsRequest.json:
##
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
 { "name": "StatesFilter", "type": "[]string", "versions": "4+",
   "about": "The states of the groups we want to list. If empty all groups 
are returned with their state."
-}
+},
+{ "name": "TypesFilter", "type": "[]string", "versions": "5+",
+  "about": "The types of the groups we want to list. If empty all groups 
are returned" }

Review Comment:
   The test will use the code generated by the new protocol. If the protocol 
change is to be placed in the second pr, the corresponding test code also needs 
to be removed in this pr.
   I don't know if my understanding is correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304540159


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+return new ListGroupsResponseData.ListedGroup()
+.setGroupId(groupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.setGroupState(state.toString());

Review Comment:
   Thank you for the suggestion. I have fixed all the indented issues 
introduced by this pr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304529821


##
clients/src/main/resources/common/message/ListGroupsRequest.json:
##
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
 { "name": "StatesFilter", "type": "[]string", "versions": "4+",
   "about": "The states of the groups we want to list. If empty all groups 
are returned with their state."
-}
+},
+{ "name": "TypesFilter", "type": "[]string", "versions": "5+",
+  "about": "The types of the groups we want to list. If empty all groups 
are returned" }

Review Comment:
   TODO: Add a second pr to update ListGroups protocol



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304506722


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+return new ListGroupsResponseData.ListedGroup()
+.setGroupId(groupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.setGroupState(state.toString());

Review Comment:
   done



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+return new ListGroupsResponseData.ListedGroup()
+.setGroupId(groupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.setGroupState(state.toString());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304504593


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {
+futures.add(runtime.scheduleReadOperation("list_groups",
+new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+(coordinator, __) -> coordinator.listGroups(context, 
request)

Review Comment:
   I don’t know if I understand it right, here we need to replace __ with 
lastCommittedOffset, right?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {
+futures.add(runtime.scheduleReadOperation("list_groups",
+new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+(coordinator, __) -> coordinator.listGroups(context, 
request)

Review Comment:
   I don’t know if I understand it right, here we need to replace __ with 
lastCommittedOffset, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304488801


##
clients/src/main/resources/common/message/ListGroupsRequest.json:
##
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
 { "name": "StatesFilter", "type": "[]string", "versions": "4+",
   "about": "The states of the groups we want to list. If empty all groups 
are returned with their state."
-}
+},
+{ "name": "TypesFilter", "type": "[]string", "versions": "5+",
+  "about": "The types of the groups we want to list. If empty all groups 
are returned" }

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304474195


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List> futures = new 
java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {

Review Comment:
   Thanks for the reminder, I tried to find it but didn't know it, so I chose 
an inefficient implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304456965


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {

Review Comment:
   > Don't we need to also implement this for `GenericGroup`?
   
   GenericGroup has implemented the asListedGroup method. In fact, GenericGroup 
has this method first. In order to implement ListGroups, I abstracted it into 
an interface and implemented it in ConsumerGroup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-24 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1304456965


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) 
{
 private static Integer incValue(String key, Integer value) {
 return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {

Review Comment:
   > Don't we need to also implement this for `GenericGroup`?
   
   GenericGroup has implemented the asListedGroup method. In fact, 
ConsumerGroup has this method first. In order to implement ListGroups, I 
abstracted it into an interface and implemented it in GenericGroup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org