showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669471854
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -97,54 +108,79 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; - Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map<TopicPartition, Errors> partitions = new HashMap<>(); - response.data().topics().forEach(topic -> + final Map<TopicPartition, Errors> partitionResults = new HashMap<>(); + response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> { Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); - } + + partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) ); - if (!partitions.isEmpty()) - completed.put(groupId, partitions); + + completed.put(groupId, partitionResults); + } + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); } - return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - List<CoordinatorKey> unmapped + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error); failed.put(groupId, error.exception()); - return true; + break; case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`OffsetDelete` request for group {} failed because the coordinator" + Review comment: Updated. I'll also update other PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org