lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2487328786


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,105 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         }
     }
 
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    private void checkInflightPoll(Timer timer, boolean firstPass) {
+        if (firstPass && inflightPoll != null) {
+            // Handle the case where there's a remaining inflight poll from the *previous* invocation
+            // of AsyncKafkaConsumer.poll().
+            maybeClearPreviousInflightPoll();
+        }
+
+        boolean newlySubmittedEvent = false;
+
+        if (inflightPoll == null) {
+            inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
+            newlySubmittedEvent = true;
+            log.trace("Inflight event {} submitted", inflightPoll);
+            applicationEventHandler.add(inflightPoll);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and
+            // the inflight event is cleared.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();

Review Comment:
   This callback invocation makes me notice that we are not updating the timer in `checkInflightPoll`. Should we?
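
   To illustrate the concern, here is a minimal, self-contained sketch. It assumes a timer that caches "now" and only advances when it is explicitly updated. `SketchTimer`, `TimerUpdateSketch`, and the fake clock are hypothetical names invented for this example, not Kafka's actual classes. The simulated delay stands in for the time spent in `executeCallbacks()` and `processBackgroundEvents()`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a timer that caches the current time.
// This is NOT Kafka's Timer class; it only models the caching behaviour.
final class SketchTimer {
    private final AtomicLong clockMs;  // fake wall clock shared with the caller
    private final long deadlineMs;
    private long cachedNowMs;

    SketchTimer(AtomicLong clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.cachedNowMs = clockMs.get();
        this.deadlineMs = cachedNowMs + timeoutMs;
    }

    // Refresh the cached time from the clock.
    void update() {
        cachedNowMs = clockMs.get();
    }

    // Remaining time as seen through the (possibly stale) cached time.
    long remainingMs() {
        return Math.max(0, deadlineMs - cachedNowMs);
    }
}

final class TimerUpdateSketch {
    // Simulates user callbacks that take callbackCostMs and returns what the timer
    // reports afterwards, with or without an explicit update.
    static long remainingAfterCallbacks(long timeoutMs, long callbackCostMs, boolean updateTimer) {
        AtomicLong clock = new AtomicLong(1_000);
        SketchTimer timer = new SketchTimer(clock, timeoutMs);

        clock.addAndGet(callbackCostMs);  // time passes while user-supplied code runs

        if (updateTimer) {
            timer.update();
        }
        return timer.remainingMs();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterCallbacks(100, 40, false));  // stale view
        System.out.println(remainingAfterCallbacks(100, 40, true));   // refreshed view
    }
}
```

   Under these assumptions, the stale timer still reports the full timeout after the callbacks ran, while the refreshed one accounts for the time they consumed. Whether that matters in `checkInflightPoll` depends on where the caller updates the timer.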



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -939,40 +901,75 @@ public void checkInflightPoll(Timer timer, boolean firstPass) {
             // the inflight event is cleared.
             offsetCommitCallbackInvoker.executeCallbacks();
             processBackgroundEvents();
+        } catch (Throwable t) {
+            // If an exception was thrown during execution of offset commit callbacks or background events,
+            // bubble it up to the user but make sure to clear out the inflight request because the error effectively
+            // renders it complete.
+            log.trace("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
 
-            if (inflightPoll.isComplete()) {
-                Optional<KafkaException> errorOpt = inflightPoll.error();
+        if (inflightPoll != null) {
+            maybeClearCurrentInflightPoll(newlySubmittedEvent);
+        }
+    }
 
-                // The async poll event has completed, either successfully or not. In either case, clear out the
-                // inflight request.
-                log.trace("Inflight event {} completed, clearing", inflightPoll);
-                inflightPoll = null;
+    private void maybeClearPreviousInflightPoll() {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
 
-                if (errorOpt.isPresent()) {
-                    throw errorOpt.get();
-                }
-            } else if (!newlySubmittedEvent) {
-                if (time.milliseconds() >= inflightPoll.deadlineMs() && inflightPoll.isValidatePositionsComplete()) {
-                    // The inflight event inflight validated positions, but it has expired.
-                    log.trace("Inflight event {} expired without completing, clearing", inflightPoll);
+            if (errorOpt.isPresent()) {
+                // If the previous inflight event is complete, check if it resulted in an error. If there was
+                // an error, throw it without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Previous inflight event {} completed with an error ({}), clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                // Successful case...
+                if (fetchBuffer.isEmpty()) {
+                    // If it completed without error, but without populating the fetch buffer, clear the event
+                    // so that a new event will be enqueued below.
+                    log.trace("Previous inflight event {} completed without filling the buffer, clearing", inflightPoll);
                     inflightPoll = null;
                 } else {
-                    if (log.isTraceEnabled()) {
-                        log.trace(
-                            "Inflight event {} is incomplete with {} remaining on timer",
-                            inflightPoll,
-                            timer.remainingMs()
-                        );
-                    }
+                    // However, if the event completed, and it populated the buffer, *don't* create a new event.
+                    // This is to prevent an edge case of starvation when poll() is called with a timeout of 0.
+                    // If a new event was created on *every* poll, each time the event would have to complete the
+                    // validate positions stage before the data in the fetch buffer is used. Because there is
+                    // no blocking, and effectively a 0 wait, the data in the fetch buffer is continuously ignored
+                    // leading to no data ever being returned from poll().
+                    log.trace("Previous inflight event {} completed and filled the buffer, not clearing", inflightPoll);
                 }
             }
-        } catch (Throwable t) {
-            // If an exception is hit in the offset commit callbacks, the background events, or the event result,
-            // bubble it up to the user but make sure to clear out the inflight request because the error effectively
-            // renders it complete.
-            log.trace("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+        } else if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) {
+            // The inflight event inflight validated positions, but it has expired.
+            log.trace("Previous inflight event {} expired without completing, clearing", inflightPoll);
             inflightPoll = null;
-            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
+    private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
+
+            if (errorOpt.isPresent()) {
+                // If the inflight event completed with an error, throw it without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Inflight event {} completed with an error ({}), clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                log.trace("Inflight event {} completed without error, clearing", inflightPoll);
+                inflightPoll = null;
+            }
+        } else if (!newlySubmittedEvent) {
+            if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) {
+                // The inflight event inflight validated positions, but it has expired.

Review Comment:
   ditto (the doubled "inflight" in the code comment quoted above)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -660,6 +662,71 @@ public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembers
         }
     }
 
+    @Test
+    public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(times(1));
+    }
+
+    @Test
+    public void testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(never());
+    }
+
+    @Test
+    public void testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {

Review Comment:
   nice! 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -939,40 +901,75 @@ public void checkInflightPoll(Timer timer, boolean firstPass) {
             // the inflight event is cleared.
             offsetCommitCallbackInvoker.executeCallbacks();
             processBackgroundEvents();
+        } catch (Throwable t) {
+            // If an exception was thrown during execution of offset commit callbacks or background events,
+            // bubble it up to the user but make sure to clear out the inflight request because the error effectively
+            // renders it complete.
+            log.trace("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
 
-            if (inflightPoll.isComplete()) {
-                Optional<KafkaException> errorOpt = inflightPoll.error();
+        if (inflightPoll != null) {
+            maybeClearCurrentInflightPoll(newlySubmittedEvent);
+        }
+    }
 
-                // The async poll event has completed, either successfully or not. In either case, clear out the
-                // inflight request.
-                log.trace("Inflight event {} completed, clearing", inflightPoll);
-                inflightPoll = null;
+    private void maybeClearPreviousInflightPoll() {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
 
-                if (errorOpt.isPresent()) {
-                    throw errorOpt.get();
-                }
-            } else if (!newlySubmittedEvent) {
-                if (time.milliseconds() >= inflightPoll.deadlineMs() && inflightPoll.isValidatePositionsComplete()) {
-                    // The inflight event inflight validated positions, but it has expired.
-                    log.trace("Inflight event {} expired without completing, clearing", inflightPoll);
+            if (errorOpt.isPresent()) {
+                // If the previous inflight event is complete, check if it resulted in an error. If there was
+                // an error, throw it without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Previous inflight event {} completed with an error ({}), clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                // Successful case...
+                if (fetchBuffer.isEmpty()) {
+                    // If it completed without error, but without populating the fetch buffer, clear the event
+                    // so that a new event will be enqueued below.
+                    log.trace("Previous inflight event {} completed without filling the buffer, clearing", inflightPoll);
                     inflightPoll = null;
                 } else {
-                    if (log.isTraceEnabled()) {
-                        log.trace(
-                            "Inflight event {} is incomplete with {} remaining on timer",
-                            inflightPoll,
-                            timer.remainingMs()
-                        );
-                    }
+                    // However, if the event completed, and it populated the buffer, *don't* create a new event.
+                    // This is to prevent an edge case of starvation when poll() is called with a timeout of 0.
+                    // If a new event was created on *every* poll, each time the event would have to complete the
+                    // validate positions stage before the data in the fetch buffer is used. Because there is
+                    // no blocking, and effectively a 0 wait, the data in the fetch buffer is continuously ignored
+                    // leading to no data ever being returned from poll().
+                    log.trace("Previous inflight event {} completed and filled the buffer, not clearing", inflightPoll);
                 }
             }
-        } catch (Throwable t) {
-            // If an exception is hit in the offset commit callbacks, the background events, or the event result,
-            // bubble it up to the user but make sure to clear out the inflight request because the error effectively
-            // renders it complete.
-            log.trace("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+        } else if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) {
+            // The inflight event inflight validated positions, but it has expired.

Review Comment:
   "inflight event inflight"? The word looks duplicated in this code comment.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -660,6 +662,71 @@ public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembers
         }
     }
 
+    @Test
+    public void testUpdatePatternSubscriptionInvokedWhenMetadataUpdated() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(times(1));
+    }
+
+    @Test
+    public void testUpdatePatternSubscriptionNotInvokedWhenNotUsingPatternSubscription() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(false);
+        when(metadata.updateVersion()).thenReturn(1, 2);
+        testUpdatePatternSubscription(never());
+    }
+
+    @Test
+    public void testUpdatePatternSubscriptionNotInvokedWhenMetadataNotUpdated() {
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        when(subscriptionState.matchesSubscribedPattern(any(String.class))).thenReturn(true);
+        when(metadata.updateVersion()).thenReturn(1, 1);
+        testUpdatePatternSubscription(never());
+    }
+
+    private void testUpdatePatternSubscription(VerificationMode verificationMode) {
+        String topic = "test-topic";
+        Cluster cluster = mock(Cluster.class);
+
+        when(metadata.fetch()).thenReturn(cluster);
+        when(cluster.topics()).thenReturn(Set.of(topic));
+
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(CompletableFuture.completedFuture(true));
+
+        setupProcessor(true);
+        processor.process(new AsyncPollEvent(110, 100));
+        verify(membershipManager, verificationMode).onSubscriptionUpdated();

Review Comment:
   This is good, but I would suggest we also verify that `subscriptionState.matchesSubscribedPattern` is never called. That is the truly expensive call we want to make sure does not happen if the metadata did not change, just to catch regressions in the future.
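
   In the test itself this would presumably be one extra Mockito line in the "metadata not updated" case, something like `verify(subscriptionState, never()).matchesSubscribedPattern(any(String.class));`. The idea can also be sketched without Mockito, using a hand-rolled counting fake. Everything below (`PatternUpdateSketch`, `CountingMatcher`, `PatternUpdateDemo`) is a hypothetical model written for illustration, not the actual `ApplicationEventProcessor` logic.

```java
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model: only re-evaluate the pattern when the metadata version changed.
final class PatternUpdateSketch {
    private int lastSeenVersion;
    private final Predicate<String> matcher;

    PatternUpdateSketch(int initialVersion, Predicate<String> matcher) {
        this.lastSeenVersion = initialVersion;
        this.matcher = matcher;
    }

    // Returns true if the subscription was re-evaluated against the topics.
    boolean maybeUpdate(int currentVersion, Set<String> topics) {
        if (currentVersion == lastSeenVersion) {
            return false;  // metadata unchanged: skip the expensive matching entirely
        }
        lastSeenVersion = currentVersion;
        for (String topic : topics) {
            matcher.test(topic);  // the expensive call
        }
        return true;
    }
}

// Counting fake standing in for a mocked matchesSubscribedPattern.
final class CountingMatcher implements Predicate<String> {
    int calls;

    @Override
    public boolean test(String topic) {
        calls++;
        return true;
    }
}

final class PatternUpdateDemo {
    // Number of matcher invocations when the version goes from `before` to `after`.
    static int matcherCalls(int before, int after) {
        CountingMatcher matcher = new CountingMatcher();
        PatternUpdateSketch sketch = new PatternUpdateSketch(before, matcher);
        sketch.maybeUpdate(after, Set.of("test-topic"));
        return matcher.calls;
    }

    public static void main(String[] args) {
        System.out.println(matcherCalls(1, 1));  // metadata unchanged
        System.out.println(matcherCalls(1, 2));  // metadata updated
    }
}
```

   Asserting on the matcher call count, and not only on the downstream `onSubscriptionUpdated()` effect, is what would catch a regression where the matching runs but happens to produce no subscription change.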


