lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2411213981


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -368,18 +372,27 @@ void cleanup() {
     /**
      * If there is a metadata error, complete all uncompleted events that require subscription metadata.
      */
-    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
+    private boolean maybeFailOnMetadataError(List<?> events) {
+        List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
 
-        for (CompletableEvent<?> ce : events) {
-            if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
-                subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
+        for (Object obj : events) {
+            if (obj instanceof MetadataErrorNotifiableEvent) {
+                filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+            }
         }
 
-        if (subscriptionMetadataEvent.isEmpty())
-            return;
-        networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
-                subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
-        );
+        // Don't get-and-clear the metadata error if there are no events that will be notified.
+        if (filteredEvents.isEmpty())
+            return false;
+
+        Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();

Review Comment:
   ```suggestion
           Optional<Exception> metadataError = networkClientDelegate.getAndClearMetadataError();
   ```
   ?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,15 +1827,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
             return fetch;
         }
 
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure
-
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+        if (pollTimeout > retryBackoffMs) {

Review Comment:
   I'm afraid this is not really what we need to do. The intention here was to 
avoid blocking in poll for the whole pollTimeout when positions are missing. To 
keep that logic we need to know whether there are missing positions; I don't 
see any other option.
   
   So first, there is no need to cache anything: the cache only existed to avoid 
iterating the assignment twice in the poll loop, and now that our poll moved to 
the background we iterate once there (so a single iteration here is fine). 
Second, I guess we should avoid calling `subscription.hasAllFetchPositions` 
here in the app thread, because the assignment it iterates could change in the 
background. 
   
   One option could be to check `hasAllFetchPositions` here in the app thread 
based on the subscriptions.assignedPartitions maybe? (get 
subscription.assignment, that's a copy, and iterate that in the app thread). 
Would that work? ...still thinking about this one
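   
   A rough sketch of that option, only to make the idea concrete. Everything 
in it is a hypothetical stand-in (`PositionCheckSketch`, its string partition 
names, and `effectivePollTimeout` are not the real consumer classes), and it 
assumes the elided body of the `if` caps the timeout at `retryBackoffMs`:
   
   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for the subscription state; NOT the real Kafka class.
   public class PositionCheckSketch {
       // Partition name -> fetch position; no entry means the position is missing.
       private final Map<String, Long> positions = new ConcurrentHashMap<>();
       // Live assignment, which the background thread may change at any time.
       private final Set<String> assignment = ConcurrentHashMap.newKeySet();

       // These two would be called from the background thread.
       public void assign(String partition) { assignment.add(partition); }
       public void seek(String partition, long offset) { positions.put(partition, offset); }

       // Returns a copy, so the app thread never iterates the live assignment.
       public Set<String> assignedPartitions() { return new HashSet<>(assignment); }

       // App-thread check: iterate the snapshot and look for a missing position.
       public boolean hasAllFetchPositions() {
           for (String partition : assignedPartitions()) {
               if (!positions.containsKey(partition)) {
                   return false;
               }
           }
           return true;
       }

       // Assumption: when positions are missing, poll waits at most retryBackoffMs.
       public static long effectivePollTimeout(boolean hasAllPositions,
                                               long pollTimeout,
                                               long retryBackoffMs) {
           if (!hasAllPositions && pollTimeout > retryBackoffMs) {
               return retryBackoffMs;
           }
           return pollTimeout;
       }
   }
   ```
   
   The snapshot can be stale by the time the check returns, so this only 
bounds how long poll blocks; it does not guarantee the positions are still valid.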


