dajac commented on a change in pull request #11026: URL: https://github.com/apache/kafka/pull/11026#discussion_r670436565
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ########## @@ -110,41 +112,97 @@ public String apiName() { groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { - log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); + handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); } } completed.put(groupId, groupOffsetsListing); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, Review comment: @showuon I think there is a case that we don't handle correctly. Imagine that `GROUP_AUTHORIZATION_FAILED` is returned as a partition error. In this case, we ignore it in `handlePartitionError` and therefore never add the failed group to `failed`. I think we should also handle all the group-level errors in `handlePartitionError`. Second, if there is a group failure, we should not add the group to `completed` at L131; otherwise, the group future will be completed with an empty offsets map. Could you check this and add a test for it? 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java ########## @@ -110,40 +124,96 @@ public String apiName() { groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { - log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); + // In responseData V0 and V1, there's no top level error, we have to handle errors here + handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); } } completed.put(groupId, groupOffsetsListing); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); + } } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, - Map<CoordinatorKey, - Throwable> failed, - List<CoordinatorKey> unmapped + Map<CoordinatorKey, Throwable> failed, + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId, - error.exception()); + log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`OffsetFetch` request for group id {} failed because the coordinator " + + "is still in the process of loading state. 
Will retry", groupId.idValue); + groupsToRetry.add(groupId); + break; case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`OffsetFetch` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); + break; + + default: + final String unexpectedErrorMsg = + String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error); + log.error(unexpectedErrorMsg); + failed.put(groupId, error.exception(unexpectedErrorMsg)); + } + } + + private void handlePartitionError( + CoordinatorKey groupId, + TopicPartition topicPartition, + Errors error, + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry + ) { + switch (error) { + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + Review comment: Could we also update the log messages here and below to match the format you used in `handleGroupError`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org