lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2413578798
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,15 +1827,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
return fetch;
}
- // send any new fetches (won't resend pending fetches)
- sendFetches(timer);
-
// We do not want to be stuck blocking in poll if we are missing some
positions
// since the offset lookup may be backing off after a failure
-
- // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we
MUST call
- // updateAssignmentMetadataIfNeeded before this method.
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout >
retryBackoffMs) {
+ if (pollTimeout > retryBackoffMs) {
Review Comment:
my concern with `hasAllFetchPositions` is that even though it's
`synchronized`, it's at the func level, but not on the `assignment`
collection,. So I expect we could still get a race where the hasAll is called
from the app thread here only, but in the background the assignment collection
changes (in the end it's a simple LinkedhashMap used with an iterator, so we
could get ConcurrentModificationException I expect)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]