mumrah commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092435602


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   How does this happen? Is it when the Kafka server is shut down before the
   socket server has finished starting?
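
   For illustration, the ordering the question describes can be reproduced with
   a toy class. This is only a sketch: `ToyAcceptor`, its `shutdown()` method,
   and the `catch` branch are hypothetical and are not taken from the PR. It
   assumes that shutdown clears `shouldRun` and that `start()` reports failure
   through `startedFuture`.

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the Acceptor in the diff.
// Only the ordering of shutdown() and start() is modeled here.
class ToyAcceptor {
  private val shouldRun = new AtomicBoolean(true)
  val startedFuture = new CompletableFuture[Void]()

  // Assumption: shutting down simply flips the flag that start() checks.
  def shutdown(): Unit = shouldRun.set(false)

  def start(): Unit = synchronized {
    try {
      if (!shouldRun.get()) {
        // Same check as in the diff: the acceptor was shut down before it started.
        throw new ClosedChannelException()
      }
      startedFuture.complete(null)
    } catch {
      // Assumption: the failure is surfaced to callers via the future.
      case e: ClosedChannelException =>
        startedFuture.completeExceptionally(e)
    }
  }
}

object ToyAcceptorDemo {
  // Returns true when start() runs after shutdown() and the future fails.
  def startAfterShutdownFails(): Boolean = {
    val acceptor = new ToyAcceptor
    acceptor.shutdown()
    acceptor.start()
    acceptor.startedFuture.isCompletedExceptionally
  }
}
```

   In this sketch, calling `shutdown()` before `start()` is enough to reach the
   `ClosedChannelException` branch. Whether the real server can interleave the
   two calls this way is exactly what the question asks.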



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
 
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel  = _
+  private[network] val localPort: Int  = if (endPoint.port != 0) {

Review Comment:
   It might be good to log when we open the server socket early (the port 0 case).
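
   As a rough sketch of the suggestion, the early-open path could log right
   after it learns the port. The object and method names below are
   hypothetical, and `println` stands in for whatever logging helper the class
   actually uses. Only the `java.nio` and `java.net` calls are real APIs.

```scala
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object EphemeralPortSketch {
  // Binds to port 0 so the OS picks a free port, then reports which one it chose.
  def openOnEphemeralPort(): (ServerSocketChannel, Int) = {
    val channel = ServerSocketChannel.open()
    channel.configureBlocking(false)
    // Port 0 asks the OS for any free (ephemeral) port on the wildcard address.
    channel.socket().bind(new InetSocketAddress(0))
    val port = channel.socket().getLocalPort
    // Hypothetical log line; a real change would use the class's own logger.
    println(s"Opened server socket early on ephemeral port $port")
    (channel, port)
  }
}
```

   The port is only known after `bind`, which is why the port 0 case cannot
   defer opening the socket the way the nonzero case does.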



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
