[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1341653044 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -849,6 +853,42 @@ public void validateOffsetFetch( } } +/** + * Validates the OffsetDelete request. + */ +@Override +public void validateOffsetDelete() throws ApiException { +if (isInState(DEAD)) { Review Comment: I am not sure I understand why we need to do this. Couldn't we just delete the group when it is empty and its offsets are gone, instead of transitioning to Dead and then deleting it? My understanding is that we use Dead in the old code because we can't remove the group from the map before the change is committed to the log. During this time, the group is in the Dead state. In our world, the group is removed from the map immediately and the change is reverted if the write fails. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
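To make the "remove immediately, revert if the write fails" idea from the comment above concrete, here is a minimal sketch. It is not the coordinator's code: the class `RevertibleGroups`, its methods, and the `writeSucceeds` flag are hypothetical, and a plain map copy stands in for whatever snapshot mechanism the real state machine uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names come from the PR.
public class RevertibleGroups {
    private Map<String, String> groups = new HashMap<>();

    public void put(String groupId, String state) {
        groups.put(groupId, state);
    }

    public boolean contains(String groupId) {
        return groups.containsKey(groupId);
    }

    /**
     * Removes the group from the map right away and restores the previous
     * contents if the (simulated) log write fails. No intermediate Dead
     * state is needed because readers never observe a half-deleted group.
     */
    public boolean delete(String groupId, boolean writeSucceeds) {
        Map<String, String> snapshot = new HashMap<>(groups); // assumed snapshot mechanism
        groups.remove(groupId);
        if (!writeSucceeds) {
            groups = snapshot; // revert the in-memory change
            return false;
        }
        return true;
    }
}
```

With this shape, a failed write leaves the group visible again, and a successful write leaves it gone, which is the behaviour the comment relies on when questioning the Dead check.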
[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1340716345 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } +/** + * Handles a DeleteGroups request. + * + * @param context The request context. + * @param groupIds The groupIds of the groups to be deleted + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and + * a list of records to update the state machine. + */ +public CoordinatorResult deleteGroups( +RequestContext context, +List groupIds +) throws ApiException { +final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = +new DeleteGroupsResponseData.DeletableGroupResultCollection(); Review Comment: nit: Should we set the expected size here? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -849,6 +853,42 @@ public void validateOffsetFetch( } } +/** + * Validates the OffsetDelete request. + */ +@Override +public void validateOffsetDelete() throws ApiException { +if (isInState(DEAD)) { Review Comment: @jeffkbkim Do we ever transition to Dead? If not, I wonder if we should just remove this and remove the Dead state. What do you think? 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ## @@ -105,6 +115,107 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } +@Test +public void testDeleteGroups() { +GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); +OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); +GroupCoordinatorShard coordinator = new GroupCoordinatorShard( +groupMetadataManager, +offsetMetadataManager +); + +RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); +List groupIds = Arrays.asList("group-id-1", "group-id-2"); +DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); +expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1")); Review Comment: ditto for those two. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1561,6 +1604,156 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); } +static private void testOffsetDeleteWith( +OffsetMetadataManagerTestContext context, +String groupId, +String topic, +int partition, +Errors error +) { +final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( Review Comment: We could also apply my formatting suggestion here. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +92,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the DeleteGroups request. 
+ */ +void validateDeleteGroup() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic The topic name. + * @return Whether the group is subscribed to the topic. + */ +boolean isSubscribedToTopic(String topic); + +/** + * Creates tombstone(s) for deleting the group. + * + * @return The list of tombstone record(s). + */ +List createGroupTombstoneRecords(); Review Comment: I wonder if we should rather pass the list of records as an argument in order to avoid having to copy the records afterwards. Have you considered this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } +/** + * Handles a DeleteGroups request. + * Populates the record list passed in with record to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)}
[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1334519906 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request + */ +void validateGroupDelete() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic the topic name. + * @return whether the group is subscribed to the topic. + */ +boolean isSubscribedToTopic(String topic); Review Comment: Offsets APIs still use topic names...
[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1334471084 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,63 @@ public CompletableFuture> groupsByTopicPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +final TopicPartition topicPartition = topicPartitionFor(groupId); +groupsByTopicPartition +.computeIfAbsent(topicPartition, __ -> new ArrayList<>()) +.add(groupId); +}); + +final List> futures = +new ArrayList<>(groupIds.size()); +groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture future = +runtime.scheduleWriteOperation( +"delete-group", +topicPartition, +coordinator -> coordinator.deleteGroups(context, groupList) +).exceptionally(exception -> { +if (exception instanceof UnknownTopicOrPartitionException || +exception instanceof NotEnoughReplicasException) { +return DeleteGroupsRequest.getErrorResultCollection( +groupIds, +Errors.COORDINATOR_NOT_AVAILABLE +); +} + +if (exception instanceof NotLeaderOrFollowerException || +exception instanceof KafkaStorageException) { +return DeleteGroupsRequest.getErrorResultCollection( +groupIds, +Errors.NOT_COORDINATOR +); +} + +if (exception instanceof RecordTooLargeException || +exception instanceof RecordBatchTooLargeException || +exception instanceof InvalidFetchSizeException) { +return DeleteGroupsRequest.getErrorResultCollection( +groupIds, +Errors.UNKNOWN_SERVER_ERROR +); +} + +return DeleteGroupsRequest.getErrorResultCollection( +groupIds, +Errors.forException(exception) +); +}); + +futures.add(future); +}); + +final CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); Review Comment: @jeffkbkim I made the same comment earlier and @dongnuo123 updated the code to handle exceptions for each write operation.
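The point about handling exceptions for each write operation before combining the futures can be sketched in isolation. This is a simplified illustration, not the PR's code: the class `PerFutureErrors`, the `combine` method, and the use of plain strings in place of result collections are all assumptions made for the example. Only `CompletableFuture.exceptionally`, `allOf`, `thenApply`, and `join` are real JDK calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; strings stand in for per-partition results.
public class PerFutureErrors {
    /**
     * Converts each failure into an error value before combining, so that one
     * failed write does not fail the combined future.
     */
    public static CompletableFuture<List<String>> combine(List<CompletableFuture<String>> writes) {
        List<CompletableFuture<String>> handled = new ArrayList<>(writes.size());
        for (CompletableFuture<String> write : writes) {
            // exceptionally() maps a failure to a normal value for this future only.
            handled.add(write.exceptionally(t -> "ERROR: " + t.getMessage()));
        }
        return CompletableFuture
            .allOf(handled.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<String> results = new ArrayList<>(handled.size());
                for (CompletableFuture<String> f : handled) {
                    // All futures are complete here, so join() returns immediately
                    // and, unlike get(), declares no checked exceptions.
                    results.add(f.join());
                }
                return results;
            });
    }
}
```

Because every future in `handled` completes normally, the combined future never completes exceptionally, and successful writes keep their results even when a sibling write fails.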
[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1333162337 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -849,6 +853,46 @@ public void validateOffsetFetch( } } +/** + * Validates the OffsetDelete request. + */ +@Override +public void validateOffsetDelete() throws GroupIdNotFoundException { +if (isInState(DEAD)) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} +} + +/** + * Validates the GroupDelete request. + */ +@Override +public void validateGroupDelete() throws ApiException { +if (isInState(DEAD)) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} else if (isInState(STABLE) +|| isInState(PREPARING_REBALANCE) +|| isInState(COMPLETING_REBALANCE)) { +throw Errors.NON_EMPTY_GROUP.exception(); +} + +// We avoid writing the tombstone when the generationId is 0, since this group is only using +// Kafka for offset storage. +if (generationId() <= 0) { +throw Errors.UNKNOWN_SERVER_ERROR.exception(); Review Comment: Actually, what I said here is wrong. I think that we should generate the tombstone in any case to ensure that the group is removed from the timeline hashmap.
[GitHub] [kafka] dajac commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1332632377 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +525,46 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +final int partition = partitionFor(groupId); Review Comment: nit: I wonder if we should use `topicPartitionFor` here. With this, we could directly have the TopicPartition as the key in the Map and we would not need to create `new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)` later on. What do you think? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +525,46 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +groupIds.forEach(groupId -> { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); Review Comment: nit: You could do the following to avoid having to put the list again into the map. ``` groupsByPartition .computeIfAbsent(partition, __ -> new ArrayList<>()) .add(groupId); ``` ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } +/** + * Handles a GroupDelete request. + * + * @param context The request context. + * @param groupIds The groupIds of the groups to be deleted + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and + * a list of records to update the state machine. 
+ */ +public CoordinatorResult deleteGroups( +RequestContext context, +List groupIds +) throws ApiException { +final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = +new DeleteGroupsResponseData.DeletableGroupResultCollection(); +final List records = new ArrayList<>(); + +groupIds.forEach(groupId -> { +try { +groupMetadataManager.validateGroupDelete(groupId); + Review Comment: nit: We can remove this empty line. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -705,9 +744,39 @@ public CompletableFuture deleteOffsets( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +if (!isGroupIdNotEmpty(request.groupId())) { +return CompletableFuture.completedFuture(new OffsetDeleteResponseData() +.setErrorCode(Errors.INVALID_GROUP_ID.code()) +); +} + +return runtime.scheduleWriteOperation( +"delete-offset", +topicPartitionFor(request.groupId()), +coordinator -> coordinator.deleteOffsets(context, request) +).exceptionally(exception -> { Review Comment: It is interesting to point out that, in the current implementation, all these errors are swallowed. This is definitely not ideal because it tells the user that the deletion was successful even if it was not. Should we apply the same error handling to `deleteGroups`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } +/** + * Handles a GroupDelete request. + * + * @param context The request context. + * @param groupId The group id of the group to be deleted. + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResult response and + * a list of records to update the state machine. 
+ */ +public CoordinatorResult groupDelete( +RequestContext context, +String groupId Review Comment: nit: The indentation is incorrect. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -333,6 +348,80 @@ public CoordinatorResult commitOffset( return new CoordinatorResult<>(records, response); } +/** + * Handles an OffsetDelete request. + * + * @param context The request context. + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. +
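The `computeIfAbsent` nit in the review above (group ids bucketed by partition without re-putting the list) can be shown as a self-contained sketch. The class `GroupByPartition` and the two-argument `partitionFor` helper are hypothetical stand-ins; the hashing used here is an assumption for the example and is not claimed to match the coordinator's partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the coordinator's code.
public class GroupByPartition {
    // Assumed stand-in for partitionFor(groupId): any deterministic mapping works here.
    static int partitionFor(String groupId, int numPartitions) {
        return Math.floorMod(groupId.hashCode(), numPartitions);
    }

    static Map<Integer, List<String>> group(List<String> groupIds, int numPartitions) {
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (String groupId : groupIds) {
            // computeIfAbsent creates the list on first use and returns the
            // existing one afterwards, so no getOrDefault + put round trip.
            byPartition
                .computeIfAbsent(partitionFor(groupId, numPartitions), k -> new ArrayList<>())
                .add(groupId);
        }
        return byPartition;
    }
}
```

The same pattern works unchanged if the map key is a richer type such as a topic-partition object, which is what the `topicPartitionFor` suggestion amounts to.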
[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1332073736 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -454,6 +454,25 @@ public static Record newGroupMetadataTombstoneRecord( ); } +/** + * Creates a ConsumerGroupMetadata tombstone. + * + * @param groupId The group id. + * @return The record. + */ +public static Record newConsumerGroupMetadataTombstoneRecord( Review Comment: This is the same as newGroupEpochTombstoneRecord, no?
[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1332072625 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } +/** + * Handles a GroupDelete request. + * + * @param context The request context. + * @param groupIds The groupIds of the groups to be deleted + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and + * a list of records to update the state machine. + */ +public CoordinatorResult deleteGroups( +RequestContext context, +List groupIds +) throws ApiException { +final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = +new DeleteGroupsResponseData.DeletableGroupResultCollection(); +final List records = new ArrayList<>(); + +groupIds.forEach(groupId -> { +try { +groupMetadataManager.validateGroupDelete(groupId); + + offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, records); +final CoordinatorResult deleteGroupCoordinatorResult = Review Comment: Do we need this CoordinatorResult?
[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1332069169 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -592,6 +607,45 @@ public void validateOffsetFetch( validateMemberEpoch(memberEpoch, member.memberEpoch()); } +/** + * Validates the OffsetDelete request. + */ +@Override +public void validateOffsetDelete() throws GroupIdNotFoundException { +if (state() == ConsumerGroupState.DEAD) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} +} + +/** + * Validates the GroupDelete request. + */ +@Override +public void validateGroupDelete() throws ApiException { +if (state() == ConsumerGroupState.DEAD) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} else if (state() == ConsumerGroupState.STABLE +|| state() == ConsumerGroupState.ASSIGNING +|| state() == ConsumerGroupState.RECONCILING) { +throw Errors.NON_EMPTY_GROUP.exception(); +} + +// We avoid writing the tombstone when the generationId is 0, since this group is only using +// Kafka for offset storage. +if (groupEpoch() <= 0) { +throw Errors.UNKNOWN_SERVER_ERROR.exception(); +} +} + +/** + * Creates a GroupMetadata tombstone. + * + * @return The record. + */ +public Record createMetadataTombstoneRecord() { +return RecordHelpers.newConsumerGroupMetadataTombstoneRecord(groupId()); Review Comment: I think that we need to generate the following records here: * newTargetAssignmentEpochTombstoneRecord * newGroupSubscriptionMetadataTombstoneRecord * newGroupEpochTombstoneRecord
[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1332057781 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,38 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +for (String groupId : groupIds) { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); +} + +final List> futures = new ArrayList<>(); +for (Map.Entry> entry : groupsByPartition.entrySet()) { +int partition = entry.getKey(); +List groupList = entry.getValue(); + CompletableFuture future = +runtime.scheduleWriteOperation("delete-group", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition), +coordinator -> coordinator.deleteGroups(context, groupList)); +futures.add(future); +} + +final CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); Review Comment: `DeletableGroupResultCollection` contains `DeletableGroupResult` which has an error code. Therefore I think that we should create a `DeletableGroupResult` per group id in the `groupList` when there is an exception.
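The suggestion above, one result with an error code per group id when a write fails, might look roughly like the following. This is a sketch under stated assumptions: `PerGroupErrors`, `GroupResult`, and `errorResults` are hypothetical names, `GroupResult` is only a stand-in for `DeletableGroupResult`, and the error code is passed in as an opaque value rather than looked up from Kafka's `Errors` enum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the PR's helper.
public class PerGroupErrors {
    /** Assumed stand-in for DeletableGroupResult: a group id plus an error code. */
    public static final class GroupResult {
        public final String groupId;
        public final short errorCode;

        public GroupResult(String groupId, short errorCode) {
            this.groupId = groupId;
            this.errorCode = errorCode;
        }
    }

    /** Builds one error result per group id in the failed batch. */
    public static List<GroupResult> errorResults(List<String> groupIds, short errorCode) {
        // The final size is known up front, so the list is pre-sized.
        List<GroupResult> results = new ArrayList<>(groupIds.size());
        for (String groupId : groupIds) {
            results.add(new GroupResult(groupId, errorCode));
        }
        return results;
    }
}
```

The caller would pass only the group ids that belong to the failed partition, so groups handled by other partitions keep their own outcome.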
[GitHub] [kafka] dajac commented on a diff in pull request #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1331125886 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,38 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +for (String groupId : groupIds) { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); +} + +final List> futures = new ArrayList<>(); +for (Map.Entry> entry : groupsByPartition.entrySet()) { +int partition = entry.getKey(); +List groupList = entry.getValue(); Review Comment: nit: You could use `foreach` which is a bit more concise. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,38 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +for (String groupId : groupIds) { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); +} + +final List> futures = new ArrayList<>(); +for (Map.Entry> entry : groupsByPartition.entrySet()) { +int partition = entry.getKey(); +List groupList = entry.getValue(); + CompletableFuture future = +runtime.scheduleWriteOperation("delete-group", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition), +coordinator -> coordinator.deleteGroups(context, groupList)); +futures.add(future); +} + +final CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); +return allFutures.thenApply(v -> { +final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection(); +for (CompletableFuture future : futures) { +try { +DeleteGroupsResponseData.DeletableGroupResultCollection result 
= future.get(); Review Comment: It may be better to use `join` instead of `get`. I think that you would be able to remove the try..catch if you use `join`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,38 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +for (String groupId : groupIds) { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); +} + +final List> futures = new ArrayList<>(); +for (Map.Entry> entry : groupsByPartition.entrySet()) { +int partition = entry.getKey(); +List groupList = entry.getValue(); + CompletableFuture future = +runtime.scheduleWriteOperation("delete-group", +new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition), +coordinator -> coordinator.deleteGroups(context, groupList)); +futures.add(future); +} + +final CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); Review Comment: Let's assume that one of the write operations fails with `COORDINATOR_LOAD_IN_PROGRESS`. This would fail `allFutures` even though some write operations may have been successful. It seems to me that we should handle exceptions for each write operation future before we combine them, no? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,9 +526,38 @@ public CompletableFuture> groupsByPartition = new HashMap<>(); +for (String groupId : groupIds) { +final int partition = partitionFor(groupId); +final List groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>()); +groupList.add(groupId); +groupsByPartition.put(partition, groupList); +} + +final List> futures = new ArrayList<>(); +for (Map.Entry> entry : groupsByPartition.entrySet()) { +int partition = entry.getKey(); +List groupList = entry.getValue(); + CompletableFuture future = +runtime.scheduleWriteOperation("delete-group", +new