showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669471854



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -97,54 +108,79 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
+            final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = 
Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, 
unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
-                        error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due 
to error {}.", groupId, error);
                 failed.put(groupId, error.exception());
-                return true;
+                break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`OffsetDelete` request for group {} failed because 
the coordinator" +

Review comment:
       Updated. I'll also update other PRs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to