junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1876938254


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
         }
     }
 
+
+    /**
+     * Create fetch requests based on the configured {@link TempFetchMode}.
+     */
+    @Override
+    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+        switch (fetchConfig.tempFetchMode) {
+            case SKIP_BUFFERED:
+                return super.prepareFetchRequests();
+
+            case SKIP_FETCH:
+                return prepareFetchRequests_option2();
+
+            case INCLUDE_NOMINAL:
+                return prepareFetchRequests_option3();
+
+            case SKIP_NODE:
+                return prepareFetchRequests_option4();
+
+            default:
+                throw new IllegalArgumentException("Invalid " + TempFetchMode.class.getSimpleName() + " option value: " + fetchConfig.tempFetchMode);
+        }
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests_option1() {
+        // -------------------------------------------------------------------------------------------------------------
+        //
+        //  #######  ########  ######## ####  #######  ##    ##          ##
+        // ##     ## ##     ##    ##     ##  ##     ## ###   ##        ####
+        // ##     ## ##     ##    ##     ##  ##     ## ####  ##          ##
+        // ##     ## ########     ##     ##  ##     ## ## ## ##          ##
+        // ##     ## ##           ##     ##  ##     ## ##  ####          ##
+        // ##     ## ##           ##     ##  ##     ## ##   ###          ##
+        //  #######  ##           ##    ####  #######  ##    ##        ######
+        //
+        // -------------------------------------------------------------------------------------------------------------
+        // Option 1 is the existing behavior
+        // -------------------------------------------------------------------------------------------------------------
+        return super.prepareFetchRequests();

Review Comment:
   > True. But your first point above should still hold, namely that we won't 
send the second fetch request to node N if it is included in 
nodesWithPendingFetchRequests. If I understand the scenario correctly, since 
the AsyncKafkaConsumer also checks nodesWithPendingFetchRequests, this would 
still prevent the second fetch pass from generating new requests to nodes that 
already have an inflight request.
   
   Thanks for the explanation. You are correct.
   
   > To make sure I understand, you’re proposing to change the logic in 
AbstractFetch.prepareFetchRequests(), which is used by both Consumer 
implementations, right? The change would be that at the time the fetch requests 
are being generated, we would skip creating a fetch request for a node if:
   > 
   > 1. The node hosts any partitions that are currently buffered on the client
   > 2. The node has an inflight request on the client
   
   Yes, that's my suggestion. This should fix the issue for both Consumer 
implementations, right? I think it's better than the approach that includes the 
partition but sets maxBytes to 0, since that approach defeats the purpose of 
the fetch session.
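   
   As a rough illustration of the two skip conditions above, here is a minimal, 
standalone sketch. It is not the actual `AbstractFetch.prepareFetchRequests()` 
code: the class `SkipNodeSketch`, the string partition names, and the 
`leaderByPartition` map are hypothetical stand-ins, and nodes are plain integer 
ids. Only the name `nodesWithPendingFetchRequests` comes from the discussion.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model of the proposed rule; not Kafka's real classes.
final class SkipNodeSketch {

    /**
     * Groups fetchable partitions by leader node, skipping a node entirely if
     * (1) it hosts any partition that is currently buffered on the client, or
     * (2) it already has an inflight fetch request.
     */
    static Map<Integer, List<String>> prepareFetchRequests(
            Map<String, Integer> leaderByPartition,
            Set<String> bufferedPartitions,
            Set<Integer> nodesWithPendingFetchRequests) {

        // Condition 1: collect every node that leads at least one buffered partition.
        Set<Integer> nodesWithBufferedData = new HashSet<>();
        for (String partition : bufferedPartitions) {
            Integer leader = leaderByPartition.get(partition);
            if (leader != null) {
                nodesWithBufferedData.add(leader);
            }
        }

        Map<Integer, List<String>> requests = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : leaderByPartition.entrySet()) {
            int node = entry.getValue();
            if (nodesWithBufferedData.contains(node)) {
                continue; // node hosts buffered data: no request to this node at all
            }
            if (nodesWithPendingFetchRequests.contains(node)) {
                continue; // condition 2: node already has an inflight request
            }
            requests.computeIfAbsent(node, n -> new ArrayList<>()).add(entry.getKey());
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new TreeMap<>();
        leaders.put("t-0", 1);
        leaders.put("t-1", 1);
        leaders.put("t-2", 2);
        leaders.put("t-3", 3);

        // t-0 is buffered (so node 1 is skipped, including t-1); node 2 is inflight.
        System.out.println(prepareFetchRequests(leaders, Set.of("t-0"), Set.of(2)));
        // prints {3=[t-3]}
    }
}
```

   Note that in this sketch `t-1` is not fetched even though it is not buffered 
itself, because its leader (node 1) also hosts the buffered `t-0`. No request 
is generated for that node, rather than a request that carries the partition 
with maxBytes set to 0.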
   
   @hachikuji : Does this approach make sense to you for `ClassicKafkaConsumer`?


