kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1346270372


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -222,58 +99,158 @@ public void run() {
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", t);
             throw new KafkaException(t);
-        } finally {
-            close();
-            log.debug("{} closed", getClass());
         }
     }
 
+    void initializeResources() {
+        applicationEventProcessor = applicationEventProcessorSupplier.get();
+        networkClientDelegate = networkClientDelegateSupplier.get();
+        requestManagers = requestManagersSupplier.get();
+    }
+
     /**
-     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
-     * 1. Drains and try to process all the requests in the queue.
-     * 2. Iterate through the registry, poll, and get the next poll time for the network poll
-     * 3. Poll the networkClient to send and retrieve the response.
+     * Poll and process the {@link ApplicationEvent application events}. It performs the following tasks:
+     *
+     * <ol>
+     *     <li>
+     *         Drains and processes all the events from the application thread's application event queue via
+     *         {@link ApplicationEventProcessor}
+     *     </li>
+     *     <li>
+     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get
+     *         the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
+     *     </li>
+     *     <li>
+     *         Stage each {@link AbstractRequest.Builder request} to be sent via
+     *         {@link NetworkClientDelegate#addAll(List)}
+     *     </li>
+     *     <li>
+     *         Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as
+     *         retrieve any available responses
+     *     </li>
+     * </ol>
      */
     void runOnce() {
-        if (!applicationEventQueue.isEmpty()) {
-            LinkedList<ApplicationEvent> res = new LinkedList<>();
-            this.applicationEventQueue.drainTo(res);
-
-            for (ApplicationEvent event : res) {
-                log.debug("Consuming application event: {}", event);
-                Objects.requireNonNull(event);
-                applicationEventProcessor.process(event);
-            }
-        }
+        // If there are errors processing any events, the error will be thrown immediately. This will have
+        // the effect of closing the background thread.

Review Comment:
   Yes. I added a check that it's not closed at the top of `runOnce()`.
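
   For illustration only, here is a minimal sketch of what a closed check at the
   top of a run-once loop can look like. It is not the code in this PR: the class
   `BackgroundLoopSketch`, its `Runnable` event queue, and the choice to throw
   `IllegalStateException` are all assumptions made for the example. The real
   check may log and return instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a background thread's loop body; not a Kafka class.
class BackgroundLoopSketch implements AutoCloseable {
    // Events handed over by the application thread (modelled as Runnables here).
    private final BlockingQueue<Runnable> eventQueue = new LinkedBlockingQueue<>();
    // volatile so a close() from another thread is visible to the loop thread.
    private volatile boolean closed = false;

    void add(Runnable event) {
        eventQueue.add(event);
    }

    // Runs one iteration and returns the number of events processed.
    int runOnce() {
        // Assumption: refuse to do any work once the loop has been closed.
        if (closed) {
            throw new IllegalStateException("Background loop is already closed");
        }
        // Drain everything currently queued, then process in arrival order.
        List<Runnable> drained = new ArrayList<>();
        eventQueue.drainTo(drained);
        for (Runnable event : drained) {
            // An exception thrown here propagates to the caller immediately.
            event.run();
        }
        return drained.size();
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

   With this shape, a call to `runOnce()` after `close()` fails fast rather than
   touching resources that may already have been released.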


