kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1423064727


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1499,6 +1575,149 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
+    /**
+     * Use this method when the caller must block until an event completes while also processing background
+     * events. For some events, in order to fully process the associated logic, the
+     * {@link ConsumerNetworkThread background thread} needs assistance from 
the application thread to complete.
+     * If the application thread simply blocked on the event after submitting 
it, the processing would deadlock.
+     * The logic herein is basically a loop that performs two tasks in each 
iteration:
+     *
+     * <ol>
+     *     <li>Process background events, if any</li>
+     *     <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an 
event} to complete</li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * Each iteration gives the application thread an opportunity to process 
background events, which may be
+     * necessary to complete the overall processing.
+     *
+     * <p/>
+     *
+     * As an example, take {@link #unsubscribe()}. To start unsubscribing, the 
application thread enqueues an
+     * {@link UnsubscribeApplicationEvent} on the application event queue. 
That event will eventually trigger the
+     * rebalancing logic in the background thread. Critically, as part of this 
rebalancing work, the
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked. However,
+     * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
+     * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background 
event queue. That event queue is
+     * periodically queried by the application thread to see if there's work 
to be done. When the application thread
+     * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it processes the event and then enqueues a
+     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} on the application event queue. Moments later,
+     * the background thread will see that event, process it, and continue
+     * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
+     * {@link ConsumerRebalanceListener} callback is performed.
+     *
+     * @param eventProcessor Event processor that contains the queue of events to process
+     * @param future         {@link Future} on which the application thread will wait for completion
+     * @param timer          Overall timer that bounds how long to wait for the event to complete
+     * @return the result of {@code future} if it completes before {@code timer} expires
+     * @throws TimeoutException if {@code future} does not complete before {@code timer} expires
+     */
+    // Visible for testing
+    <T> T processBackgroundEvents(EventProcessor<?> eventProcessor,
+                                  Future<T> future,
+                                  Timer timer) {
+        log.trace("Will wait up to {} ms for future {} to complete", 
timer.remainingMs(), future);
+
+        do {
+            boolean hadEvents = eventProcessor.process();
+
+            try {
+                if (future.isDone()) {
+                    // If the event is done (either successfully or 
otherwise), go ahead and attempt to return
+                    // without waiting. We use the ConsumerUtils.getResult() 
method here to handle the conversion
+                    // of the exception types.
+                    T result = ConsumerUtils.getResult(future);
+                    log.trace("Future {} completed successfully", future);
+                    return result;
+                } else if (!hadEvents) {
+                    // If the above processing yielded no events, then let's 
sit tight for a bit to allow the
+                    // background thread to either a) finish the task, or b) 
populate the background event
+                    // queue with things to process in our next loop.
+                    Timer pollInterval = time.timer(100L);
+                    log.trace("Waiting {} ms for future {} to complete", 
pollInterval.remainingMs(), future);
+                    T result = ConsumerUtils.getResult(future, pollInterval);
+                    log.trace("Future {} completed successfully", future);
+                    return result;
+                }
+            } catch (TimeoutException e) {
+                // Ignore this as we will retry the event until the timeout 
expires.
+            } finally {
+                timer.update();
+            }
+        } while (timer.notExpired());
+
+        log.trace("Future {} did not complete within timeout", future);
+        throw new TimeoutException("Operation timed out before completion");
+    }
+
+    void updateConsumerGroupMetadata(String newMemberId, int newMemberEpoch) {

Review Comment:
   OK. I will revert it and file a separate Jira to refactor the group metadata 
updates.
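
   For reference, the poll-and-wait loop described in the Javadoc above can be sketched
   outside of Kafka. This is only an illustration under stated assumptions, not the PR's
   implementation: `PollAndWaitSketch`, the `Queue<Runnable>` standing in for the
   background event queue, and the plain millisecond timeout standing in for `Timer` are
   all hypothetical, and it throws `java.util.concurrent.TimeoutException` rather than
   Kafka's own exception type.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollAndWaitSketch {

    // Hypothetical stand-in for processBackgroundEvents(): drain pending background
    // events, then briefly wait on the future, until an overall deadline passes.
    static <T> T processBackgroundEvents(Queue<Runnable> backgroundEvents,
                                         Future<T> future,
                                         long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

        do {
            // Step 1: process background events, if any.
            boolean hadEvents = false;
            Runnable event;
            while ((event = backgroundEvents.poll()) != null) {
                event.run();
                hadEvents = true;
            }

            if (future.isDone()) {
                // Done (successfully or otherwise): return or rethrow without waiting.
                return future.get();
            } else if (!hadEvents) {
                // Step 2: nothing to process, so wait briefly (at most 100 ms, and
                // never past the overall deadline) for the future to complete.
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                long waitMs = Math.max(0L, Math.min(100L, remainingMs));
                try {
                    return future.get(waitMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // Ignore; retry until the overall deadline expires.
                }
            }
        } while (deadline - System.nanoTime() > 0);

        throw new TimeoutException("Operation timed out before completion");
    }

    public static void main(String[] args) throws Exception {
        Queue<Runnable> backgroundEvents = new ConcurrentLinkedQueue<>();
        CompletableFuture<Void> callbackDone = new CompletableFuture<>();
        CompletableFuture<String> unsubscribed = new CompletableFuture<>();

        // The "background thread" needs the caller to run a callback before it can
        // finish, mirroring the unsubscribe/onPartitionsRevoked example.
        Thread background = new Thread(() -> {
            backgroundEvents.add(() -> callbackDone.complete(null));
            callbackDone.join();
            unsubscribed.complete("unsubscribed");
        });
        background.start();

        // Blocking on unsubscribed.get() alone would deadlock here; the loop does not.
        System.out.println(processBackgroundEvents(backgroundEvents, unsubscribed, 5_000L));
        background.join();
    }
}
```

   Running `main` prints `unsubscribed`: the caller runs the queued callback itself,
   which unblocks the background thread and lets it complete the future.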


