[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1092446989 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private val listenBacklogSize = config.socketListenBacklogSize private val nioSelector = NSelector.open() - private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize) + + // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open + // the socket before we can find out what port we have. If it is set to a nonzero value, defer + // opening the socket until we start the Acceptor. The reason for deferring the socket opening + // is so that systems which assume that the socket being open indicates readiness are not + // confused. + private[network] var serverChannel: ServerSocketChannel = _ + private[network] val localPort: Int = if (endPoint.port != 0) { Review Comment: good idea. let me wait for this test run to complete then I'll add a log message -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private var currentProcessorIndex = 0 private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() private var started = false - private[network] val startFuture = new CompletableFuture[Void]() + private[network] val startedFuture = new CompletableFuture[Void]() val thread = KafkaThread.nonDaemon( s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}", this) - startFuture.thenRun(() => synchronized { -if (!shouldRun.get()) { - debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.") -} else { + def start(): Unit = synchronized { +try { + if (!shouldRun.get()) { +throw new ClosedChannelException() Review Comment: yeah, that's right. For example, if there was some error during broker startup, we would call shutdown and potentially shut down the SocketServer before the Acceptors had been started. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1091087602 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: Thanks for reviewing. I should have been clearer that this wasn't the main point of the PR :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1091061226 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: The JavaDoc for CompletableFuture doesn't say what it does in case of negative timeouts. Therefore, it's a bad idea to rely on this behavior. Also, the subtraction you propose may overflow. In any case this code was part of a different PR, #13153. I only included it here because I needed something from FutureUtils. Sorry for the confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org