kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2487422023


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,105 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         }
     }
 
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    private void checkInflightPoll(Timer timer, boolean firstPass) {
+        if (firstPass && inflightPoll != null) {
+            // Handle the case where there's a remaining inflight poll from the *previous* invocation
+            // of AsyncKafkaConsumer.poll().
+            maybeClearPreviousInflightPoll();
+        }
+
+        boolean newlySubmittedEvent = false;
+
+        if (inflightPoll == null) {
+            inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
+            newlySubmittedEvent = true;
+            log.trace("Inflight event {} submitted", inflightPoll);
+            applicationEventHandler.add(inflightPoll);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and
+            // the inflight event is cleared.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();

Review Comment:
   Looking into this now.
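
For readers following the thread, the inflight-event lifecycle described in the Javadoc above can be sketched in isolation roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: `InflightSketch`, `check`, `hasInflight`, and the `Supplier`-based submitter are hypothetical names, and a plain `CompletableFuture<String>` stands in for `AsyncPollEvent`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for the consumer's inflight-poll bookkeeping; not Kafka code. */
class InflightSketch {
    // Assumption: the submitter hands back a future that a background thread completes.
    private final Supplier<CompletableFuture<String>> submitter;
    private CompletableFuture<String> inflight; // null when nothing is in flight

    InflightSketch(Supplier<CompletableFuture<String>> submitter) {
        this.submitter = submitter;
    }

    /**
     * Starts a new event if none is in flight, runs the user callbacks, and
     * returns the event's result if it has completed, or null if it is still pending.
     */
    String check(Runnable userCallbacks) {
        if (inflight == null) {
            inflight = submitter.get();
        }
        try {
            // User-supplied code may throw, so clear the inflight event on failure.
            userCallbacks.run();
        } catch (RuntimeException e) {
            inflight = null;
            throw e;
        }
        if (inflight.isDone()) {
            String result = inflight.join();
            inflight = null; // the next call starts a fresh event
            return result;
        }
        return null;
    }

    boolean hasInflight() {
        return inflight != null;
    }
}
```

A caller would invoke `check(...)` repeatedly until it returns a non-null result or its own timer expires. Clearing the field in the `catch` path mirrors the intent stated in the quoted comment: an exception from user callbacks should not leave a stale event behind.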



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
