kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1876888597
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
}
}
+
+ /**
+ * Create fetch requests based on the configured {@link TempFetchMode}.
+ */
+ @Override
+ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+ switch (fetchConfig.tempFetchMode) {
+ case SKIP_BUFFERED:
+ return super.prepareFetchRequests();
+
+ case SKIP_FETCH:
+ return prepareFetchRequests_option2();
+
+ case INCLUDE_NOMINAL:
+ return prepareFetchRequests_option3();
+
+ case SKIP_NODE:
+ return prepareFetchRequests_option4();
+
+ default:
+ throw new IllegalArgumentException("Invalid " + TempFetchMode.class.getSimpleName() + " option value: " + fetchConfig.tempFetchMode);
+ }
+ }
+
+ private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests_option1() {
+ // -------------------------------------------------------------------------------------------------------------
+ //
+ // ####### ######## ######## #### ####### ## ## ##
+ // ## ## ## ## ## ## ## ## ### ## ####
+ // ## ## ## ## ## ## ## ## #### ## ##
+ // ## ## ######## ## ## ## ## ## ## ## ##
+ // ## ## ## ## ## ## ## ## #### ##
+ // ## ## ## ## ## ## ## ## ### ##
+ // ####### ## ## #### ####### ## ## ######
+ //
+ // -------------------------------------------------------------------------------------------------------------
+ // Option 1 is the existing behavior
+ // -------------------------------------------------------------------------------------------------------------
+ return super.prepareFetchRequests();
Review Comment:
> It seems that the existing ClassicKafkaConsumer could still lead to
unnecessary evictions in the fetch session. For example, two partitions (p1 and
p2) are fetched from the same broker and are buffered in the consumer. The
application polls p1's data and calls `ClassicKafkaConsumer.sendFetches`. This
will generate a fetch request including p1, but with p2 being removed, causing
it to be evicted on the server side.
Correct. The logic in `ClassicKafkaConsumer` would "skip" partition `p2`
when adding the partitions to fetch. The logic in the `FetchSessionHandler`
would interpret the absence of partition `p2` to mean that it should be
included in the `toForget` partition set in the fetch request.
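To make the failure mode concrete, here is a toy model of the session bookkeeping (plain Java, not the real `FetchSessionHandler` code; `sessionPartitions` and `requestedThisPass` are stand-ins for the handler's internal state). Anything that is in the session but omitted from the next request ends up in the "forget" set:

```java
import java.util.HashSet;
import java.util.Set;

public class SessionDiffSketch {
    public static void main(String[] args) {
        // p1 and p2 are both part of the incremental fetch session.
        Set<String> sessionPartitions = Set.of("p1", "p2");

        // p2 still has buffered data, so the consumer "skips" it on this pass
        // and only asks for p1.
        Set<String> requestedThisPass = Set.of("p1");

        // The handler treats any session partition missing from the request
        // as a removal.
        Set<String> toForget = new HashSet<>(sessionPartitions);
        toForget.removeAll(requestedThisPass);

        System.out.println(toForget); // [p2] -- the broker then evicts p2 from the session
    }
}
```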
> For AsyncKafkaConsumer, the above issue also exists. It seems to have an
additional issue that it allows more than one pending fetch request.
`FetchRequestManager.pollInternal()` sets `pendingFetchRequestFuture` to null
as soon as an `UnsentRequest` is generated, but not completed. This allows a
second fetch request to be generated before the first one completes.
True. But your first point above should still hold, namely that we won't send a second fetch request to node _N_ while it is in `nodesWithPendingFetchRequests`. If I understand the scenario correctly, because the `AsyncKafkaConsumer` also performs that check, the second fetch pass would still not generate new requests to nodes that already have an in-flight request.
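As a rough sketch of that guard (placeholder names, not the actual `AbstractFetch` code), a second fetch pass simply produces no request for a node that is still marked as having a pending fetch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PendingNodeGuardSketch {
    public static void main(String[] args) {
        // n1 already has an in-flight fetch from the first pass.
        Set<String> nodesWithPendingFetchRequests = Set.of("n1");

        // Fetchable partitions and the node that leads each of them.
        Map<String, String> partitionToNode = Map.of("p1", "n1", "p3", "n2");

        // Second pass: group partitions by node, skipping busy nodes.
        Map<String, List<String>> requestsByNode = new HashMap<>();
        partitionToNode.forEach((partition, node) -> {
            if (nodesWithPendingFetchRequests.contains(node))
                return; // no second request for a node that is already busy

            requestsByNode.computeIfAbsent(node, n -> new ArrayList<>()).add(partition);
        });

        System.out.println(requestsByNode); // {n2=[p3]} -- nothing new for n1
    }
}
```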
> If so, to address the issue in `ClassicKafkaConsumer`, we could only send
a fetch request if all partitions for a broker have been drained in the fetch
buffer and there is no pending fetch request. To address the issue in
`AsyncKafkaConsumer`, we could track pending fetch requests and make sure there
is only one at a time.
To make sure I understand, you’re proposing to change the logic in
`AbstractFetch.prepareFetchRequests()`, which is used by both `Consumer`
implementations, right? The change would be that at the time the fetch requests
are being generated, we would skip creating a fetch request for a node if:
1. The node hosts _any_ partitions that are currently buffered on the client
2. The node has an inflight request on the client
To riff off of your example above, let's say we have six partitions:
- `p1` and `p2` are hosted on node `n1`
- `p3` and `p4` are on node `n2`
- `p5` and `p6` are on node `n3`
And:
- `p2` is buffered, i.e. it is included in the `Set` returned by
`FetchBuffer.bufferedPartitions()`
- `n3` has an inflight request, i.e. it is in `nodesWithPendingFetchRequests`
In this case, we would not generate or send requests to nodes `n1` or
`n3`. Assuming there aren't other issues with connectivity or
authentication/authorization, we would generate a request to node `n2`.
Is that correct?
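For what it's worth, here is how I read the proposed node-level skip when applied to that example (standalone sketch with placeholder names; the real code would consult `FetchBuffer.bufferedPartitions()` and `nodesWithPendingFetchRequests` rather than plain sets):

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class ProposedNodeSkipSketch {
    public static void main(String[] args) {
        // Partition-to-node assignment from the example above.
        Map<String, String> partitionToNode = new LinkedHashMap<>();
        partitionToNode.put("p1", "n1");
        partitionToNode.put("p2", "n1");
        partitionToNode.put("p3", "n2");
        partitionToNode.put("p4", "n2");
        partitionToNode.put("p5", "n3");
        partitionToNode.put("p6", "n3");

        Set<String> bufferedPartitions = Set.of("p2");             // p2 is still buffered on the client
        Set<String> nodesWithPendingFetchRequests = Set.of("n3");  // n3 has an in-flight request

        // Rule 1: skip any node that hosts a buffered partition.
        Set<String> nodesWithBufferedData = new HashSet<>();
        for (String partition : bufferedPartitions)
            nodesWithBufferedData.add(partitionToNode.get(partition));

        // Rule 2: skip any node that already has an in-flight fetch request.
        Set<String> fetchableNodes = new LinkedHashSet<>(partitionToNode.values());
        fetchableNodes.removeAll(nodesWithBufferedData);
        fetchableNodes.removeAll(nodesWithPendingFetchRequests);

        System.out.println(fetchableNodes); // [n2] -- only n2 gets a fetch request
    }
}
```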
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]