kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1876888597


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
         }
     }
 
+
+    /**
+     * Create fetch requests based on the configured {@link TempFetchMode}.
+     */
+    @Override
+    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+        switch (fetchConfig.tempFetchMode) {
+            case SKIP_BUFFERED:
+                return super.prepareFetchRequests();
+
+            case SKIP_FETCH:
+                return prepareFetchRequests_option2();
+
+            case INCLUDE_NOMINAL:
+                return prepareFetchRequests_option3();
+
+            case SKIP_NODE:
+                return prepareFetchRequests_option4();
+
+            default:
+                throw new IllegalArgumentException("Invalid " + TempFetchMode.class.getSimpleName() + " option value: " + fetchConfig.tempFetchMode);
+        }
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests_option1() {
+        // -------------------------------------------------------------------------------------------------------------
+        //
+        //  #######  ########  ######## ####  #######  ##    ##          ##
+        // ##     ## ##     ##    ##     ##  ##     ## ###   ##        ####
+        // ##     ## ##     ##    ##     ##  ##     ## ####  ##          ##
+        // ##     ## ########     ##     ##  ##     ## ## ## ##          ##
+        // ##     ## ##           ##     ##  ##     ## ##  ####          ##
+        // ##     ## ##           ##     ##  ##     ## ##   ###          ##
+        //  #######  ##           ##    ####  #######  ##    ##        ######
+        //
+        // -------------------------------------------------------------------------------------------------------------
+        // Option 1 is the existing behavior
+        // -------------------------------------------------------------------------------------------------------------
+        return super.prepareFetchRequests();

Review Comment:
   > It seems that the existing ClassicKafkaConsumer could still lead to 
unnecessary evictions in the fetch session. For example, two partitions (p1 and 
p2) are fetched from the same broker and are buffered in the consumer. The 
application polls p1's data and calls `ClassicKafkaConsumer.sendFetches`. This 
will generate a fetch request including p1, but with p2 being removed, causing 
it to be evicted on the server side.
   
   Correct. The logic in `ClassicKafkaConsumer` would "skip" partition `p2` 
when adding the partitions to fetch. The logic in the `FetchSessionHandler` 
would interpret the absence of partition `p2` to mean that it should be 
included in the `toForget` partition set in the fetch request.
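
To illustrate the point, here is a minimal sketch of that interpretation. It is not the actual `FetchSessionHandler` code: the class `ToForgetSketch`, the helper `toForget`, and the use of plain strings for partitions are all hypothetical, and it only assumes that any partition tracked in the session but missing from the next request ends up in the forgotten set.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ToForgetSketch {

    /**
     * Hypothetical helper: partitions that the session currently tracks but that
     * are absent from the next request are treated as "to forget".
     */
    static Set<String> toForget(Set<String> sessionPartitions, Set<String> nextRequestPartitions) {
        Set<String> forgotten = new LinkedHashSet<>(sessionPartitions);
        forgotten.removeAll(nextRequestPartitions);
        return forgotten;
    }

    public static void main(String[] args) {
        // The session tracks p1 and p2; p2 is still buffered, so it is skipped.
        Set<String> session = new LinkedHashSet<>(Arrays.asList("p1", "p2"));
        Set<String> nextRequest = new LinkedHashSet<>(Arrays.asList("p1"));

        System.out.println(toForget(session, nextRequest)); // prints [p2]
    }
}
```

Under that assumption, skipping `p2` on the client is indistinguishable from asking the broker to drop it from the session.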
   
   > For AsyncKafkaConsumer, the above issue also exists. It seems to have an 
additional issue that it allows more than one pending fetch request. 
`FetchRequestManager.pollInternal()` sets `pendingFetchRequestFuture` to null 
as soon as an `UnsentRequest` is generated, but not completed. This allows a 
second fetch request to be generated before the first one completes.
   
   True. But your first point above should still hold, namely that we won't 
send the second fetch request to node _N_ if it is included in 
`nodesWithPendingFetchRequests`. If I understand the scenario correctly, since 
the `AsyncKafkaConsumer` also checks `nodesWithPendingFetchRequests`, this 
would still prevent the second fetch pass from generating new requests to nodes 
that already have an inflight request.
   
   > If so, to address the issue in `ClassicKafkaConsumer`, we could only send 
a fetch request if all partitions for a broker have been drained in the fetch 
buffer and there is no pending fetch request. To address the issue in 
`AsyncKafkaConsumer`, we could track pending fetch requests and make sure there 
is only one at a time.
   
   To make sure I understand, you’re proposing to change the logic in 
`AbstractFetch.prepareFetchRequests()`, which is used by both `Consumer` 
implementations, right? The change would be that at the time the fetch requests 
are being generated, we would skip creating a fetch request for a node if 
either of the following is true:
   
   1. The node hosts _any_ partitions that are currently buffered on the client
   2. The node has an inflight request on the client
   
   To riff off of your example above, let's say we have six partitions:
   
   - `p1` and `p2` are hosted on node `n1`
   - `p3` and `p4` are on node `n2`
   - `p5` and `p6` are on node `n3`
   
   And:
   
   - `p2` is buffered, i.e. it is included in the `Set` returned by 
`FetchBuffer.bufferedPartitions()`
   - `n3` has an inflight request, i.e. it is in `nodesWithPendingFetchRequests`
   
   In this case, we would not generate or send requests to nodes `n1` or 
`n3`. Assuming there aren't other issues with connectivity or 
authentication/authorization, we would generate a request to node `n2`.
   
   Is that correct?
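
For concreteness, the scenario above can be sketched as follows. This is only a rough model of the proposed rule, not the real `AbstractFetch` logic: the class `SkipNodeSketch` and the method `fetchableNodes` are hypothetical, and nodes and partitions are plain strings rather than `Node` and `TopicPartition`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SkipNodeSketch {

    /**
     * Hypothetical helper: returns the nodes that would receive a fetch request.
     * A node is skipped if any of its partitions is buffered on the client or
     * if it already has an inflight fetch request.
     */
    static Set<String> fetchableNodes(Map<String, List<String>> partitionsByNode,
                                      Set<String> bufferedPartitions,
                                      Set<String> nodesWithPendingFetchRequests) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : partitionsByNode.entrySet()) {
            String node = entry.getKey();
            boolean hasBuffered = entry.getValue().stream().anyMatch(bufferedPartitions::contains);
            boolean hasInflight = nodesWithPendingFetchRequests.contains(node);
            if (!hasBuffered && !hasInflight)
                result.add(node);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> partitionsByNode = new LinkedHashMap<>();
        partitionsByNode.put("n1", Arrays.asList("p1", "p2"));
        partitionsByNode.put("n2", Arrays.asList("p3", "p4"));
        partitionsByNode.put("n3", Arrays.asList("p5", "p6"));

        Set<String> buffered = Collections.singleton("p2"); // p2 is buffered
        Set<String> pending = Collections.singleton("n3");  // n3 has an inflight request

        System.out.println(fetchableNodes(partitionsByNode, buffered, pending)); // prints [n2]
    }
}
```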



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
