[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-29 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1341653044


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
 }
 }
 
+/**
+ * Validates the OffsetDelete request.
+ */
+@Override
+public void validateOffsetDelete() throws ApiException {
+if (isInState(DEAD)) {

Review Comment:
   I am not sure I understand why we need to do this. Couldn't we just delete the group when it is empty and its offsets are gone, instead of transitioning to Dead and then deleting it?
   
   My understanding is that we use Dead in the old code because we can't remove the group from the map before the change is committed to the log. During this time, the group is in the Dead state. In our world, the group is removed from the map immediately and the change is reverted if the write fails.






[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-28 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1340716345


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
 );
 }
 
+/**
+ * Handles a DeleteGroups request.
+ *
+ * @param context   The request context.
+ * @param groupIds  The groupIds of the groups to be deleted.
+ * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+RequestContext context,
+List<String> groupIds
+) throws ApiException {
+final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+new DeleteGroupsResponseData.DeletableGroupResultCollection();

Review Comment:
   nit: Should we set the expected size here?
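   For reference, a minimal sketch of that suggestion, assuming the generated DeletableGroupResultCollection exposes the usual sized constructor of the auto-generated collections:
   
   ```
   // Pre-size the collection with the number of requested group ids (sketch only).
   final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
       new DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
   ```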



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
 }
 }
 
+/**
+ * Validates the OffsetDelete request.
+ */
+@Override
+public void validateOffsetDelete() throws ApiException {
+if (isInState(DEAD)) {

Review Comment:
   @jeffkbkim Do we ever transition to Dead? If not, I wonder if we should just remove this and remove the Dead state. What do you think?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -105,6 +115,107 @@ public void testCommitOffset() {
 assertEquals(result, coordinator.commitOffset(context, request));
 }
 
+@Test
+public void testDeleteGroups() {
+GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+groupMetadataManager,
+offsetMetadataManager
+);
+
+RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
+new DeleteGroupsResponseData.DeletableGroupResultCollection();
+expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));

Review Comment:
   ditto for those two.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1561,6 +1604,156 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
 () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
 }
 
+static private void testOffsetDeleteWith(
+OffsetMetadataManagerTestContext context,
+String groupId,
+String topic,
+int partition,
+Errors error
+) {
+final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+requestTopicCollection.add(

Review Comment:
   We could also apply my formatting suggestion here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -90,4 +92,29 @@ void validateOffsetFetch(
 int memberEpoch,
 long lastCommittedOffset
 ) throws KafkaException;
+
+/**
+ * Validates the OffsetDelete request.
+ */
+void validateOffsetDelete() throws KafkaException;
+
+/**
+ * Validates the DeleteGroups request.
+ */
+void validateDeleteGroup() throws KafkaException;
+
+/**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return Whether the group is subscribed to the topic.
+ */
+boolean isSubscribedToTopic(String topic);
+
+/**
+ * Creates tombstone(s) for deleting the group.
+ *
+ * @return The list of tombstone record(s).
+ */
+List<Record> createGroupTombstoneRecords();

Review Comment:
   I wonder if we should rather pass the list of records as an argument in order to avoid having to copy the records afterwards. Have you considered this?
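   A hedged sketch of that alternative against the Group interface above; the signature is illustrative, not the one in the PR:
   
   ```
   /**
    * Appends the tombstone record(s) needed to delete the group to the given list,
    * so the caller's list is populated in place instead of copied afterwards.
    *
    * @param records The list to append the tombstone record(s) to.
    */
   void createGroupTombstoneRecords(List<Record> records);
   ```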



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup(
 group.remove(member.memberId());
 }
 
+/**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with records to update the state machine.
+ * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} 

[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-22 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1334519906


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -90,4 +90,29 @@ void validateOffsetFetch(
 int memberEpoch,
 long lastCommittedOffset
 ) throws KafkaException;
+
+/**
+ * Validates the OffsetDelete request.
+ */
+void validateOffsetDelete() throws KafkaException;
+
+/**
+ * Validates the GroupDelete request
+ */
+void validateGroupDelete() throws KafkaException;
+
+/**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic the topic name.
+ * @return whether the group is subscribed to the topic.
+ */
+boolean isSubscribedToTopic(String topic);

Review Comment:
   Offsets APIs still use topic names...






[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-22 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1334471084


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,63 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+groupIds.forEach(groupId -> {
+final TopicPartition topicPartition = topicPartitionFor(groupId);
+groupsByTopicPartition
+.computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+.add(groupId);
+});
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
+new ArrayList<>(groupIds.size());
+groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+runtime.scheduleWriteOperation(
+"delete-group",
+topicPartition,
+coordinator -> coordinator.deleteGroups(context, groupList)
+).exceptionally(exception -> {
+if (exception instanceof UnknownTopicOrPartitionException ||
+exception instanceof NotEnoughReplicasException) {
+return DeleteGroupsRequest.getErrorResultCollection(
+groupIds,
+Errors.COORDINATOR_NOT_AVAILABLE
+);
+}
+
+if (exception instanceof NotLeaderOrFollowerException ||
+exception instanceof KafkaStorageException) {
+return DeleteGroupsRequest.getErrorResultCollection(
+groupIds,
+Errors.NOT_COORDINATOR
+);
+}
+
+if (exception instanceof RecordTooLargeException ||
+exception instanceof RecordBatchTooLargeException ||
+exception instanceof InvalidFetchSizeException) {
+return DeleteGroupsRequest.getErrorResultCollection(
+groupIds,
+Errors.UNKNOWN_SERVER_ERROR
+);
+}
+
+return DeleteGroupsRequest.getErrorResultCollection(
+groupIds,
+Errors.forException(exception)
+);
+});
+
+futures.add(future);
+});
+
+final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   @jeffkbkim I made the same comment earlier and @dongnuo123 updated the code to handle exceptions for each write operation.






[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-21 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1333162337


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -849,6 +853,46 @@ public void validateOffsetFetch(
 }
 }
 
+/**
+ * Validates the OffsetDelete request.
+ */
+@Override
+public void validateOffsetDelete() throws GroupIdNotFoundException {
+if (isInState(DEAD)) {
+throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+}
+}
+
+/**
+ * Validates the GroupDelete request.
+ */
+@Override
+public void validateGroupDelete() throws ApiException {
+if (isInState(DEAD)) {
+throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+} else if (isInState(STABLE)
+|| isInState(PREPARING_REBALANCE)
+|| isInState(COMPLETING_REBALANCE)) {
+throw Errors.NON_EMPTY_GROUP.exception();
+}
+
+// We avoid writing the tombstone when the generationId is 0, since this group is only using
+// Kafka for offset storage.
+if (generationId() <= 0) {
+throw Errors.UNKNOWN_SERVER_ERROR.exception();

Review Comment:
   Actually, what I said is wrong here. I think that we should generate the tombstone in any case to ensure that the group is removed from the timeline hashmap.
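   In other words, the generationId guard above would go away and the validation could reduce to something like the sketch below (names follow the diff above; this is not the final code):
   
   ```
   @Override
   public void validateGroupDelete() throws ApiException {
       if (isInState(DEAD)) {
           throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
       } else if (isInState(STABLE)
           || isInState(PREPARING_REBALANCE)
           || isInState(COMPLETING_REBALANCE)) {
           throw Errors.NON_EMPTY_GROUP.exception();
       }
       // No generationId check: the tombstone is always written, so replaying it
       // removes the group from the timeline hashmap even when the group only stores offsets in Kafka.
   }
   ```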






[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API

2023-09-21 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332632377


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+groupIds.forEach(groupId -> {
+final int partition = partitionFor(groupId);

Review Comment:
   nit: I wonder if we should use `topicPartitionFor` here. With this, we could directly have the TopicPartition as the key in the Map and we would not need to create `new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)` later on. What do you think?
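   A sketch of the suggested grouping, assuming topicPartitionFor(groupId) returns the __consumer_offsets TopicPartition owning the group (this is roughly what the later revision of the PR ends up doing):
   
   ```
   final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
   groupIds.forEach(groupId -> {
       final TopicPartition topicPartition = topicPartitionFor(groupId);
       groupsByTopicPartition
           .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
           .add(groupId);
   });
   ```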



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +525,46 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+groupIds.forEach(groupId -> {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);

Review Comment:
   nit: You could do the following to avoid having to put the list again into the map.
   
   ```
   groupsByPartition
       .computeIfAbsent(partition, __ -> new ArrayList<>())
       .add(groupId);
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
 );
 }
 
+/**
+ * Handles a GroupDelete request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+RequestContext context,
+List<String> groupIds
+) throws ApiException {
+final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+new DeleteGroupsResponseData.DeletableGroupResultCollection();
+final List<Record> records = new ArrayList<>();
+
+groupIds.forEach(groupId -> {
+try {
+groupMetadataManager.validateGroupDelete(groupId);
+

Review Comment:
   nit: We can remove this empty line.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -705,9 +744,39 @@ public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
 return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (!isGroupIdNotEmpty(request.groupId())) {
+return CompletableFuture.completedFuture(new OffsetDeleteResponseData()
+.setErrorCode(Errors.INVALID_GROUP_ID.code())
+);
+}
+
+return runtime.scheduleWriteOperation(
+"delete-offset",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.deleteOffsets(context, request)
+).exceptionally(exception -> {

Review Comment:
   It is interesting to point out that, in the current implementation, all these errors are swallowed. This is definitely not ideal because it tells the user that the deletion was successful even if it was not. Should we apply the same error handling to the deleteGroups?
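   For comparison, a non-swallowing variant could map exceptions to a top-level error code, mirroring the deleteGroups handling shown earlier in this thread (the error choices here are illustrative):
   
   ```
   ).exceptionally(exception -> {
       if (exception instanceof UnknownTopicOrPartitionException ||
           exception instanceof NotEnoughReplicasException) {
           return new OffsetDeleteResponseData()
               .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
       }
   
       if (exception instanceof NotLeaderOrFollowerException ||
           exception instanceof KafkaStorageException) {
           return new OffsetDeleteResponseData()
               .setErrorCode(Errors.NOT_COORDINATOR.code());
       }
   
       return new OffsetDeleteResponseData()
           .setErrorCode(Errors.forException(exception).code());
   });
   ```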



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup(
 group.remove(member.memberId());
 }
 
+/**
+ * Handles a GroupDelete request.
+ *
+ * @param context The request context.
+ * @param groupId The group id of the group to be deleted.
+ * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResult response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> groupDelete(
+RequestContext context,
+String groupId

Review Comment:
   nit: The indentation is incorrect.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handles an OffsetDelete request.
+ *
+ * @param context The request context.
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+  

[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-20 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332073736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -454,6 +454,25 @@ public static Record newGroupMetadataTombstoneRecord(
 );
 }
 
+/**
+ * Creates a ConsumerGroupMetadata tombstone.
+ *
+ * @param groupId  The group id.
+ * @return The record.
+ */
+public static Record newConsumerGroupMetadataTombstoneRecord(

Review Comment:
   This is the same as newGroupEpochTombstoneRecord, no?






[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-20 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332072625


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
 );
 }
 
+/**
+ * Handles a GroupDelete request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+RequestContext context,
+List<String> groupIds
+) throws ApiException {
+final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+new DeleteGroupsResponseData.DeletableGroupResultCollection();
+final List<Record> records = new ArrayList<>();
+
+groupIds.forEach(groupId -> {
+try {
+groupMetadataManager.validateGroupDelete(groupId);
+
+offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records);
+final CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> deleteGroupCoordinatorResult =

Review Comment:
   Do we need this CoordinatorResult?






[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-20 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332069169


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
 validateMemberEpoch(memberEpoch, member.memberEpoch());
 }
 
+/**
+ * Validates the OffsetDelete request.
+ */
+@Override
+public void validateOffsetDelete() throws GroupIdNotFoundException {
+if (state() == ConsumerGroupState.DEAD) {
+throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+}
+}
+
+/**
+ * Validates the GroupDelete request.
+ */
+@Override
+public void validateGroupDelete() throws ApiException {
+if (state() == ConsumerGroupState.DEAD) {
+throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
+} else if (state() == ConsumerGroupState.STABLE
+|| state() == ConsumerGroupState.ASSIGNING
+|| state() == ConsumerGroupState.RECONCILING) {
+throw Errors.NON_EMPTY_GROUP.exception();
+}
+
+// We avoid writing the tombstone when the generationId is 0, since this group is only using
+// Kafka for offset storage.
+if (groupEpoch() <= 0) {
+throw Errors.UNKNOWN_SERVER_ERROR.exception();
+}
+}
+
+/**
+ * Creates a GroupMetadata tombstone.
+ *
+ * @return The record.
+ */
+public Record createMetadataTombstoneRecord() {
+return RecordHelpers.newConsumerGroupMetadataTombstoneRecord(groupId());

Review Comment:
   I think that we need to generate the above records here.
   * newTargetAssignmentEpochTombstoneRecord
   * newGroupSubscriptionMetadataTombstoneRecord
   * newGroupEpochTombstoneRecord
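   A hedged sketch of what createGroupTombstoneRecords could then return for a consumer group, using the helpers listed above (the exact ordering is an assumption):
   
   ```
   public List<Record> createGroupTombstoneRecords() {
       // One tombstone per record type that defines the consumer group in the log.
       return Arrays.asList(
           RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()),
           RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()),
           RecordHelpers.newGroupEpochTombstoneRecord(groupId())
       );
   }
   ```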






[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-20 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332057781


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+for (String groupId : groupIds) {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);
+}
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+int partition = entry.getKey();
+List<String> groupList = entry.getValue();
+CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+runtime.scheduleWriteOperation("delete-group",
+new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+coordinator -> coordinator.deleteGroups(context, groupList));
+futures.add(future);
+}
+
+final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   `DeletableGroupResultCollection` contains `DeletableGroupResult` which has an error code. Therefore I think that we should create a `DeletableGroupResult` per group id in the `groupList` when there is an exception.
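   A sketch of such a helper; the DeleteGroupsRequest.getErrorResultCollection used in the later revision presumably does something along these lines (this version is illustrative, not the actual implementation):
   
   ```
   // Build one DeletableGroupResult per group id, all carrying the same error code.
   private static DeleteGroupsResponseData.DeletableGroupResultCollection errorResultCollection(
       List<String> groupIds,
       Errors error
   ) {
       final DeleteGroupsResponseData.DeletableGroupResultCollection results =
           new DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
       groupIds.forEach(groupId -> results.add(
           new DeleteGroupsResponseData.DeletableGroupResult()
               .setGroupId(groupId)
               .setErrorCode(error.code())
       ));
       return results;
   }
   ```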






[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-20 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1331125886


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+for (String groupId : groupIds) {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);
+}
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+int partition = entry.getKey();
+List<String> groupList = entry.getValue();

Review Comment:
   nit: You could use `forEach`, which is a bit more concise.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+for (String groupId : groupIds) {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);
+}
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+int partition = entry.getKey();
+List<String> groupList = entry.getValue();
+CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+runtime.scheduleWriteOperation("delete-group",
+new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+coordinator -> coordinator.deleteGroups(context, groupList));
+futures.add(future);
+}
+
+final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+return allFutures.thenApply(v -> {
+final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+for (CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future : futures) {
+try {
+DeleteGroupsResponseData.DeletableGroupResultCollection result = future.get();

Review Comment:
   It may be better to use `join` instead of `get`. I think that you would be able to remove the try..catch if you use `join`.
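   A sketch of the combining step with join, which only throws the unchecked CompletionException and therefore needs no try..catch (variable names follow the diff above; each result is copied before being added to the combined collection):
   
   ```
   return allFutures.thenApply(v -> {
       final DeleteGroupsResponseData.DeletableGroupResultCollection res =
           new DeleteGroupsResponseData.DeletableGroupResultCollection();
       futures.forEach(future ->
           // join() does not declare checked exceptions, so no try..catch is required.
           future.join().forEach(result -> res.add(result.duplicate()))
       );
       return res;
   });
   ```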



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+for (String groupId : groupIds) {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);
+}
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+int partition = entry.getKey();
+List<String> groupList = entry.getValue();
+CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+runtime.scheduleWriteOperation("delete-group",
+new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+coordinator -> coordinator.deleteGroups(context, groupList));
+futures.add(future);
+}
+
+final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   Let's assume that one of the write operations fails with `COORDINATOR_LOAD_IN_PROGRESS`; this would result in failing `allFutures` even though some write operations may have been successful. It seems to me that we should handle exceptions for each write operation future before we combine them, no?
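   A sketch of handling the failure per write operation before combining, so one failing partition only degrades the groups routed to it (the error mapping is illustrative; the later revision uses DeleteGroupsRequest.getErrorResultCollection with finer-grained error codes):
   
   ```
   CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
       runtime.scheduleWriteOperation("delete-group",
           new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
           coordinator -> coordinator.deleteGroups(context, groupList)
       ).exceptionally(exception ->
           // Convert the failure into one DeletableGroupResult per group of this partition
           // instead of letting the exception fail the combined allOf future.
           DeleteGroupsRequest.getErrorResultCollection(groupList, Errors.forException(exception))
       );
   futures.add(future);
   ```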



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+for (String groupId : groupIds) {
+final int partition = partitionFor(groupId);
+final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+groupList.add(groupId);
+groupsByPartition.put(partition, groupList);
+}
+
+final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+int partition = entry.getKey();
+List<String> groupList = entry.getValue();
+CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+runtime.scheduleWriteOperation("delete-group",
+new