AndrewJSchofield commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1956632577


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,20 +831,27 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> 
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap 
-> {
+                DeleteGroupsResponseData.DeletableGroupResultCollection 
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                List<String> retainedGroupIds = 
deleteCandidateGroupIds(groupErrMap, groupList, collection);
+                if (retainedGroupIds.isEmpty()) {
+                    return CompletableFuture.completedFuture(collection);
+                }
 
+                return handleDeleteGroups(context, topicPartition, 
retainedGroupIds)
+                    .whenComplete((resp, __) -> resp.forEach(result -> 
collection.add(result.duplicate())))
+                    .thenApply(__ -> collection);
+            });
+            // deleteShareGroups has its own exceptionally block, so we don't 
need one here.
+
+            // This future object has the following stages:
+            // - First it invokes the share group delete flow where the shard 
sharePartitionDeleteRequests
+            // method is invoked, and it returns request objects for each 
valid share group passed to it.
+            // - Then the requests are passed to the persister.deleteStata 
method one at a time. The results
+            // are collated as a Map of groupId -> persister errors
+            // - The above map is then used to decide whether to invoke the 
group coordinator delete groups logic
+            // - Share groups with failed persister delete are NOT CONSIDERED 
for group coordinator delete.
+            // TLDR: DeleteShareGroups -> filter erroneous persister deletes 
-> Original delete groups logic

Review Comment:
   I think "original" is going to date quickly. I think "general" might be 
better.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6286,48 @@ public void createGroupTombstoneRecords(
         group.createGroupTombstoneRecords(records);
     }
 
+    /**
+     * Returns an optional of delete share group request object to be used 
with the persister.

Review Comment:
   This doesn't really do the method justice. It seems to me that the salient 
points are:
   * It checks whether the share group is empty. If it is not empty, the return 
value is empty.
   * If there are no subscribed topics, again the return value is empty.
   * Otherwise, it returns the parameters required by the persister to delete 
the share-partition state.
   
   I would rename the method to something like 
`shareGroupCheckEmptyAndBuildPartitionDeleteRequest`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6286,48 @@ public void createGroupTombstoneRecords(
         group.createGroupTombstoneRecords(records);
     }
 
+    /**
+     * Returns an optional of delete share group request object to be used 
with the persister.

Review Comment:
   It could even throw `GroupNotEmptyException` if the group is not empty. That 
seems like the cleanest way out of this deletion process for a non-empty share 
group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to