[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-14 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1325484973


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8603,6 +8614,86 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Exception {
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+    String consumerGroupId = "consumer-group-id";
+    String genericGroupId = "generic-group-id";
+    String memberId1 = Uuid.randomUuid().toString();
+    String genericGroupType = "generic";
+    Uuid fooTopicId = Uuid.randomUuid();
+    String fooTopicName = "foo";
+
+    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+        .withAssignors(Collections.singletonList(assignor))
+        .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+        .build();
+    context.replay(newGroupMetadataRecord(
+        genericGroupId,
+        new GroupMetadataValue()
+            .setMembers(Collections.emptyList())
+            .setGeneration(2)
+            .setLeader(null)
+            .setProtocolType(genericGroupType)
+            .setProtocol("range")
+            .setCurrentStateTimestamp(context.time.milliseconds()),
+        MetadataVersion.latest()));
+    context.commit();
+    GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false);
+    context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
+        .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+        .build()));
+    context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
+
+    Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
+        context.sendListGroups(Collections.emptyList())
+            .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+    Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
+        Stream.of(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId(genericGroup.groupId())
+                .setProtocolType(genericGroupType)
+                .setGroupState(EMPTY.toString()),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId(consumerGroupId)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+        ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
+    assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+    context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
+        .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+        .build()));
+    context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));

Review Comment:
   I think that we can remove those two lines now that they are already 
executed earlier, no?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception {
     );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    int partitionCount = 3;
+    service.startup(() -> partitionCount);
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+
+    List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group0")
+            .setGroupState("Stable")
+            .setProtocolType("protocol1"),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group1")
+            .setGroupState("Empty")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group2")
+            .setGroupState("Dead")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+    );
+    Map<String, List<ListGroupsResponseData.ListedGroup>> expectResultMap = new HashMap<>();
+    for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+        expectResultMap.put(result.groupId(), Collections.singletonList(result));
+    }

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324188593


##
checkstyle/suppressions.xml:
##
@@ -327,7 +327,7 @@
   
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
 
##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ public CoordinatorResult commitOffset(
     return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state.
+ * @param committedOffset A specified committed offset corresponding to this shard

Review Comment:
   nit: Could you please align the description of the params like we did for 
the other methods?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +422,22 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
     return group;
 }
 
+/**
+ * Get the Group List.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state.
+ * @param committedOffset A specified committed offset corresponding to this shard

Review Comment:
   nit: Same here.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception {
     );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    int partitionCount = 3;
+    service.startup(() -> partitionCount);
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+
+    List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group0")
+            .setGroupState("Stable")
+            .setProtocolType("protocol1"),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group1")
+            .setGroupState("Empty")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group2")
+            .setGroupState("Dead")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+    );
+    Map<String, List<ListGroupsResponseData.ListedGroup>> expectResultMap = new HashMap<>();
+    for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+        expectResultMap.put(result.groupId(), Collections.singletonList(result));
+    }
+    when(runtime.partitions()).thenReturn(Sets.newSet(
+        new TopicPartition("__consumer_offsets", 0),
+        new TopicPartition("__consumer_offsets", 1),
+        new TopicPartition("__consumer_offsets", 2)));
+    for (int i = 0; i < partitionCount; i++) {
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+    }
+
+    CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
+        requestContext(ApiKeys.LIST_GROUPS),
+        request
+    );
+
+    List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
+    assertEquals(expectedResults, actualResults);
+    assertEquals(expectResultMap.size(), actualResults.size());
+    for (ListGroupsResponseData.ListedGroup result : actualResults) {
+        assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result));
+    }

Review Comment:
   It seems to me that those assertions are not necessary as 
`assertEquals(expectedResults, actualResults);` already verifies everything, no?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception {
     );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    int partitionCount = 3;
+    service.startup(() -> partitionCount);
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+
+    List

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-12 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1323263745


##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -103,4 +103,5 @@ public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
     future.completeExceptionally(ex);
     return future;
 }
+

Review Comment:
   nit: Remove this line.






[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-12 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1323262962


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,163 @@ public void testHeartbeatCoordinatorException() throws Exception {
     );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    int partitionCount = 3;
+    service.startup(() -> partitionCount);
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+
+    List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group0")
+            .setGroupState("Stable")
+            .setProtocolType("protocol1"),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group1")
+            .setGroupState("Empty")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group2")
+            .setGroupState("Dead")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+    );
+    Map<String, List<ListGroupsResponseData.ListedGroup>> expectResultMap = new HashMap<>();
+    for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+        expectResultMap.put(result.groupId(), Collections.singletonList(result));
+    }
+    when(runtime.partitions()).thenReturn(Sets.newSet(
+        new TopicPartition("__consumer_offsets", 0),
+        new TopicPartition("__consumer_offsets", 1),
+        new TopicPartition("__consumer_offsets", 2)));
+    for (int i = 0; i < partitionCount; i++) {
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+    }
+
+    CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
+        requestContext(ApiKeys.LIST_GROUPS),
+        request
+    );
+
+    List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
+    assertEquals(expectedResults, actualResults);
+    assertEquals(expectResultMap.size(), actualResults.size());
+    for (ListGroupsResponseData.ListedGroup result : actualResults) {
+        assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result));
+    }
+}
+
+@Test
+public void testListGroupsFailedWithNotCoordinatorException()
+    throws InterruptedException, ExecutionException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    int partitionCount = 3;
+    service.startup(() -> partitionCount);
+
+    List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group0")
+            .setGroupState("Stable")
+            .setProtocolType("protocol1"),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group1")
+            .setGroupState("Empty")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+    );
+    Map<String, List<ListGroupsResponseData.ListedGroup>> expectResultMap = new HashMap<>();
+    for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+        expectResultMap.put(result.groupId(), Collections.singletonList(result));
+    }
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+    when(runtime.partitions()).thenReturn(Sets.newSet(
+        new TopicPartition("__consumer_offsets", 0),
+        new TopicPartition("__consumer_offsets", 1),
+        new TopicPartition("__consumer_offsets", 2)));
+    for (int i = 0; i < 2; i++) {
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+    }
+
+    when(runtime.scheduleReadOperation(
+        ArgumentMatchers.eq("list-groups"),
+        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+        ArgumentMatchers.any()
+    )).thenReturn(FutureUtils.failedFuture(new 

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-08 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1319725841


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -48,6 +49,16 @@ public String toString() {
  */
 String stateAsString();
 
+/**
+ * @return The {{@link GroupType}}'s String representation with committedOffset.

Review Comment:
   nit: `based on the committed offset.`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
+List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();

Review Comment:
   nit: Those two could be final as well.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
+List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
+final AtomicInteger cnt = new AtomicInteger(runtime.partitions().size());
+
+for (TopicPartition tp : runtime.partitions()) {

Review Comment:
   It seems to me that calling `partitions()` twice is not safe here because 
the number of partitions could change in between the two calls. I think that we 
should store it in order to avoid this race condition.
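   A minimal sketch of the suggested fix (variable names are illustrative, not taken from the PR): snapshot the partition set once and drive both the counter and the loop from that snapshot.
   
   ```java
   // Sketch only: capture the registered partitions once so that the counter and
   // the loop always agree, even if the runtime's partition set changes concurrently.
   final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
   final AtomicInteger cnt = new AtomicInteger(partitions.size());
   
   for (TopicPartition tp : partitions) {
       // schedule one read operation per partition in the snapshot ...
   }
   ```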



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -306,6 +308,19 @@ public CoordinatorResult commitOffset(
     return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state.

Review Comment:
   nit: Add committedOffset as well.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -179,6 +181,23 @@ public String stateAsString() {
 return state.get().toString();
 }
 
+/**
+ * @return The current state as a String with given committedOffset.
+ */
+public String stateAsString(long committedOffset) {
+return state.get(committedOffset).toString();
+}
+
+/**
+ * @return the group formatted as a list group response.

Review Comment:
   nit: ditto.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws Exception {
     );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+    CoordinatorRuntime runtime = mockRuntime();
+    GroupCoordinatorService service = new GroupCoordinatorService(
+        new LogContext(),
+        createConfig(),
+        runtime
+    );
+    service.startup(() -> 1);
+
+    ListGroupsRequestData request = new ListGroupsRequestData();
+
+    List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group1")
+            .setGroupState("Stable")
+            .setProtocolType("protocol1"),
+        new ListGroupsResponseData.ListedGroup()
+            .setGroupId("group2")
+            .setGroupState("Empty")
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+    );
+    when(runtime.partitions()).thenReturn(Sets.newSet(new TopicPartition("__consumer_offsets", 0)));

Review Comment:
   Could we add more partitions to ensure that the logic to handle them works as expected?



##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -103,4 +104,12 @@ public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
     future.completeExceptionally(ex);
     return future;
 }
+
+public static  void drainFutures(

Review Comment:
   I suppose that we could remove this now, couldn't we?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -420,6 +422,17 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
     return group;
 }
 
+/**
+ * @return The GenericGroup List 

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-04 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -306,6 +308,19 @@ public CoordinatorResult commitOffset(
     return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+    List<String> statesFilter,
+    long committedOffset
+) throws ApiException {
+    return new ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter, committedOffset));
+}

Review Comment:
   I still think that we should rather return the list of groups here and create `ListGroupsResponseData` one level up.
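   A minimal sketch of that split (signatures here are assumptions for illustration, not the PR's final code): the shard returns the raw listed groups, and the caller one level up wraps them.
   
   ```java
   // Sketch only: the shard-level handler returns the groups themselves.
   public List<ListGroupsResponseData.ListedGroup> listGroups(
       List<String> statesFilter,
       long committedOffset
   ) {
       return groupMetadataManager.listGroups(statesFilter, committedOffset);
   }
   ```
   
   The service would then aggregate the per-shard lists and build the response once, e.g. with `new ListGroupsResponseData().setGroups(aggregatedGroups)`.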



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +432,51 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new ArrayList<>();
+for (TopicPartition tp : runtime.partitions()) {
+    futures.add(runtime.scheduleReadOperation(
+        "list-groups",
+        tp,
+        (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+    ).exceptionally(exception -> {
+        if (!(exception instanceof KafkaException)) {
+            log.error("ListGroups request {} hit an unexpected exception: {}",
+                request, exception.getMessage());
+            throw new RuntimeException(exception);
+        }
+        if (exception instanceof CoordinatorLoadInProgressException) {
+            throw new RuntimeException(exception);
+        } else if (exception instanceof NotCoordinatorException) {
+            log.warn("ListGroups request {} hit a NotCoordinatorException exception: {}",
+                request, exception.getMessage());
+            return new ListGroupsResponseData().setGroups(Collections.emptyList());
+        } else {
+            return new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code());
+        }
+    }));
+}
+CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+AtomicInteger succeedFutureCount = new AtomicInteger();
+FutureUtils.drainFutures(futures, (data, t) -> {
+    synchronized (runtime) {
+        if (t != null) {
+            responseFuture.completeExceptionally(new UnknownServerException(t.getMessage()));
+        } else {
+            if (data.errorCode() != Errors.NONE.code()) {
+                if (!responseFuture.isDone()) {
+                    responseFuture.complete(data);
+                }
+            } else {
+                listedGroups.addAll(data.groups());
+                if (succeedFutureCount.addAndGet(1) == runtime.partitions().size()) {
+                    responseFuture.complete(new ListGroupsResponseData().setGroups(listedGroups));
+                }
+            }
+        }
+    }
+});
+return responseFuture;

Review Comment:
   There are a few issues with this code.
   1. Synchronising on `runtime` will create lock contention across all the 
callers of `listGroups`. We should rather use a local variable.
   2. The error handling seems error-prone to me. For instance, `NotCoordinatorException` exceptions are turned into `RuntimeException` exceptions and then into `UnknownServerException` if I understood it correctly. We lose the semantics along the way.
   
   I think that we could take your idea further and combine the two main steps 
into one. I am thinking about something like this:
   
   ```
   final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
   final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
   final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
   final AtomicInteger cnt = new AtomicInteger(partitions.size());
   
   for (TopicPartition partition : partitions) {
       runtime.scheduleReadOperation(
           "list-group",
           partition,
           (coordinator, lastCommittedOffset) -> 
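   A hypothetical end-to-end sketch of this per-partition read-and-aggregate pattern (it assumes the read operation returns the listed groups for one partition; all names are illustrative, not the PR's final code):
   
   ```java
   // Hypothetical sketch: each partition is read at its last committed offset,
   // NotCoordinatorException simply contributes no groups, anything else fails the request.
   final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
   final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
   final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
   final AtomicInteger cnt = new AtomicInteger(partitions.size());
   
   for (TopicPartition partition : partitions) {
       runtime.scheduleReadOperation(
           "list-groups",
           partition,
           (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
       ).whenComplete((groups, exception) -> {
           Throwable cause = exception instanceof CompletionException ? exception.getCause() : exception;
           if (cause == null) {
               synchronized (results) {
                   results.addAll(groups);
               }
           } else if (!(cause instanceof NotCoordinatorException)) {
               // Loading or unexpected errors fail the whole request.
               future.completeExceptionally(cause);
               return;
           }
           // Complete once every partition has reported back.
           if (cnt.decrementAndGet() == 0) {
               future.complete(new ListGroupsResponseData().setGroups(results));
           }
       });
   }
   return future;
   ```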

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-01 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1312918434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+    futures.add(runtime.scheduleReadOperation(
+        "list_groups",

Review Comment:
   nit: Let's use `list-groups` to be consistent with the existing names.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());

Review Comment:
   nit: Let's remove `java.util` and `Collections.emptyList()` as they are not 
necessary.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +430,44 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (TopicPartition tp : runtime.partitions()) {
+    futures.add(runtime.scheduleReadOperation(
+        "list_groups",
+        tp,
+        (coordinator, lastCommittedOffset) -> coordinator.listGroups(context, request, lastCommittedOffset)
+    ).exceptionally(exception -> {
+        if (!(exception instanceof KafkaException)) {
+            log.error("ListGroups request {} hit an unexpected exception: {}",
+                request, exception.getMessage());
+        }
+        return new ListGroupsResponseData()
+            .setErrorCode(Errors.forException(exception).code());
+    }));

Review Comment:
   I think that we need to think a little more about the different errors that we could get here. My understanding is that we fail the entire request in two cases: 1) at least one partition is loading; and 2) there is an unexpected error (non-KafkaException).
   
   Here, we could get the following errors:
   - NotCoordinatorException if `tp` is no longer active, failed, etc. In this case, we actually want to return an empty list of groups.
   - CoordinatorLoadingException if `tp` is being loaded. In this case, we want to fail the entire request.
   - Unexpected Exception. In this case, we also want to fail the entire request.
   
   Knowing this, we should explicitly handle the NotCoordinatorException case here. For the other cases, would it be possible to re-throw the exception?
   
   It would be great if you could also add a few unit tests for this.
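   A small, hypothetical sketch of that error mapping (the helper name and the surrounding future wiring are assumptions): only `NotCoordinatorException` is swallowed, everything else is re-thrown so the caller keeps the original semantics.
   
   ```java
   // Hypothetical helper for a per-partition ListGroups read.
   private static ListGroupsResponseData handleListGroupsException(Throwable exception) {
       if (exception instanceof NotCoordinatorException) {
           // This node no longer hosts the partition: it simply contributes no groups.
           return new ListGroupsResponseData().setGroups(Collections.emptyList());
       }
       // Loading and unexpected errors propagate and fail the whole request.
       throw new CompletionException(exception);
   }
   ```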



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -747,4 +749,15 @@ private static Integer decValue(String key, Integer value) {
 private static Integer incValue(String key, Integer value) {
     return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+    return new ListGroupsResponseData.ListedGroup()
+        .setGroupId(groupId)
+        .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+        .setGroupState(state.toString());
+}

Review Comment:
   Let's please add a unit test for this method.
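   A hypothetical shape for such a test (the `ConsumerGroup` constructor arguments and the initial "Empty" state string are assumptions, not taken from the PR):
   
   ```java
   @Test
   public void testAsListedGroup() {
       // Assumed construction: a snapshot registry plus the group id.
       SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
       ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
   
       ListGroupsResponseData.ListedGroup listedGroup = group.asListedGroup();
   
       assertEquals("group-foo", listedGroup.groupId());
       assertEquals(ConsumerProtocol.PROTOCOL_TYPE, listedGroup.protocolType());
       assertEquals("Empty", listedGroup.groupState());
   }
   ```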



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -309,6 +311,21 @@ public CoordinatorResult commitOffset(
     return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+    RequestContext context,
+    ListGroupsRequestData request,
+    long committedOffset
+) throws ApiException {

Review Comment:
   I 

[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-28 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1307484913


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {
+    futures.add(runtime.scheduleReadOperation("list_groups",
+        new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+        (coordinator, __) -> coordinator.listGroups(context, request)

Review Comment:
   Basically [here](https://github.com/apache/kafka/pull/14271/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R418), you need to use `lastCommittedOffset` to ensure that you only read committed state. I don't know if we support the Stream API but the other APIs should accept an argument called `epoch`. You can use `lastCommittedOffset` as the `epoch`.
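   A minimal sketch of what such a committed-offset read could look like (the `groups` timeline map, the filter semantics and the `asListedGroup(long)` signature are assumptions for illustration):
   
   ```java
   // Sketch only: reads timeline-backed state as of the last committed offset (the epoch).
   public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
       return groups.values(committedOffset).stream()
           .filter(group -> statesFilter.isEmpty()
               || statesFilter.contains(group.stateAsString(committedOffset)))
           .map(group -> group.asListedGroup(committedOffset))
           .collect(Collectors.toList());
   }
   ```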








[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-28 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1307483188


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {
+    futures.add(runtime.scheduleReadOperation("list_groups",
+        new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+        (coordinator, __) -> coordinator.listGroups(context, request)

Review Comment:
   Right. Then you must pass `lastCommittedOffset` to `listGroups` as well and 
use it to query the timeline data structures.






[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-28 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1307482348


##
clients/src/main/resources/common/message/ListGroupsRequest.json:
##
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
   //
  // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "StatesFilter", "type": "[]string", "versions": "4+",
       "about": "The states of the groups we want to list. If empty all groups are returned with their state."
-    }
+    },
+    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+      "about": "The types of the groups we want to list. If empty all groups are returned" }

Review Comment:
   If the tests are related to the new filter, they should go to the next PR.






[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-08-22 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1302279096


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {

Review Comment:
   This seems to be inefficient because the coordinator may not be responsible 
for all the partitions. I thought that we could use 
`CoordinatorRuntime#partitions` to get the list of registered partitions. Have 
you considered this?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -272,6 +274,21 @@ public CoordinatorResult commitOffset(
     return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param context The request context.
+ * @param request The ListGroups request.
+ *
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+RequestContext context,
+ListGroupsRequestData request

Review Comment:
   nit: This should be indented with four spaces.



##
clients/src/main/resources/common/message/ListGroupsRequest.json:
##
@@ -23,11 +23,15 @@
   // Version 3 is the first flexible version.
  //
   // Version 4 adds the StatesFilter field (KIP-518).
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the TypesFilter field (KIP-848).
+  "validVersions": "0-5",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "StatesFilter", "type": "[]string", "versions": "4+",
       "about": "The states of the groups we want to list. If empty all groups are returned with their state."
-    }
+    },
+    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
+      "about": "The types of the groups we want to list. If empty all groups are returned" }

Review Comment:
   I would prefer to do this in a second PR because this change impacts both the new and the old group coordinators. Would that be possible?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
 private static Integer incValue(String key, Integer value) {
     return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {
+return new ListGroupsResponseData.ListedGroup()
+.setGroupId(groupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.setGroupState(state.toString());

Review Comment:
   nit: This should be indented with four spaces. I have seen this in other places in the code but I am not going to comment on them all. I'll let you have a look.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -739,4 +741,15 @@ private static Integer decValue(String key, Integer value) {
 private static Integer incValue(String key, Integer value) {
     return value == null ? 1 : value + 1;
 }
+
+/**
+ * @return the group formatted as a list group response.
+ */
+public ListGroupsResponseData.ListedGroup asListedGroup() {

Review Comment:
   Don't we need to also implement this for `GenericGroup`?
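   A hypothetical sketch of the `GenericGroup` counterpart (accessor names are assumptions, not taken from the PR):
   
   ```java
   // Hypothetical sketch: assumes GenericGroup exposes groupId(), protocolType()
   // (as an Optional<String>) and stateAsString().
   public ListGroupsResponseData.ListedGroup asListedGroup() {
       return new ListGroupsResponseData.ListedGroup()
           .setGroupId(groupId())
           .setProtocolType(protocolType().orElse(""))
           .setGroupState(stateAsString());
   }
   ```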



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -426,9 +429,43 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
     return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-    "This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new java.util.ArrayList<>(Collections.emptyList());
+for (int i = 0; i < numPartitions; i++) {
+    futures.add(runtime.scheduleReadOperation("list_groups",
+        new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i),
+        (coordinator, __) -> coordinator.listGroups(context, request)

Review Comment:
   We need to use the second parameter `__` and pass it down to `listGroups`. For context, the second parameter is the last committed offset. We should list the groups based on it. Otherwise, we would return uncommitted changes.



-- 
This is an automated message from the