cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   Yeah, that's right.
   
   For example, if an error occurs during broker startup, we call shutdown, which can shut down the SocketServer before the Acceptors have been started.
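
A minimal, self-contained sketch of the scenario described above: shutdown runs before `start()`, so `start()` sees `shouldRun` set to false and throws `ClosedChannelException`. The class `SketchAcceptor`, the method `beginShutdown()`, the `isStarted` accessor, and the `catch` branch that fails `startedFuture` are assumptions made for illustration. The diff only shows the check and the `throw`, so this is not necessarily how the PR handles the exception.

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the Acceptor in the diff; not Kafka's actual class.
class SketchAcceptor {
  private val shouldRun = new AtomicBoolean(true)
  private var started = false
  val startedFuture = new CompletableFuture[Void]()

  def start(): Unit = synchronized {
    try {
      if (!shouldRun.get()) {
        // Shutdown was requested before start(): refuse to start.
        throw new ClosedChannelException()
      }
      started = true
      startedFuture.complete(null)
    } catch {
      // Assumption: surface the failure through the future instead of rethrowing.
      case e: ClosedChannelException =>
        startedFuture.completeExceptionally(e)
    }
  }

  // Hypothetical name for whatever flips shouldRun during shutdown.
  def beginShutdown(): Unit = shouldRun.set(false)

  def isStarted: Boolean = synchronized { started }
}

object SketchAcceptorDemo {
  def main(args: Array[String]): Unit = {
    val acceptor = new SketchAcceptor
    acceptor.beginShutdown() // e.g. an error during broker startup triggers shutdown
    acceptor.start()         // start() arrives late and is refused
    println(acceptor.startedFuture.isCompletedExceptionally) // prints true
  }
}
```

With this shape, anything waiting on `startedFuture` learns that the acceptor never started, rather than waiting indefinitely.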


