AndrewJSchofield commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1956632577
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,20 +831,27 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
- CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
+ CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection);
+ if (retainedGroupIds.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+ return handleDeleteGroups(context, topicPartition, retainedGroupIds)
+ .whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate())))
+ .thenApply(__ -> collection);
+ });
+ // deleteShareGroups has its own exceptionally block, so we don't need one here.
+
+ // This future object has the following stages:
+ // - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
+ // method is invoked, and it returns request objects for each valid share group passed to it.
+ // - Then the requests are passed to the persister.deleteStata method one at a time. The results
+ // are collated as a Map of groupId -> persister errors
+ // - The above map is then used to decide whether to invoke the group coordinator delete groups logic
+ // - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
+ // TLDR: DeleteShareGroups -> filter erroneous persister deletes -> Original delete groups logic
Review Comment:
I think "original" is going to date quickly; "general" might be a better
word here.
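For reference, the staged flow that the quoted comment describes could be sketched roughly as below. This is only an illustration under stated assumptions: `deleteShareGroupState`, `deleteGroupsGeneral`, the string error codes, and the `bad` prefix used to simulate a persister failure are all hypothetical stand-ins, not the methods or types in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DeleteFlowSketch {
    // Hypothetical stage 1: ask the persister to delete share-group state.
    // Returns groupId -> error code; ids starting with "bad" simulate a failure.
    static CompletableFuture<Map<String, String>> deleteShareGroupState(List<String> groupIds) {
        Map<String, String> errors = new LinkedHashMap<>();
        for (String id : groupIds) {
            errors.put(id, id.startsWith("bad") ? "PERSISTER_ERROR" : "NONE");
        }
        return CompletableFuture.completedFuture(errors);
    }

    // Hypothetical stage 3: the general delete-groups logic for the retained ids.
    static CompletableFuture<Map<String, String>> deleteGroupsGeneral(List<String> groupIds) {
        Map<String, String> results = new LinkedHashMap<>();
        for (String id : groupIds) {
            results.put(id, "NONE");
        }
        return CompletableFuture.completedFuture(results);
    }

    static CompletableFuture<Map<String, String>> deleteGroups(List<String> groupIds) {
        return deleteShareGroupState(groupIds).thenCompose(persisterErrors -> {
            Map<String, String> collected = new LinkedHashMap<>();
            List<String> retained = new ArrayList<>();
            // Stage 2: groups whose persister delete failed are reported
            // immediately and excluded from the general delete.
            for (String id : groupIds) {
                String error = persisterErrors.getOrDefault(id, "NONE");
                if ("NONE".equals(error)) {
                    retained.add(id);
                } else {
                    collected.put(id, error);
                }
            }
            if (retained.isEmpty()) {
                return CompletableFuture.completedFuture(collected);
            }
            return deleteGroupsGeneral(retained).thenApply(results -> {
                collected.putAll(results);
                return collected;
            });
        });
    }
}
```

The sketch merges the results inside `thenApply`, so the merge only runs when the general delete succeeds; whether that matches the intended error handling in the PR is a separate question.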
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6286,48 @@ public void createGroupTombstoneRecords(
group.createGroupTombstoneRecords(records);
}
+ /**
+ * Returns an optional of delete share group request object to be used with the persister.
Review Comment:
This doesn't really do the method justice. It seems to me that the salient
points are:
* It checks whether the share group is empty. If it is not empty, the return
value is empty.
* If there are no subscribed topics, again the return value is empty.
* Otherwise, it returns the parameters required by the persister to delete
the share-partition state.
I would rename the method to something like
`shareGroupCheckEmptyAndBuildPartitionDeleteRequest`.
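To make the three points above concrete, here is a minimal sketch of the behaviour I have in mind. `SketchGroup` and `SketchDeleteRequest` are hypothetical stand-ins so that the example compiles on its own; they are not the classes used in `GroupMetadataManager`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ShareGroupDeleteRequestSketch {
    /** Hypothetical stand-in for a share group; not Kafka's share group class. */
    static final class SketchGroup {
        final String groupId;
        final int memberCount;
        final Map<String, List<Integer>> subscribedTopicPartitions;

        SketchGroup(String groupId, int memberCount, Map<String, List<Integer>> subscribedTopicPartitions) {
            this.groupId = groupId;
            this.memberCount = memberCount;
            this.subscribedTopicPartitions = subscribedTopicPartitions;
        }

        boolean isEmpty() {
            return memberCount == 0;
        }
    }

    /** Hypothetical stand-in for the parameters the persister needs. */
    static final class SketchDeleteRequest {
        final String groupId;
        final Map<String, List<Integer>> topicPartitions;

        SketchDeleteRequest(String groupId, Map<String, List<Integer>> topicPartitions) {
            this.groupId = groupId;
            this.topicPartitions = topicPartitions;
        }
    }

    static Optional<SketchDeleteRequest> shareGroupCheckEmptyAndBuildPartitionDeleteRequest(SketchGroup group) {
        // A group that still has members must not have its state deleted.
        if (!group.isEmpty()) {
            return Optional.empty();
        }
        // No subscribed topics means there is no share-partition state to delete.
        if (group.subscribedTopicPartitions.isEmpty()) {
            return Optional.empty();
        }
        // Otherwise hand back what the persister needs.
        return Optional.of(new SketchDeleteRequest(group.groupId, group.subscribedTopicPartitions));
    }
}
```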
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6286,48 @@ public void createGroupTombstoneRecords(
group.createGroupTombstoneRecords(records);
}
+ /**
+ * Returns an optional of delete share group request object to be used with the persister.
Review Comment:
It could even throw `GroupNotEmptyException` if the group is not empty. That
seems like the cleanest way out of this deletion process for a non-empty share
group.
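A rough sketch of the throwing variant follows, assuming the same checks as the current method. `SketchGroupNotEmptyException` is a local stand-in for `GroupNotEmptyException` so that the example compiles without Kafka on the classpath, and the method signature is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ShareGroupDeleteThrowSketch {
    /** Local stand-in for Kafka's GroupNotEmptyException (assumption for this sketch). */
    static final class SketchGroupNotEmptyException extends RuntimeException {
        SketchGroupNotEmptyException(String message) {
            super(message);
        }
    }

    // Hypothetical signature: the topic-partition map stands in for the
    // parameters the persister needs.
    static Optional<Map<String, List<Integer>>> buildPartitionDeleteRequest(
            String groupId, int memberCount, Map<String, List<Integer>> subscribedTopicPartitions) {
        // Fail fast for a non-empty group instead of returning an empty optional.
        if (memberCount > 0) {
            throw new SketchGroupNotEmptyException("Share group " + groupId + " is not empty");
        }
        // Nothing to delete in the persister when no topics are subscribed.
        if (subscribedTopicPartitions.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(subscribedTopicPartitions);
    }
}
```

With this shape, an empty optional only ever means "nothing to delete in the persister", and the non-empty case surfaces as an error the caller can map to the response.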
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.