showuon commented on code in PR #15530: URL: https://github.com/apache/kafka/pull/15530#discussion_r1525901824
##########
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##########
@@ -42,6 +42,32 @@ class KafkaServerTest extends QuorumTestHarness {
     TestUtils.shutdownServers(Seq(server1, server2))
   }
 
+  @Test
+  def testListenerPortAlreadyInUse(): Unit = {
+    val serverSocket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress)
+    val thread = new Thread {
+      override def run : Unit = {
+        while (true) {
+          serverSocket.accept()
+        }
+      }
+    }
+    thread.start()

Review Comment:
   How can you confirm that this thread will be running before `createServerWithListenerOnPort` is called?

##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -617,14 +617,21 @@ class KafkaServer(
             }
           }
         }
-        socketServer.enableRequestProcessing(authorizerFutures)
+        val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
         // Block here until all the authorizer futures are complete
         try {
           CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join()
         } catch {
           case t: Throwable => throw new RuntimeException("Received a fatal error while " +
             "waiting for all of the authorizer futures to be completed.", t)
         }
+        // Wait for all the SocketServer ports to be open, and the Acceptors to be started.
+        try {
+          enableRequestProcessingFuture.join()
+        } catch {
+          case t: Throwable => throw new RuntimeException("Received a fatal error while " +
+            "waiting for the SocketServer Acceptors to be started.", t)
+        }

Review Comment:
   Could we use `FutureUtils.waitWithLogging`, as in `BrokerServer`, so that we get much clearer log output?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
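
Note on the thread-ordering question in `testListenerPortAlreadyInUse`: the three-argument `ServerSocket` constructor binds and starts listening before it returns, so the port should already be occupied regardless of whether the accept thread has been scheduled. The standalone Java sketch below illustrates this under that assumption. The class name `PortInUseSketch` and the helper `secondBindFails` are hypothetical and not part of the PR, and the check relies on the operating system refusing a second listener on the same address and port, which is the usual behavior but is platform-dependent.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical illustration, not code from the PR.
public class PortInUseSketch {

    // Tries to open a second listener on the address and port already held by
    // `holder`. Returns true if the OS rejects it with a BindException.
    static boolean secondBindFails(ServerSocket holder) throws IOException {
        try (ServerSocket second =
                 new ServerSocket(holder.getLocalPort(), 0, holder.getInetAddress())) {
            return false; // the second bind unexpectedly succeeded
        } catch (BindException e) {
            return true;  // the port is already in use, as the test expects
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for a free ephemeral port; the socket is bound
        // as soon as the constructor returns, with no accept() call needed.
        try (ServerSocket holder =
                 new ServerSocket(0, 0, InetAddress.getLoopbackAddress())) {
            System.out.println("bound=" + holder.isBound()
                + " secondBindFails=" + secondBindFails(holder));
        }
    }
}
```

If this holds on the CI platforms, the accept thread only drains incoming connections and is not needed to make the port unavailable, so the test would not depend on thread start-up order.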