kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2407859643


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -876,6 +871,67 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
         }
     }
 
+    /**
+     * {@code checkInflightPollResult()} manages the lifetime of the {@link 
AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event 
processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event 
has reached a
+     * {@link AsyncPollEvent.State terminal state}. If it has, the result will 
be processed accordingly.
+     */
+    public void checkInflightPollResult(Timer timer) {
+        if (inflightPoll == null) {
+            log.trace("No existing inflight async poll event, submitting a new 
event");
+            submitEvent(timer);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure to 
handle possible errors.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();
+
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Attempting to retrieve result from previously submitted 
{} with {} remaining on timer",
+                    inflightPoll,
+                    timer.remainingMs()
+                );
+            }
+
+            // Result should be non-null and starts off as State.STARTED.
+            AsyncPollEvent.Result result = inflightPoll.result();
+            AsyncPollEvent.State state = result.state();
+
+            if (state == AsyncPollEvent.State.SUCCEEDED) {
+                // The async poll event has completed all the requisite 
stages, though it does not imply that
+                // there is data in the FetchBuffer yet. Make sure to clear 
out the inflight request.
+                log.trace("Event {} completed, clearing inflight", 
inflightPoll);
+                inflightPoll = null;
+            } else if (state == AsyncPollEvent.State.FAILED) {
+                // The async poll failed at one of the stages. Make sure to 
clear out the inflight request
+                // before the underlying error is surfaced to the user.
+                log.trace("Event {} failed, clearing inflight", inflightPoll);
+                inflightPoll = null;
+
+                throw result.error();
+            }
+        } catch (Throwable t) {
+            // If an exception is hit, bubble it up to the user but make sure 
to clear out the inflight request
+            // because the error effectively renders it complete.
+            log.debug("Event {} failed due to {}, clearing inflight", 
inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
+    private void submitEvent(Timer timer) {

Review Comment:
   I've moved the code inline in checkInflightPollResult(). PTAL. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to