ableegoldman commented on a change in pull request #11057: URL: https://github.com/apache/kafka/pull/11057#discussion_r675269697
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##########

```diff
@@ -907,14 +944,23 @@ private void maybeSetOffsetForLeaderException(RuntimeException e) {
         final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
-            RequestFuture<ListOffsetResult> future =
-                sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+            // we skip sending the list offsets request only if there's already one with the exact
+            // requested offsets for the destination node
```

Review comment:

Hm.. I wonder if deduplicating like this within the Fetcher itself is too low-level, i.e. there may be other callers of `sendListOffsetsRequests` that actually do want to issue a new request. I think there are arguments to be made for doing this for all requests, but maybe also some arguments against it -- this is a more drastic change that means APIs like `Consumer#endOffsets` can actually return old/stale results (by up to the configured `request.timeout` at most). Since this is a last-minute blocker fix, I'd prefer to keep the changes minimal and scoped to the specific bug, if at all possible.

Can we do the deduplication in another layer, so that we only avoid re-sending the listOffsets request in the specific case of `currentLag`, where we know it's acceptable to report a slightly out-of-date value because the alternative is to report no value at all?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
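For illustration only, the "deduplicate in another layer" idea from the review could be sketched as a small staleness-bounded cache that only lag-tolerant callers (like `currentLag`) consult before issuing a real listOffsets request, while APIs like `Consumer#endOffsets` bypass it. `ListOffsetsCache`, its method names, and the keying by partition string are all hypothetical and not part of the actual Kafka codebase:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical higher-layer cache: a caller that can tolerate a slightly
// stale offset checks here first; on a miss (or expired entry) it falls
// through to a real listOffsets request and records the fresh result.
class ListOffsetsCache {

    // Cached end offset plus the time it was fetched.
    static final class CachedOffset {
        final long offset;
        final long fetchedAtMs;
        CachedOffset(long offset, long fetchedAtMs) {
            this.offset = offset;
            this.fetchedAtMs = fetchedAtMs;
        }
    }

    private final long maxAgeMs; // staleness bound, e.g. the request timeout
    private final Map<String, CachedOffset> cache = new ConcurrentHashMap<>();

    ListOffsetsCache(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    // Returns the cached offset for a partition if it is still fresh enough;
    // an empty Optional signals the caller to send a real listOffsets request.
    Optional<Long> get(String topicPartition, long nowMs) {
        CachedOffset cached = cache.get(topicPartition);
        if (cached != null && nowMs - cached.fetchedAtMs <= maxAgeMs) {
            return Optional.of(cached.offset);
        }
        return Optional.empty();
    }

    // Records the result of a real listOffsets response.
    void put(String topicPartition, long offset, long nowMs) {
        cache.put(topicPartition, new CachedOffset(offset, nowMs));
    }
}
```

Because only the lag-reporting path reads from this cache, the semantics of every other offset API are untouched, which matches the reviewer's goal of keeping the blocker fix narrowly scoped.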