kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1346234546


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -222,58 +99,158 @@ public void run() {
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", 
t);
             throw new KafkaException(t);
-        } finally {
-            close();
-            log.debug("{} closed", getClass());
         }
     }
 
+    void initializeResources() {
+        applicationEventProcessor = applicationEventProcessorSupplier.get();
+        networkClientDelegate = networkClientDelegateSupplier.get();
+        requestManagers = requestManagersSupplier.get();
+    }
+
     /**
-     * Poll and process an {@link ApplicationEvent}. It performs the following 
tasks:
-     * 1. Drains and try to process all the requests in the queue.
-     * 2. Iterate through the registry, poll, and get the next poll time for 
the network poll
-     * 3. Poll the networkClient to send and retrieve the response.
+     * Poll and process the {@link ApplicationEvent application events}. It 
performs the following tasks:
+     *
+     * <ol>
+     *     <li>
+     *         Drains and processes all the events from the application 
thread's application event queue via
+     *         {@link ApplicationEventProcessor}
+     *     </li>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke 
{@link RequestManager#poll(long)} to get
+     *         the {@link NetworkClientDelegate.UnsentRequest} list and the 
poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent 
via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         Poll the client via {@link KafkaClient#poll(long, long)} to 
send the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     * </ol>
      */
     void runOnce() {
-        if (!applicationEventQueue.isEmpty()) {
-            LinkedList<ApplicationEvent> res = new LinkedList<>();
-            this.applicationEventQueue.drainTo(res);
-
-            for (ApplicationEvent event : res) {
-                log.debug("Consuming application event: {}", event);
-                Objects.requireNonNull(event);
-                applicationEventProcessor.process(event);
-            }
-        }
+        // If there are errors processing any events, the error will be thrown 
immediately. This will have
+        // the effect of closing the background thread.
+        applicationEventProcessor.process();
 
         final long currentTimeMs = time.milliseconds();
         final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
-                .map(m -> m.get().poll(currentTimeMs))
-                .map(this::handlePollResult)
+                .map(Optional::get)
+                .map(rm -> rm.poll(currentTimeMs))
+                .map(networkClientDelegate::addAll)
                 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
         networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
     }
 
-    long handlePollResult(NetworkClientDelegate.PollResult res) {
-        if (!res.unsentRequests.isEmpty()) {
-            networkClientDelegate.addAll(res.unsentRequests);
+    /**
+     * Performs any network I/O that is needed at the time of close for the 
consumer:
+     *
+     * <ol>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke 
{@link RequestManager#pollOnClose()}
+     *         to get the {@link NetworkClientDelegate.UnsentRequest} list and 
the poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent 
via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         {@link KafkaClient#poll(long, long) Poll the client} to send 
the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     *     <li>
+     *         Continuously {@link KafkaClient#poll(long, long) poll the 
client} as long as the
+     *         {@link Timer#notExpired() timer hasn't expired} to retrieve the 
responses
+     *     </li>
+     * </ol>
+     */
+    // Visible for testing
+    static void runAtClose(final Time time,
+                           final Collection<Optional<? extends 
RequestManager>> requestManagers,
+                           final NetworkClientDelegate networkClientDelegate,
+                           final Timer timer) {
+        long currentTimeMs = time.milliseconds();
+
+        // These are the optional outgoing requests at the time of close
+        List<NetworkClientDelegate.PollResult> pollResults = 
requestManagers.stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(RequestManager::pollOnClose)
+                .collect(Collectors.toList());
+        long pollWaitTimeMs = pollResults.stream()
+                .map(networkClientDelegate::addAll)
+                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
+        pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs());
+        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+        timer.update();
+
+        List<Future<?>> requestFutures = pollResults.stream()
+                .flatMap(fads -> fads.unsentRequests.stream())
+                .map(NetworkClientDelegate.UnsentRequest::future)
+                .collect(Collectors.toList());
+
+        // Poll to ensure that the requests have been written to the socket. Wait 
until either the timer has expired or until
+        // all requests have received a response.
+        while (timer.notExpired() && 
!requestFutures.stream().allMatch(Future::isDone)) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);

Review Comment:
   You're right. Thanks for catching that! Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to