lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2411213981
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -368,18 +372,27 @@ void cleanup() {
/**
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
*/
- private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
- List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
+ private boolean maybeFailOnMetadataError(List<?> events) {
+ List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
- for (CompletableEvent<?> ce : events) {
- if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
- subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
+ for (Object obj : events) {
+ if (obj instanceof MetadataErrorNotifiableEvent) {
+ filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+ }
}
- if (subscriptionMetadataEvent.isEmpty())
- return;
-
- networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
- subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
- );
+ // Don't get-and-clear the metadata error if there are no events that will be notified.
+ if (filteredEvents.isEmpty())
+ return false;
+
+ Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
Review Comment:
```suggestion
Optional<Exception> metadataError = networkClientDelegate.getAndClearMetadataError();
```
?
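A minimal, self-contained Java sketch of the pattern in the new `maybeFailOnMetadataError`, using the `metadataError` name from the suggestion. `MetadataErrorNotifiable`, `SketchEvent`, and the `Supplier` standing in for `networkClientDelegate.getAndClearMetadataError()` are hypothetical simplifications, not the PR's actual types. The hunk does not show what the `boolean` return value means, so returning `true` only when an error was delivered is an assumption. The sketch filters events by type, skips the get-and-clear entirely when no event would be notified, and otherwise fails every filtered event's future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class MetadataErrorSketch {

    // Hypothetical stand-in for MetadataErrorNotifiableEvent; the PR's interface may look different.
    interface MetadataErrorNotifiable {
        CompletableFuture<?> future();
    }

    // Hypothetical event type used only for this sketch.
    static final class SketchEvent implements MetadataErrorNotifiable {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        @Override
        public CompletableFuture<Void> future() {
            return future;
        }
    }

    /**
     * Fails every notifiable event with the pending metadata error, if there is one.
     * Assumption: returns true only when an error was consumed and delivered.
     */
    static boolean maybeFailOnMetadataError(List<?> events,
                                            Supplier<Optional<Exception>> getAndClearMetadataError) {
        List<MetadataErrorNotifiable> filteredEvents = new ArrayList<>();
        for (Object obj : events) {
            if (obj instanceof MetadataErrorNotifiable) {
                filteredEvents.add((MetadataErrorNotifiable) obj);
            }
        }

        // Don't get-and-clear the metadata error if there are no events that will be notified.
        if (filteredEvents.isEmpty())
            return false;

        Optional<Exception> metadataError = getAndClearMetadataError.get();
        metadataError.ifPresent(error ->
                filteredEvents.forEach(event -> event.future().completeExceptionally(error)));
        return metadataError.isPresent();
    }
}
```

Passing the error source as a `Supplier` makes it easy to check that the error is left untouched when a batch contains no notifiable events.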
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,15 +1827,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
return fetch;
}
- // send any new fetches (won't resend pending fetches)
- sendFetches(timer);
-
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
-
- // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
- // updateAssignmentMetadataIfNeeded before this method.
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+ if (pollTimeout > retryBackoffMs) {
Review Comment:
I'm afraid this is not really what we need to do. The intention here was to avoid blocking in poll for the whole pollTimeout when positions are missing. To keep that logic, we really need to know whether there are missing positions or not; I don't see any other choice.

First, there's no concern about caching anything: the cache was only there to avoid iterating the assignment twice in the poll loop, and now that our poll has moved to the background we iterate once there (so it's OK to have a single iteration here).

Second, I guess we should avoid using `subscription.hasAllFetchPositions` here in the app thread, because the assignment it iterates could change in the background.

One option could be to check `hasAllFetchPositions` here in the app thread based on `subscriptions.assignedPartitions` (get `subscription.assignment`, which is a copy, and iterate that in the app thread). Would that work? ...still thinking about this one.
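A sketch of the option above, assuming a simplified, hypothetical stand-in for the subscription state (`SketchSubscriptions`, keyed by partition name) whose `assignedPartitions()` returns a copy taken under a lock. It also assumes that the body of the `if` in the hunk, which is cut off, caps `pollTimeout` at `retryBackoffMs`. The app thread iterates only the snapshot and looks up each position through a synchronized call, so it never walks the live assignment:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PollTimeoutSketch {

    // Hypothetical, simplified stand-in for SubscriptionState: partition -> has a valid position.
    static final class SketchSubscriptions {
        private final Map<String, Boolean> positions = new HashMap<>();

        synchronized void assign(String partition, boolean hasValidPosition) {
            positions.put(partition, hasValidPosition);
        }

        // Returns a copy, so the caller can iterate it while the owner keeps mutating the original.
        synchronized Set<String> assignedPartitions() {
            return new HashSet<>(positions.keySet());
        }

        synchronized boolean hasValidPosition(String partition) {
            Boolean valid = positions.get(partition);
            return valid != null && valid;
        }
    }

    // Iterates the snapshot in the calling (app) thread instead of the live assignment.
    static boolean hasAllFetchPositions(SketchSubscriptions subscriptions) {
        for (String partition : subscriptions.assignedPartitions()) {
            if (!subscriptions.hasValidPosition(partition))
                return false;
        }
        return true;
    }

    // Keeps the original intent: don't block for the whole pollTimeout while positions are missing.
    static long effectivePollTimeout(SketchSubscriptions subscriptions, long pollTimeout, long retryBackoffMs) {
        if (pollTimeout > retryBackoffMs && !hasAllFetchPositions(subscriptions))
            return retryBackoffMs;
        return pollTimeout;
    }
}
```

If the background thread unassigns a partition after the snapshot is taken, the lookup treats it as missing, so the worst case is waking up early after `retryBackoffMs` rather than blocking for the full `pollTimeout`.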