kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414627626
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal } } + /** + * This method can be used by cases where the caller has an event that needs to both block for completion but + * also process background events. For some events, in order to fully process the associated logic, the + * {@link ConsumerNetworkThread background thread} needs assistance from the application thread to complete. + * If the application thread simply blocked on the event after submitting it, the processing would deadlock. + * The logic herein is basically a loop that performs two tasks in each iteration: + * + * <ol> + * <li>Process background events, if any</li> + * <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an event} to complete</li> + * </ol> + * + * <p/> + * + * Each iteration gives the application thread an opportunity to process background events, which may be + * necessary to complete the overall processing. + * + * <p/> + * + * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an + * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the + * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the + * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However, + * this callback must be executed on the application thread. To achieve this, the background thread enqueues a + * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is + * periodically queried by the application thread to see if there's work to be done. When the application thread + * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a + * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the + * background event queue. Moments later, the background thread will see that event, process it, and continue + * execution of the rebalancing logic. The rebalancing logic cannot complete until the + * {@link ConsumerRebalanceListener} callback is performed. + * + * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread + * will wait for completion + * @param timer Overall timer that bounds how long the application thread will wait for the event to complete + * @return {@code true} if the event completed within the timeout, {@code false} otherwise + */ + private boolean processBackgroundEvents(CompletableApplicationEvent<?> event, Timer timer) { + log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs()); + + do { + backgroundEventProcessor.process(); + + try { + Timer pollInterval = time.timer(100L); + log.trace("Waiting {} ms for event {} to complete", event, pollInterval.remainingMs()); + ConsumerUtils.getResult(event.future(), pollInterval); + log.trace("Event {} completed successfully", event); + return true; + } catch (TimeoutException e) { + // Ignore this as we will retry the event until the timeout expires. + } finally { + timer.update(time.milliseconds()); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org