kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1931050990
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData>
prepareFetchRequests()
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();
- for (TopicPartition partition : fetchablePartitions()) {
- SubscriptionState.FetchPosition position =
subscriptions.position(partition);
+ // This is the set of partitions that have buffered data
+ Set<TopicPartition> buffered =
Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
- if (position == null)
- throw new IllegalStateException("Missing position for
fetchable partition " + partition);
+ // This is the set of partitions that do not have buffered data
+ Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
- Optional<Node> leaderOpt = position.currentLeader.leader;
+ if (unbuffered.isEmpty()) {
+ // If there are no partitions that don't already have data locally
buffered, there's no need to issue
+ // any fetch requests at the present time.
+ return Collections.emptyMap();
+ }
- if (leaderOpt.isEmpty()) {
- log.debug("Requesting metadata update for partition {} since
the position {} is missing the current leader node", partition, position);
- metadata.requestUpdate(false);
+ Set<Integer> bufferedNodes = new HashSet<>();
+
+ for (TopicPartition partition : buffered) {
+ // It's possible that at the time of the fetcher creating new
fetch requests, a partition with buffered
+ // data from a *previous* request is no longer assigned. So before
attempting to retrieve the node
+ // information, check that the partition is still assigned as
calling SubscriptionState.position() on an
+ // unassigned partition will throw an IllegalStateException.
+ //
+ // Note: this check is not needed for the unbuffered partitions as
the logic in
+ // SubscriptionState.fetchablePartitions() only includes
partitions currently assigned.
+ if (!subscriptions.isAssigned(partition))
Review Comment:
> An assigned partition doesn't necessarily have a valid position. So, we
need to do a stricter check here.
I've updated the logic to use `hasValidPosition()` which includes the logic
from `isAssigned()` but also includes the logic from
`TopicPartitionState`/`FetchState`/`FetchStates` related to valid partitions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]