kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1334872655
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult res) {
     }
     public boolean isRunning() {
-        return this.running;
+        return running;
     }
     public void wakeup() {
-        networkClientDelegate.wakeup();
+        if (networkClientDelegate != null)
+            networkClientDelegate.wakeup();
     }
+    @Override
     public void close() {
-        this.running = false;
-        this.wakeup();
-        Utils.closeQuietly(networkClientDelegate, "network client utils");
-        Utils.closeQuietly(metadata, "consumer metadata client");
+        closer.close(() -> {
+            log.debug("Closing the consumer background thread");
+            running = false;
+            wakeup();
+            Utils.closeQuietly(requestManagers, "Request managers client");
+            Utils.closeQuietly(networkClientDelegate, "network client utils");
+            log.debug("Closed the consumer background thread");
+            drainAndComplete();
+        }, () -> log.warn("The consumer background thread was previously closed"));
+    }
+
+
+    /**
+     * It is possible for the background thread to close before complete processing all the events in the queue. In
+     * this case, we need throw an exception to notify the user the consumer is closed.
Review Comment:
   I need "to" proofread my comments more thoroughly 😄

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult res) {
     }
     public boolean isRunning() {
-        return this.running;
+        return running;
     }
     public void wakeup() {
-        networkClientDelegate.wakeup();
+        if (networkClientDelegate != null)
+            networkClientDelegate.wakeup();
     }
+    @Override
     public void close() {
-        this.running = false;
-        this.wakeup();
-        Utils.closeQuietly(networkClientDelegate, "network client utils");
-        Utils.closeQuietly(metadata, "consumer metadata client");
+        closer.close(() -> {
+            log.debug("Closing the consumer background thread");
+            running = false;
+            wakeup();
+            Utils.closeQuietly(requestManagers, "Request managers client");
+            Utils.closeQuietly(networkClientDelegate, "network client utils");
+            log.debug("Closed the consumer background thread");
+            drainAndComplete();
+        }, () -> log.warn("The consumer background thread was previously closed"));
+    }
+
+
+    /**
+     * It is possible for the background thread to close before complete processing all the events in the queue. In
+     * this case, we need throw an exception to notify the user the consumer is closed.

Review Comment:
   I need "to" proofread my comments more thoroughly 😄

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org