kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2487422023
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -878,6 +874,105 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
}
}
+ /**
+ * {@code checkInflightPoll()} manages the lifetime of the {@link
AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event
processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event
has completed. If it has, it will be
+ * processed accordingly.
+ */
+ private void checkInflightPoll(Timer timer, boolean firstPass) {
+ if (firstPass && inflightPoll != null) {
+ // Handle the case where there's a remaining inflight poll from
the *previous* invocation
+ // of AsyncKafkaConsumer.poll().
+ maybeClearPreviousInflightPoll();
+ }
+
+ boolean newlySubmittedEvent = false;
+
+ if (inflightPoll == null) {
+ inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer),
time.milliseconds());
+ newlySubmittedEvent = true;
+ log.trace("Inflight event {} submitted", inflightPoll);
+ applicationEventHandler.add(inflightPoll);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure that any
errors thrown here are caught and
+ // the inflight event is cleared.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
Review Comment:
Looking into this now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]