lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2407233098
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -876,6 +871,67 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}
+ /**
+ * {@code checkInflightPollResult()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event has reached a
+ * {@link AsyncPollEvent.State terminal state}. If it has, the result will be processed accordingly.
+ */
+ public void checkInflightPollResult(Timer timer) {
+ if (inflightPoll == null) {
+ log.trace("No existing inflight async poll event, submitting a new event");
+ submitEvent(timer);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure to handle possible errors.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
+
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Attempting to retrieve result from previously submitted {} with {} remaining on timer",
+ inflightPoll,
+ timer.remainingMs()
+ );
+ }
+
+ // Result should be non-null and starts off as State.STARTED.
+ AsyncPollEvent.Result result = inflightPoll.result();
+ AsyncPollEvent.State state = result.state();
+
+ if (state == AsyncPollEvent.State.SUCCEEDED) {
+ // The async poll event has completed all the requisite stages, though it does not imply that
+ // there is data in the FetchBuffer yet. Make sure to clear out the inflight request.
+ log.trace("Event {} completed, clearing inflight", inflightPoll);
+ inflightPoll = null;
+ } else if (state == AsyncPollEvent.State.FAILED) {
Review Comment:
Having these `states` (started, ok, error) and the `result` (ok, error) seems somewhat redundant to me.
First, I wonder if "started" is really needed, given that we always clear the inflight anyway?
And then both (state and result) simply represent whether the `AsyncPollEvent` had an error or not, so wouldn't one be enough? What about simply having the `AsyncPollEvent` hold a result that has an `Optional` error?
In the end we just want to clear the inflight (no matter the result/state), and throw if `result.error` is present.
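A minimal sketch of the shape suggested above, assuming the event only needs to expose an optional terminal result. `PollEvent`, `Result` and `InflightTracker` are hypothetical stand-ins, not the PR's classes, and submission to the background thread is stubbed out. Note that an unset result still has to stand in for "started", so the sketch drops the `State` enum rather than the notion of an in-progress event.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: no State enum, just a result whose error is optional. */
public class InflightPollSketch {

    /** Terminal outcome of an async poll: success is simply an empty error. */
    static final class Result {
        private final Optional<RuntimeException> error;

        private Result(Optional<RuntimeException> error) {
            this.error = error;
        }

        static Result success() {
            return new Result(Optional.empty());
        }

        static Result failure(RuntimeException e) {
            return new Result(Optional.of(e));
        }

        Optional<RuntimeException> error() {
            return error;
        }
    }

    /** Hypothetical stand-in for AsyncPollEvent: the result stays unset while in flight. */
    static final class PollEvent {
        private final AtomicReference<Result> result = new AtomicReference<>();

        /** Called once by the background thread when all stages have finished. */
        void complete(Result r) {
            result.compareAndSet(null, r);
        }

        Optional<Result> result() {
            return Optional.ofNullable(result.get());
        }
    }

    /** Application-thread side: owns the single inflight event. */
    static final class InflightTracker {
        private PollEvent inflightPoll;

        PollEvent inflight() {
            return inflightPoll;
        }

        void checkInflightPollResult() {
            if (inflightPoll == null) {
                // Stub: the real consumer would hand the new event to the background thread here.
                inflightPoll = new PollEvent();
            }
            Optional<Result> maybeResult = inflightPoll.result();
            if (maybeResult.isEmpty()) {
                return; // still running: an empty result plays the role of STARTED
            }
            // Clear the inflight event whatever the outcome, then surface any error.
            inflightPoll = null;
            maybeResult.get().error().ifPresent(e -> {
                throw e;
            });
        }
    }

    public static void main(String[] args) {
        InflightTracker tracker = new InflightTracker();
        tracker.checkInflightPollResult(); // submits a new event
        tracker.inflight().complete(Result.failure(new IllegalStateException("fetch failed")));
        try {
            tracker.checkInflightPollResult();
        } catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getMessage()
                + ", inflight cleared: " + (tracker.inflight() == null));
        }
    }
}
```

With this shape the caller has a single branch: once a result exists, clear the inflight and rethrow if an error is present.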
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -876,6 +871,67 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}
+ /**
+ * {@code checkInflightPollResult()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+ * called when no event is currently processing, it will start a new event processing asynchronously. A check
+ * is made during each invocation to see if the <em>inflight</em> event has reached a
+ * {@link AsyncPollEvent.State terminal state}. If it has, the result will be processed accordingly.
+ */
+ public void checkInflightPollResult(Timer timer) {
+ if (inflightPoll == null) {
+ log.trace("No existing inflight async poll event, submitting a new event");
+ submitEvent(timer);
+ }
+
+ try {
+ // Note: this is calling user-supplied code, so make sure to handle possible errors.
+ offsetCommitCallbackInvoker.executeCallbacks();
+ processBackgroundEvents();
+
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Attempting to retrieve result from previously submitted {} with {} remaining on timer",
+ inflightPoll,
+ timer.remainingMs()
+ );
+ }
+
+ // Result should be non-null and starts off as State.STARTED.
+ AsyncPollEvent.Result result = inflightPoll.result();
+ AsyncPollEvent.State state = result.state();
+
+ if (state == AsyncPollEvent.State.SUCCEEDED) {
+ // The async poll event has completed all the requisite stages, though it does not imply that
+ // there is data in the FetchBuffer yet. Make sure to clear out the inflight request.
+ log.trace("Event {} completed, clearing inflight", inflightPoll);
+ inflightPoll = null;
+ } else if (state == AsyncPollEvent.State.FAILED) {
+ // The async poll failed at one of the stages. Make sure to clear out the inflight request
+ // before the underlying error is surfaced to the user.
+ log.trace("Event {} failed, clearing inflight", inflightPoll);
+ inflightPoll = null;
+
+ throw result.error();
+ }
+ } catch (Throwable t) {
+ // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
+ // because the error effectively renders it complete.
+ log.debug("Event {} failed due to {}, clearing inflight", inflightPoll, String.valueOf(t));
+ inflightPoll = null;
+ throw ConsumerUtils.maybeWrapAsKafkaException(t);
+ }
+ }
+
+ private void submitEvent(Timer timer) {
Review Comment:
This is really specific to AsyncPoll; maybe rename it to `submitAsyncPollEvent`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -726,6 +730,96 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) {
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
}
+ private void process(final AsyncPollEvent event) {
+ log.trace("Processing poll logic for {}", event);
+
+ // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
+ // as we're processing before any new fetching starts
+ requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+ consumerMembershipManager.maybeReconcile(true));
+
+ if (requestManagers.commitRequestManager.isPresent()) {
+ CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
+ commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+ requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+ hrm.membershipManager().onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
+ requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
+ hrm.membershipManager().onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
+ }
+
+ log.trace("Processing check and update positions logic for {}", event);
+ CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+
+ // To maintain the flow from ClassicKafkaConsumer, the check-and-update-positions logic should be allowed
+ // to time out before moving on to the logic for sending fetch requests. This is achieved by using the event
+ // reaper and allowing it to expire the check-and-update-positions future.
+ applicationEventReaper.ifPresent(reaper -> {
+ CompletableEvent<Boolean> pseudoEvent = new CompletableEvent<>() {
+ @Override
+ public CompletableFuture<Boolean> future() {
+ return updatePositionsFuture;
+ }
+
+ @Override
+ public long deadlineMs() {
+ return event.deadlineMs();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + event.deadlineMs() + '}';
+ }
+ };
+
+ reaper.add(pseudoEvent);
+ });
Review Comment:
Should we maybe extract this into a method? This section reaps the `updatePositionsFuture`, so I'd expect something like `reapUpdatePositionsFuture`.
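A minimal sketch of what the extracted `reapUpdatePositionsFuture` helper could look like. `CompletableEvent` and `Reaper` here are simplified stand-ins that only mirror the `future()`/`deadlineMs()` shape used in the diff; the stand-in reaper's rule (fail any future still pending once `nowMs >= deadlineMs`) is an assumption for illustration, not a description of Kafka's reaper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class ReapUpdatePositionsSketch {

    /** Simplified stand-in for CompletableEvent: a future plus an absolute deadline. */
    interface CompletableEvent<T> {
        CompletableFuture<T> future();

        long deadlineMs();
    }

    /** Simplified stand-in reaper: fails futures that are still pending at their deadline. */
    static final class Reaper {
        private final List<CompletableEvent<?>> tracked = new ArrayList<>();

        void add(CompletableEvent<?> event) {
            tracked.add(event);
        }

        void reap(long nowMs) {
            tracked.removeIf(event -> {
                if (event.future().isDone()) {
                    return true; // completed on its own, stop tracking it
                }
                if (nowMs >= event.deadlineMs()) {
                    event.future().completeExceptionally(
                        new TimeoutException("expired at deadline " + event.deadlineMs()));
                    return true;
                }
                return false;
            });
        }
    }

    /** Hypothetical extracted helper: wraps the update-positions future in a pseudo-event. */
    static void reapUpdatePositionsFuture(Reaper reaper,
                                          CompletableFuture<Boolean> updatePositionsFuture,
                                          long deadline) {
        reaper.add(new CompletableEvent<Boolean>() {
            @Override
            public CompletableFuture<Boolean> future() {
                return updatePositionsFuture;
            }

            @Override
            public long deadlineMs() {
                return deadline;
            }

            @Override
            public String toString() {
                return "UpdatePositionsPseudoEvent{deadlineMs=" + deadline + '}';
            }
        });
    }

    public static void main(String[] args) {
        Reaper reaper = new Reaper();
        CompletableFuture<Boolean> updatePositions = new CompletableFuture<>();
        reapUpdatePositionsFuture(reaper, updatePositions, 1_000L);

        reaper.reap(500L);   // before the deadline: left pending
        reaper.reap(1_000L); // deadline reached: failed with a TimeoutException
        System.out.println("expired: " + updatePositions.isCompletedExceptionally());
    }
}
```

In the processor, the call site would then shrink to something like `applicationEventReaper.ifPresent(reaper -> reapUpdatePositionsFuture(reaper, updatePositionsFuture, event.deadlineMs()))`.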
Still, I'm trying to fully understand: why do we need the reaper to expire the `updatePositionsFuture` returned from `requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());`?
Before this PR, the `updateFetchPositions` future was only completed/cancelled within the `updateFetchPositions` function, right? And we used to have a `CheckAndUpdatePositions` event (that was reaped), but reaping it didn't have any effect on the `updateFetchPositions` future, right? (Or did it, and I'm not seeing it?)
The relationship was more the other way around: when the `updatePositionsFuture` completed, we would complete the `CheckAndUpdatePositions` event.
https://github.com/apache/kafka/blob/f68a149a184e264ea709d10de54aec8fdbcf096b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L461
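To illustrate the direction described above, a minimal sketch assuming the old code chained the futures roughly like this (`chainToEvent` is a hypothetical name, not the code at the linked line). Under that assumption, expiring the event's future leaves `updatePositionsFuture` untouched, so reaping the event would not have affected it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class FutureDirectionSketch {

    /** Hypothetical: the update-positions future drives the event's future, not the reverse. */
    static CompletableFuture<Boolean> chainToEvent(CompletableFuture<Boolean> updatePositionsFuture) {
        CompletableFuture<Boolean> eventFuture = new CompletableFuture<>();
        updatePositionsFuture.whenComplete((value, error) -> {
            if (error != null) {
                eventFuture.completeExceptionally(error);
            } else {
                eventFuture.complete(value);
            }
        });
        return eventFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> updatePositions = new CompletableFuture<>();
        CompletableFuture<Boolean> eventFuture = chainToEvent(updatePositions);

        // Expiring the event's future (what reaping the event would do) does not flow back to the source.
        eventFuture.completeExceptionally(new TimeoutException("event expired"));
        System.out.println("event done: " + eventFuture.isDone()
            + ", updatePositions done: " + updatePositions.isDone());

        // A later completion of the source is ignored by the already-completed event future.
        updatePositions.complete(true);
        System.out.println("event failed: " + eventFuture.isCompletedExceptionally());
    }
}
```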