cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406
##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()

   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)

-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   yeah, that's right. For example, if there was some error during broker startup, we would call shutdown and potentially shut down the SocketServer before the Acceptors had been started.
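The ordering described in the review comment (shutdown arriving before `start()`) can be illustrated with a small standalone sketch. This is not the PR's code: `SketchAcceptor`, `beginShutdown`, and `isStarted` are hypothetical names, and because the quoted hunk is cut off right after the `throw`, the `catch` branch that completes `startedFuture` exceptionally is an assumption about how the exception might be handled.

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for Acceptor: only the start/shutdown ordering is modeled.
class SketchAcceptor {
  private val shouldRun = new AtomicBoolean(true)
  private var started = false
  val startedFuture = new CompletableFuture[Void]()

  def start(): Unit = synchronized {
    try {
      if (!shouldRun.get()) {
        // Shutdown was requested before start(), e.g. after a broker startup error.
        throw new ClosedChannelException()
      }
      started = true
      startedFuture.complete(null)
    } catch {
      // Assumption: the failure is reported through the future rather than rethrown.
      case e: ClosedChannelException =>
        startedFuture.completeExceptionally(e)
    }
  }

  // Hypothetical shutdown hook: flips the flag that start() checks.
  def beginShutdown(): Unit = synchronized {
    shouldRun.set(false)
  }

  def isStarted: Boolean = synchronized(started)
}

object SketchDemo {
  def main(args: Array[String]): Unit = {
    val acceptor = new SketchAcceptor
    acceptor.beginShutdown() // shutdown wins the race
    acceptor.start()         // refuses to start
    println(acceptor.startedFuture.isCompletedExceptionally)
  }
}
```

In this sketch, anything waiting on `startedFuture` would observe the failure instead of blocking forever, which is one plausible reason to throw rather than silently skip the start.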