lianetm commented on code in PR #16241:
URL: https://github.com/apache/kafka/pull/16241#discussion_r1635060650


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1279,4 +1288,119 @@ static class MemberInfo {
             this.memberEpoch = Optional.empty();
         }
     }
+
+    /**
+     * As the name suggests, this caches the result of an {@link 
OffsetFetchRequestState offset fetch request}
+     * <em>in specific cases</em>. The logic resembles that of {@link 
ConsumerCoordinator}'s
+     * <code>PendingCommittedOffsetRequest</code> mechanism.
+     *
+     * <p/>
+     *
+     * This handles the case where a user calls {@link 
Consumer#poll(Duration)} with a low timeout value, such as 0.
+     * As the timeout value approaches zero, the likelihood that the client 
will be able to fetch the offsets from the
+     * broker within the user's timeout also decreases. But we can take 
advantage of the fact that {@code poll()}
+     * is typically invoked in a tight loop, so the user's application will 
likely call {@code poll()} again to make
+     * a second attempt with the same set of {@link TopicPartition}s as the 
first attempt.
+     *
+     * <p/>
+     *
+     * The idea here is to cache the results of the last successful--but 
expired--response. An operation may exceed
+     * the time the user allotted, but that doesn't mean that the network 
request is aborted. In this scenario, it is
+     * often the case that the client receives a successful response, though 
it has technically exceeded the amount
+     * of time the user specified. However, as mentioned above, {@code poll()} 
is likely to be invoked again with the
+     * same set of partitions. By caching the successful response from attempt 
#1--though it timed out from the user's
+     * perspective--the client is able to satisfy attempt #2 from the cache.

Review Comment:
   That's a very good point. The legacy logic does not cache the fetch 
responses. It caches only the fetch request for which it hasn't received a 
response yet (that can be safely done if the set of partitions requested are 
the same). As soon as it gets a response it clears the cached request 
([here](https://github.com/apache/kafka/blob/f746d67c3bf62e7d4f5f4e652ea36b4d5a0e01a6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L947)).
  We should never cache the fetch responses because it could definitely lead to 
the gap that @cadonna is pointing out, if a commit happens in between. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to