lianetm commented on code in PR #14557: URL: https://github.com/apache/kafka/pull/14557#discussion_r1426842174
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -457,120 +539,224 @@ public void onResponse(final ClientResponse response) { } } - private void handleRetriableError(Errors error, ClientResponse response) { - if (error == COORDINATOR_NOT_AVAILABLE || - error == NOT_COORDINATOR || - error == REQUEST_TIMED_OUT) { - coordinatorRequestManager.markCoordinatorUnknown(error.message(), response.receivedTimeMs()); + /** + * Enqueue the request to be retried with exponential backoff. This will fail the request + * without retrying if the request timer expired. + */ + @Override + void retry(long currentTimeMs, Throwable throwable) { + if (!expirationTimeMs.isPresent() || isExpired(currentTimeMs)) { + // Fail requests that had no expiration time (async requests), or that had it, and + // it expired (sync requests). + future.completeExceptionally(throwable); + return; } - } - private void retry(final long currentTimeMs) { + // Enqueue request to be retried with backoff. Note that this maintains the same + // timer of the initial request, so all the retries are time-bounded. onFailedAttempt(currentTimeMs); pendingRequests.addOffsetCommitRequest(this); } - private void handleFatalError(final Errors error) { - switch (error) { - case GROUP_AUTHORIZATION_FAILED: - future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); - break; - case OFFSET_METADATA_TOO_LARGE: - case INVALID_COMMIT_OFFSET_SIZE: - future.completeExceptionally(error.exception()); - break; - case FENCED_INSTANCE_ID: - log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); - future.completeExceptionally(new CommitFailedException()); - break; Review Comment: Got removed with the simplification that all errors that just need to fail the future (no other specific action) are not handled explicitly, and just go to the general else where we log an error with the error.message, and fail the future with `error.exception`[here](https://github.com/apache/kafka/blob/b7eb1081cd2d7546c47ed549ea4881b51ef96fec/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L531). So right now this will fail with `FenceInstanceExpception`, and log an error line including the fence message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org