junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1876938254
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer
fetchRequestPreparer,
}
}
+
+ /**
+ * Create fetch requests based on the configured {@link TempFetchMode}.
+ */
+    @Override
+    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+        switch (fetchConfig.tempFetchMode) {
+            case SKIP_BUFFERED:
+                return super.prepareFetchRequests();
+
+            case SKIP_FETCH:
+                return prepareFetchRequests_option2();
+
+            case INCLUDE_NOMINAL:
+                return prepareFetchRequests_option3();
+
+            case SKIP_NODE:
+                return prepareFetchRequests_option4();
+
+            default:
+                throw new IllegalArgumentException("Invalid " + TempFetchMode.class.getSimpleName() + " option value: " + fetchConfig.tempFetchMode);
+        }
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests_option1() {
+        // -------------------------------------------------------------------------------------------------------------
+        //
+        //  #######  ########  ######## ####  #######  ##    ##       ##
+        // ##     ## ##     ##    ##     ##  ##     ## ###   ##     ####
+        // ##     ## ##     ##    ##     ##  ##     ## ####  ##       ##
+        // ##     ## ########     ##     ##  ##     ## ## ## ##       ##
+        // ##     ## ##           ##     ##  ##     ## ##  ####       ##
+        // ##     ## ##           ##     ##  ##     ## ##   ###       ##
+        //  #######  ##           ##    ####  #######  ##    ##     ######
+        //
+        // -------------------------------------------------------------------------------------------------------------
+        // Option 1 is the existing behavior
+        // -------------------------------------------------------------------------------------------------------------
+        return super.prepareFetchRequests();
Review Comment:
> True. But your first point above should still hold, namely that we won't send the second fetch request to node N if it is included in `nodesWithPendingFetchRequests`. If I understand the scenario correctly, since the `AsyncKafkaConsumer` also checks `nodesWithPendingFetchRequests`, this would still prevent the second fetch pass from generating new requests to nodes that already have an inflight request.
Thanks for the explanation. You are correct.
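
For readers following along, the existing behavior can be modeled roughly as below. This is a simplified, self-contained sketch, not the actual `AbstractFetch` code: the node and partition types are placeholders, and `groupFetchableByNode` is a made-up helper used only to illustrate the guard.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the existing guard: while grouping fetchable partitions by
// their leader node, any node listed in nodesWithPendingFetchRequests is skipped,
// so a second fetch request is never sent to a node whose previous fetch is still
// inflight. Types are placeholders (node = int id, partition = String).
final class PendingFetchGuardSketch {

    static Map<Integer, List<String>> groupFetchableByNode(Map<String, Integer> leaderByPartition,
                                                           Set<Integer> nodesWithPendingFetchRequests) {
        Map<Integer, List<String>> fetchable = new HashMap<>();
        leaderByPartition.forEach((partition, nodeId) -> {
            if (nodesWithPendingFetchRequests.contains(nodeId)) {
                // Previous fetch request to this node has not completed yet; skip it.
                return;
            }
            fetchable.computeIfAbsent(nodeId, id -> new ArrayList<>()).add(partition);
        });
        return fetchable;
    }
}
```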
> To make sure I understand, you’re proposing to change the logic in `AbstractFetch.prepareFetchRequests()`, which is used by both Consumer implementations, right? The change would be that at the time the fetch requests are being generated, we would skip creating a fetch request for a node if:
>
> 1. The node hosts any partitions that are currently buffered on the client
> 2. The node has an inflight request on the client
Yes, that's my suggestion. This should fix the issue for both Consumer implementations, right? I think it's better than the approach that includes the partition but sets `maxBytes` to 0, since that defeats the purpose of the fetch session.
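
To make the proposal concrete, here is a rough sketch of how the node-level check could look. It is illustrative only: `nodesWithBufferedPartitions` is a placeholder for however we derive the set of nodes hosting buffered partitions (not an existing field), and the node/partition types are simplified.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the proposed prepareFetchRequests() filtering: a node is excluded from
// this round of fetch requests if (1) it hosts any partition that is still buffered
// on the client, or (2) it already has an inflight fetch request. Because the node
// is skipped entirely (rather than included with maxBytes = 0), the eventual request
// still carries the full partition set and the fetch session is left intact.
final class NodeSkipProposalSketch {

    static Map<Integer, List<String>> selectFetchTargets(Map<Integer, List<String>> fetchableByNode,
                                                         Set<Integer> nodesWithBufferedPartitions,
                                                         Set<Integer> nodesWithPendingFetchRequests) {
        Map<Integer, List<String>> targets = new HashMap<>();
        for (Map.Entry<Integer, List<String>> entry : fetchableByNode.entrySet()) {
            int nodeId = entry.getKey();

            // 1. The node hosts partitions that are currently buffered on the client.
            if (nodesWithBufferedPartitions.contains(nodeId))
                continue;

            // 2. The node has an inflight fetch request on the client.
            if (nodesWithPendingFetchRequests.contains(nodeId))
                continue;

            targets.put(nodeId, entry.getValue());
        }
        return targets;
    }
}
```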
@hachikuji : Does this approach make sense to you for `ClassicKafkaConsumer`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]