lianetm commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1426842174


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -457,120 +539,224 @@ public void onResponse(final ClientResponse response) {
             }
         }
 
-        private void handleRetriableError(Errors error, ClientResponse 
response) {
-            if (error == COORDINATOR_NOT_AVAILABLE ||
-                error == NOT_COORDINATOR ||
-                error == REQUEST_TIMED_OUT) {
-                
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
response.receivedTimeMs());
+        /**
+         * Enqueue the request to be retried with exponential backoff. This 
will fail the request
+         * without retrying if the request timer expired.
+         */
+        @Override
+        void retry(long currentTimeMs, Throwable throwable) {
+            if (!expirationTimeMs.isPresent() || isExpired(currentTimeMs)) {
+                // Fail requests that had no expiration time (async requests), 
or that had it, and
+                // it expired (sync requests).
+                future.completeExceptionally(throwable);
+                return;
             }
-        }
 
-        private void retry(final long currentTimeMs) {
+            // Enqueue request to be retried with backoff. Note that this 
maintains the same
+            // timer of the initial request, so all the retries are 
time-bounded.
             onFailedAttempt(currentTimeMs);
             pendingRequests.addOffsetCommitRequest(this);
         }
 
-        private void handleFatalError(final Errors error) {
-            switch (error) {
-                case GROUP_AUTHORIZATION_FAILED:
-                    
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
-                    break;
-                case OFFSET_METADATA_TOO_LARGE:
-                case INVALID_COMMIT_OFFSET_SIZE:
-                    future.completeExceptionally(error.exception());
-                    break;
-                case FENCED_INSTANCE_ID:
-                    log.info("OffsetCommit failed due to group instance id {} 
fenced: {}", groupInstanceId, error.message());
-                    future.completeExceptionally(new CommitFailedException());
-                    break;

Review Comment:
   Got removed with the simplification that all errors that just need to fail 
the future (no other specific action) are not handled explicitly, and just go 
to the general else where we log an error with the error.message, and fail the 
future with 
`error.exception`[here](https://github.com/apache/kafka/blob/b7eb1081cd2d7546c47ed549ea4881b51ef96fec/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L531).
 
   
   So right now this will fail with `FenceInstanceExpception`, and log an error 
line including the fence message. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to