kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2487421359


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -939,40 +901,75 @@ public void checkInflightPoll(Timer timer, boolean firstPass) {
             // the inflight event is cleared.
             offsetCommitCallbackInvoker.executeCallbacks();
             processBackgroundEvents();
+        } catch (Throwable t) {
+            // If an exception was thrown during execution of offset commit 
callbacks or background events,
+            // bubble it up to the user but make sure to clear out the 
inflight request because the error effectively
+            // renders it complete.
+            log.trace("Inflight event {} failed due to {}, clearing", 
inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
 
-            if (inflightPoll.isComplete()) {
-                Optional<KafkaException> errorOpt = inflightPoll.error();
+        if (inflightPoll != null) {
+            maybeClearCurrentInflightPoll(newlySubmittedEvent);
+        }
+    }
 
-                // The async poll event has completed, either successfully or 
not. In either case, clear out the
-                // inflight request.
-                log.trace("Inflight event {} completed, clearing", 
inflightPoll);
-                inflightPoll = null;
+    private void maybeClearPreviousInflightPoll() {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
 
-                if (errorOpt.isPresent()) {
-                    throw errorOpt.get();
-                }
-            } else if (!newlySubmittedEvent) {
-                if (time.milliseconds() >= inflightPoll.deadlineMs() && 
inflightPoll.isValidatePositionsComplete()) {
-                    // The inflight event inflight validated positions, but it 
has expired.
-                    log.trace("Inflight event {} expired without completing, 
clearing", inflightPoll);
+            if (errorOpt.isPresent()) {
+                // If the previous inflight event is complete, check if it 
resulted in an error. If there was
+                // an error, throw it without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Previous inflight event {} completed with an error 
({}), clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                // Successful case...
+                if (fetchBuffer.isEmpty()) {
+                    // If it completed without error, but without populating 
the fetch buffer, clear the event
+                    // so that a new event will be enqueued below.
+                    log.trace("Previous inflight event {} completed without 
filling the buffer, clearing", inflightPoll);
                     inflightPoll = null;
                 } else {
-                    if (log.isTraceEnabled()) {
-                        log.trace(
-                            "Inflight event {} is incomplete with {} remaining 
on timer",
-                            inflightPoll,
-                            timer.remainingMs()
-                        );
-                    }
+                    // However, if the event completed, and it populated the 
buffer, *don't* create a new event.
+                    // This is to prevent an edge case of starvation when 
poll() is called with a timeout of 0.
+                    // If a new event was created on *every* poll, each time 
the event would have to complete the
+                    // validate positions stage before the data in the fetch 
buffer is used. Because there is
+                    // no blocking, and effectively a 0 wait, the data in the 
fetch buffer is continuously ignored
+                    // leading to no data ever being returned from poll().
+                    log.trace("Previous inflight event {} completed and filled 
the buffer, not clearing", inflightPoll);
                 }
             }
-        } catch (Throwable t) {
-            // If an exception is hit in the offset commit callbacks, the 
background events, or the event result,
-            // bubble it up to the user but make sure to clear out the 
inflight request because the error effectively
-            // renders it complete.
-            log.trace("Inflight event {} failed due to {}, clearing", 
inflightPoll, String.valueOf(t));
+        } else if (inflightPoll.isExpired(time) && 
inflightPoll.isValidatePositionsComplete()) {
+            // The inflight event inflight validated positions, but it has 
expired.
+            log.trace("Previous inflight event {} expired without completing, 
clearing", inflightPoll);
             inflightPoll = null;
-            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
+    private void maybeClearCurrentInflightPoll(boolean newlySubmittedEvent) {
+        if (inflightPoll.isComplete()) {
+            Optional<KafkaException> errorOpt = inflightPoll.error();
+
+            if (errorOpt.isPresent()) {
+                // If the inflight event completed with an error, throw it 
without delay.
+                KafkaException error = errorOpt.get();
+                log.trace("Inflight event {} completed with an error ({}), 
clearing", inflightPoll, error);
+                inflightPoll = null;
+                throw error;
+            } else {
+                log.trace("Inflight event {} completed without error, 
clearing", inflightPoll);
+                inflightPoll = null;
+            }
+        } else if (!newlySubmittedEvent) {
+            if (inflightPoll.isExpired(time) && inflightPoll.isValidatePositionsComplete()) {
+                // The inflight event inflight validated positions, but it has expired.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to