edoardocomar commented on code in PR #15530: URL: https://github.com/apache/kafka/pull/15530#discussion_r1526169481
########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -617,14 +617,21 @@ class KafkaServer( } } } - socketServer.enableRequestProcessing(authorizerFutures) + val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures) // Block here until all the authorizer futures are complete try { CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join() } catch { case t: Throwable => throw new RuntimeException("Received a fatal error while " + "waiting for all of the authorizer futures to be completed.", t) } + // Wait for all the SocketServer ports to be open, and the Acceptors to be started. + try { + enableRequestProcessingFuture.join() + } catch { + case t: Throwable => throw new RuntimeException("Received a fatal error while " + + "waiting for the SocketServer Acceptors to be started.", t) + } Review Comment: @showuon I prefer for consistency to use the same waiting done on the `authorizerFutures` in the same `KafkaServer`. The `BrokerServer` approach requires the `startupDeadline` which should be used in the other waits, for consistency. It is computed from `config.serverMaxStartupTimeMs` which although being set by default as Long>MAX_LONG, its is annotated as KRaft config. Maybe another PR could be used to make `KafkaServer` waits more consistent with `BrokerServer`. However, with ZK mode now deprecated, I don't think this my PR of mine is the right place for that change. In any case, I find the current log output shows clearly the address in use and for which listener, e.g.: ``` [2024-03-15 11:51:07,740] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer:159) java.lang.RuntimeException: Received a fatal error while waiting for the SocketServer Acceptors to be started. at kafka.server.KafkaServer.startup(KafkaServer.scala:633) at kafka.utils.TestUtils$.createServer(TestUtils.scala:207) at kafka.server.KafkaServerTest.createServerWithListenerOnPort(KafkaServerTest.scala:194) at kafka.server.KafkaServerTest.$anonfun$testListenerPortAlreadyInUse$1(KafkaServerTest.scala:62) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3115) at kafka.server.KafkaServerTest.testListenerPortAlreadyInUse(KafkaServerTest.scala:62) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:119) at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:94) at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:89) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unable to start acceptor for ListenerName(PLAINTEXT) at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419) at kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:238) at kafka.server.KafkaServer.startup(KafkaServer.scala:620) ... 91 more Caused by: java.lang.RuntimeException: Unable to start acceptor for ListenerName(PLAINTEXT) at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:647) at kafka.network.Acceptor.start(SocketServer.scala:627) at kafka.network.SocketServer.$anonfun$enableRequestProcessing$2(SocketServer.scala:223) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325) at kafka.network.SocketServer.chainAcceptorFuture$1(SocketServer.scala:216) at kafka.network.SocketServer.$anonfun$enableRequestProcessing$5(SocketServer.scala:230) at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780) at kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:230) ... 92 more Caused by: org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:53802: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:729) at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:632) ... 101 more Caused by: java.net.BindException: Address already in use at java.base/sun.nio.ch.Net.bind0(Native Method) at java.base/sun.nio.ch.Net.bind(Net.java:555) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org