lianetm commented on code in PR #15844: URL: https://github.com/apache/kafka/pull/15844#discussion_r1591065112
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -1145,14 +1141,42 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny(); if (dupe.isPresent() || inflight.isPresent()) { - log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions); - dupe.orElseGet(inflight::get).chainFuture(request.future); + log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions); + OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get); + originalRequest.chainFuture(request.future); } else { this.unsentOffsetFetches.add(request); } return request.future; } + /** + * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue <em>iff</em> + * both of the following are true: + * + * <ul> + * <li>The request completed with a <code>null</code> {@link Throwable error}</li> + * <li>The request is not {@link OffsetFetchRequestState#isExpired expired}</li> + * </ul> + * + * <p/> + * + * In some cases, even though an offset fetch request may complete without an error, <em>technically</em> + * the request took longer than the user's provided timeout. In that case, the application thread will + * still receive a timeout error, and will shortly try to fetch these offsets again. Keeping the result + * of the <em>current</em> attempt will enable the <em>next</em> attempt to use that result and return + * almost immediately. + */ + private void maybeRemoveInflightOffsetFetch(OffsetFetchRequestState fetchRequest, Throwable error) { + if (error == null && !fetchRequest.isExpired) { Review Comment: this line implies a big change in the current logic, that I wonder if we're taking too far. 
Agree with not removing the expired requests (that's the root cause of the problem we have), but why put all errors (not only timeout) in the same bucket? With this new check, how are we ensuring that fetch requests that fail fatally are removed from the inflight queue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org