kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334872655


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult res) {
     }
 
     public boolean isRunning() {
-        return this.running;
+        return running;
     }
 
     public void wakeup() {
-        networkClientDelegate.wakeup();
+        if (networkClientDelegate != null)
+            networkClientDelegate.wakeup();
     }
 
+    @Override
     public void close() {
-        this.running = false;
-        this.wakeup();
-        Utils.closeQuietly(networkClientDelegate, "network client utils");
-        Utils.closeQuietly(metadata, "consumer metadata client");
+        closer.close(() -> {
+            log.debug("Closing the consumer background thread");
+            running = false;
+            wakeup();
+            Utils.closeQuietly(requestManagers, "Request managers client");
+            Utils.closeQuietly(networkClientDelegate, "network client utils");
+            log.debug("Closed the consumer background thread");
+            drainAndComplete();
+        }, () -> log.warn("The consumer background thread was previously closed"));
+    }
+
+
+    /**
+     * It is possible for the background thread to close before complete processing all the events in the queue. In
+     * this case, we need throw an exception to notify the user the consumer is closed.

Review Comment:
   I need "to" proofread my comments more thoroughly 😄 
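
   For readers following the hunk above, here is a minimal sketch of the close-once pattern it appears to use: the first `close()` runs the shutdown steps and fails any events still queued, and later calls only take the "previously closed" branch. Everything in it is hypothetical and written for illustration (`IdempotentCloser`, `Worker`, `submit`, the exception type and message); it is not the PR's actual `closer` or `drainAndComplete()` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnceSketch {

    /** Hypothetical stand-in for the PR's closer: first call runs onFirstClose, later calls run onRepeatClose. */
    static final class IdempotentCloser {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close(Runnable onFirstClose, Runnable onRepeatClose) {
            // compareAndSet guarantees that exactly one caller wins the first close.
            if (closed.compareAndSet(false, true))
                onFirstClose.run();
            else
                onRepeatClose.run();
        }
    }

    /** Hypothetical worker that holds unprocessed events as futures. */
    static final class Worker implements AutoCloseable {
        private final IdempotentCloser closer = new IdempotentCloser();
        private final BlockingQueue<CompletableFuture<String>> pending = new LinkedBlockingQueue<>();
        int repeatCloseCount = 0; // stands in for the log.warn(...) branch

        CompletableFuture<String> submit() {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }

        @Override
        public void close() {
            closer.close(this::drainAndComplete, () -> repeatCloseCount++);
        }

        /** Fails every event still in the queue so that callers are not left waiting. */
        private void drainAndComplete() {
            List<CompletableFuture<String>> drained = new ArrayList<>();
            pending.drainTo(drained);
            for (CompletableFuture<String> future : drained)
                future.completeExceptionally(new IllegalStateException("The consumer is closed"));
        }
    }
}
```

   The sketch does not guard against events submitted after `close()` has drained the queue; a real implementation would need to reject or fail those as well.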



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
