kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1917407061
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +457,36 @@ protected Map<Node, FetchSessionHandler.FetchRequestData>
prepareFetchRequests()
return
fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().build()));
}
+ private Set<Integer> nodesWithBufferedPartitions(Set<TopicPartition>
buffered, long currentTimeMs) {
+ Set<Integer> nodesWithBufferedPartitions = new HashSet<>();
+
+ for (TopicPartition partition : buffered) {
+ if (!subscriptions.isAssigned(partition)) {
+ // It's possible that a partition with buffered data from a
previous request is now no longer
+ // assigned to the consumer, in which case just skip this
partition.
+ continue;
+ }
+
+ SubscriptionState.FetchPosition position =
subscriptions.position(partition);
Review Comment:
The thinking was that in that case, potentially sending a fetch request to a
node with buffered data was preferable to throwing an exception. But that may
not be the right call. I'll look at refactoring the logic.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData>
prepareFetchRequests()
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();
- for (TopicPartition partition : fetchablePartitions()) {
+ // This is the set of partitions that have buffered data
+ Set<TopicPartition> buffered =
Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+ // This is the set of partitions that do not have buffered data
+ Set<TopicPartition> unbuffered =
Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+ if (unbuffered.isEmpty()) {
+ // If there are no partitions that don't already have data locally
buffered, there's no need to issue
+ // any fetch requests at the present time.
+ return Collections.emptyMap();
Review Comment:
Correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]