[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-31 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092446989


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
 
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel  = _
+  private[network] val localPort: Int  = if (endPoint.port != 0) {

Review Comment:
   Good idea. Let me wait for this test run to complete, then I'll add a log message.
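
   The rule described in the code comment above can be sketched roughly as follows. This is a minimal illustration in Java rather than the PR's Scala, and the names (DeferredListener, start, isOpen) are hypothetical, not Kafka's Acceptor API. It assumes binding to the loopback address is enough for the illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical illustration only; this is not Kafka's Acceptor class.
final class DeferredListener implements AutoCloseable {
    private final int configuredPort;
    private final int localPort;
    private ServerSocketChannel channel; // null until the socket has been opened

    DeferredListener(int configuredPort) throws IOException {
        this.configuredPort = configuredPort;
        if (configuredPort == 0) {
            // Ephemeral port: we have to bind now to learn which port the OS picked.
            this.channel = open(0);
            this.localPort = channel.socket().getLocalPort();
        } else {
            // Fixed port: the number is already known, so binding waits for start().
            this.localPort = configuredPort;
        }
    }

    private static ServerSocketChannel open(int port) throws IOException {
        ServerSocketChannel ch = ServerSocketChannel.open();
        ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        return ch;
    }

    // Opens the socket if the constructor deferred it.
    synchronized void start() throws IOException {
        if (channel == null) {
            channel = open(configuredPort);
        }
    }

    synchronized boolean isOpen() {
        return channel != null && channel.isOpen();
    }

    int localPort() {
        return localPort;
    }

    @Override
    public synchronized void close() throws IOException {
        if (channel != null) {
            channel.close();
        }
    }
}
```

   With a nonzero port, isOpen() stays false until start() is called, so an external check that treats an open port as "ready" does not see the listener early.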



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-31 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   Yeah, that's right.
   
   For example, if there were an error during broker startup, we would call shutdown and could shut down the SocketServer before the Acceptors had been started.
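
   A rough sketch of that shutdown-before-start case, in Java with hypothetical names (StartableAcceptor, beginShutdown, isStarted). The diff only shows the ClosedChannelException being thrown; completing startedFuture exceptionally in the catch block is an assumption made for this illustration.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an acceptor; this is not Kafka's Acceptor class.
final class StartableAcceptor {
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private boolean started = false;
    final CompletableFuture<Void> startedFuture = new CompletableFuture<>();

    synchronized void start() {
        try {
            if (!shouldRun.get()) {
                // Shutdown was requested before start(), e.g. after a startup error.
                throw new ClosedChannelException();
            }
            // A real acceptor would open its socket and start its thread here.
            started = true;
            startedFuture.complete(null);
        } catch (Exception e) {
            // Assumption: surface the failure to anyone waiting on the future.
            startedFuture.completeExceptionally(e);
        }
    }

    // Marks the acceptor as shut down; a later start() will refuse to run.
    void beginShutdown() {
        shouldRun.set(false);
    }

    synchronized boolean isStarted() {
        return started;
    }
}
```

   Calling beginShutdown() and then start() leaves the acceptor unstarted and the future completed exceptionally, rather than opening a port on a broker that is already going down.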






[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1091087602


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
         return timer(timeout.toMillis());
     }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException  {
+        TimeoutException timeoutException = null;
+        while (true) {
+            long nowNs = nanoseconds();
+            if (deadlineNs <= nowNs) {
+                throw (timeoutException == null) ? new TimeoutException() : timeoutException;
+            }
+            long deltaNs = deadlineNs - nowNs;
+            try {
+                return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   Thanks for reviewing. I should have been clearer that this wasn't the main 
point of the PR :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1091061226


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
         return timer(timeout.toMillis());
     }
 
+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future        The future to wait for.
+     * @param deadlineNs    The time in the future, in monotonic nanoseconds, to time out.
+     * @return              The result of the future.
+     * @param <T>           The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException  {
+        TimeoutException timeoutException = null;
+        while (true) {
+            long nowNs = nanoseconds();
+            if (deadlineNs <= nowNs) {
+                throw (timeoutException == null) ? new TimeoutException() : timeoutException;
+            }
+            long deltaNs = deadlineNs - nowNs;
+            try {
+                return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   The Javadoc for CompletableFuture doesn't say what get() does when given a negative timeout, so it's a bad idea to rely on that behavior. Also, the subtraction you propose may overflow.
   
   In any case, this code was part of a different PR, #13153. I only included it here because I needed something from FutureUtils. Sorry for the confusion.
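
   For readers following the thread, the deadline check in the hunk above can be sketched as a standalone Java method. This is an illustration under assumptions, not the code from #13153: it uses System.nanoTime() in place of Time.nanoseconds(), the class name DeadlineWait is hypothetical, and the catch block (which is cut off in the quoted diff) is a guess at how the loop continues.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical standalone version of the deadline loop quoted in the diff.
final class DeadlineWait {
    static <T> T waitForFuture(CompletableFuture<T> future, long deadlineNs)
            throws TimeoutException, InterruptedException, ExecutionException {
        TimeoutException timeoutException = null;
        while (true) {
            long nowNs = System.nanoTime();
            // Check the deadline first, so get() is not called once it has passed.
            if (deadlineNs <= nowNs) {
                throw (timeoutException == null) ? new TimeoutException() : timeoutException;
            }
            // Past the check above, deadlineNs is greater than nowNs.
            long deltaNs = deadlineNs - nowNs;
            try {
                return future.get(deltaNs, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                // Assumption: remember the exception and re-check the deadline.
                timeoutException = e;
            }
        }
    }
}
```

   Comparing against the deadline before subtracting means the loop does not depend on how CompletableFuture.get treats a zero or negative timeout.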


