This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7476cea0 [#720] feat(netty): support random port for netty (#723)
7476cea0 is described below

commit 7476cea0095803d655cc5346a65feb765ec68885
Author: xumanbu <[email protected]>
AuthorDate: Mon Apr 3 23:16:22 2023 +0800

    [#720] feat(netty): support random port for netty (#723)
    
    ### What changes were proposed in this pull request?
    
    Support random Netty server port
    
    1. Add RssUtils.startServiceOnPort to support starting a service on a random port.
    2. StreamServer implements ServerInterface to support a random port.
    
    ### Why are the changes needed?
    Fix #720
    
    ### Does this PR introduce _any_ user-facing change?
    StreamServer supports a random port: if port == 0, the server starts on a random port.
    
    ### How was this patch tested?
    UT
    
    Co-authored-by: jam.xu <[email protected]>
---
 .../apache/uniffle/common/config/ConfigUtils.java  |   2 +
 .../apache/uniffle/common/config/RssBaseConf.java  |  18 ++++
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |   9 +-
 .../apache/uniffle/common/rpc/ServerInterface.java |   4 +-
 .../org/apache/uniffle/common/util/Constants.java  |   1 +
 .../org/apache/uniffle/common/util/RssUtils.java   |  60 +++++++++++
 .../apache/uniffle/common/util/RssUtilsTest.java   | 111 +++++++++++++++++++++
 docs/server_guide.md                               |   1 +
 .../test/ShuffleServerEnableStreamServerTest.java  |  71 +++++++++++++
 .../org/apache/uniffle/server/ShuffleServer.java   |   5 +-
 .../apache/uniffle/server/ShuffleServerConf.java   |   3 +-
 .../apache/uniffle/server/netty/StreamServer.java  |  29 +++++-
 .../apache/uniffle/server/ShuffleServerTest.java   |   1 +
 13 files changed, 306 insertions(+), 9 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java 
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index bc0672eb..1a78abcf 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -207,6 +207,8 @@ public class ConfigUtils {
 
   public static final Function<Long, Boolean> NON_NEGATIVE_LONG_VALIDATOR = 
value -> value >= 0;
 
+  public static final Function<Integer, Boolean> SERVER_PORT_VALIDATOR = value 
-> ((value == 0)
+                                                                               
 || (value >= 1024 && value <= 65535));
   public static final Function<Long, Boolean> POSITIVE_INTEGER_VALIDATOR =
       value -> value > 0L && value <= Integer.MAX_VALUE;
 
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 761e5bf3..09322a62 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -212,6 +212,24 @@ public class RssBaseConf extends RssConf {
       .defaultValue(5L)
       .withDescription("Reconfigure check interval.");
 
+  public static final ConfigOption<Integer> RSS_RANDOM_PORT_MIN = ConfigOptions
+      .key("rss.random.port.min")
+      .intType()
+      .defaultValue(40000)
+      .withDescription("Min value for random for range");
+
+  public static final ConfigOption<Integer> RSS_RANDOM_PORT_MAX = ConfigOptions
+      .key("rss.random.port.max")
+      .intType()
+      .defaultValue(65535)
+      .withDescription("Max value for random for range");
+
+  public static final ConfigOption<Integer> SERVER_PORT_MAX_RETRIES = 
ConfigOptions
+      .key("rss.port.max.retry")
+      .intType()
+      .defaultValue(16)
+      .withDescription("start server service max retry");
+
   public boolean loadCommonConf(Map<String, String> properties) {
     if (properties == null) {
       return false;
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 59ade455..d96ee40b 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -155,7 +155,7 @@ public class GrpcServer implements ServerInterface {
     }
   }
 
-  public void start() throws IOException {
+  public int start() throws IOException {
     try {
       server.start();
       listenPort = server.getPort();
@@ -163,6 +163,13 @@ public class GrpcServer implements ServerInterface {
       ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
     }
     LOG.info("Grpc server started, configured port: {}, listening on {}.", 
port, listenPort);
+    return port;
+  }
+
+  @Override
+  public void startOnPort(int port) {
+    ExitUtils.terminate(1, "Fail to start grpc server",
+        new RuntimeException("GRpcServer not implement now"), LOG);
   }
 
   public void stop() throws InterruptedException {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
index 67cb8d5c..ab6a91b5 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 
 public interface ServerInterface {
 
-  void start() throws IOException;
+  int start() throws IOException;
+
+  void startOnPort(int port) throws Exception;
 
   void stop() throws InterruptedException;
 
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index b62d5b03..5e946866 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -71,4 +71,5 @@ public final class Constants {
   
   public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
   public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on 
device";
+  public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index dfae9e23..888e96fa 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.Constructor;
+import java.net.BindException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InterfaceAddress;
@@ -39,11 +40,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
+import io.netty.channel.unix.Errors;
+import org.eclipse.jetty.util.MultiException;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +56,8 @@ import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.ServerInterface;
+
 
 public class RssUtils {
 
@@ -156,6 +162,60 @@ public class RssUtils {
     return siteLocalAddress;
   }
 
+  public static int startServiceOnPort(ServerInterface service, String 
serviceName, int servicePort, RssBaseConf conf) {
+    if (servicePort < 0 || servicePort > 65535) {
+      throw new IllegalArgumentException(String.format("Bad service %s on port 
(%s)", serviceName, servicePort));
+    }
+    int actualPort = servicePort;
+    int maxRetries = conf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES);
+    for (int i = 0; i < maxRetries; i++) {
+      try {
+        if (servicePort == 0) {
+          actualPort = findRandomTcpPort(conf);
+        } else {
+          actualPort += i;
+        }
+        service.startOnPort(actualPort);
+        return actualPort;
+      } catch (Exception e) {
+        if (isServerPortBindCollision(e)) {
+          LOGGER.warn(String.format("%s:Service %s failed after %s retries (on 
a random free port (%s))!",
+              e.getMessage(), serviceName, i + 1, actualPort));
+        } else {
+          throw new RssException(String.format("Failed to start service %s on 
port %s", serviceName, servicePort), e);
+        }
+      }
+    }
+    throw new RssException(String.format("Failed to start service %s on port 
%s", serviceName, servicePort));
+  }
+
+  /**
+   * check whether the exception is caused by an address-port collision when 
binding.
+   */
+  public static boolean isServerPortBindCollision(Throwable e) {
+    if (e instanceof BindException) {
+      if (e.getMessage() != null) {
+        return true;
+      }
+      return isServerPortBindCollision(e.getCause());
+    } else if (e instanceof MultiException) {
+      return !((MultiException) e).getThrowables().stream()
+          .noneMatch((Throwable throwable) -> 
isServerPortBindCollision(throwable));
+    } else if (e instanceof Errors.NativeIoException) {
+      return (e.getMessage() != null && e.getMessage().startsWith("bind() 
failed: "))
+              || isServerPortBindCollision(e.getCause());
+    } else {
+      return false;
+    }
+  }
+
+  public static int findRandomTcpPort(RssBaseConf baseConf) {
+    int portRangeMin = baseConf.getInteger(RssBaseConf.RSS_RANDOM_PORT_MIN);
+    int portRangeMax =  baseConf.getInteger(RssBaseConf.RSS_RANDOM_PORT_MAX);
+    int portRange = portRangeMax - portRangeMin;
+    return  portRangeMin + ThreadLocalRandom.current().nextInt(portRange + 1);
+  }
+
   public static byte[] serializeBitMap(Roaring64NavigableMap bitmap) throws 
IOException {
     long size = bitmap.serializedSizeInBytes();
     if (size > Integer.MAX_VALUE) {
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 759dd09c..a98bbe19 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.uniffle.common.util;
 
+import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -27,6 +30,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import javax.net.ServerSocketFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -37,6 +41,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.ServerInterface;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -87,6 +92,70 @@ public class RssUtilsTest {
     }
   }
 
+  @Test
+  public void testStartServiceOnPort() throws InterruptedException {
+    RssBaseConf rssBaseConf = new RssBaseConf();
+    rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 100);
+    rssBaseConf.set(RssBaseConf.RSS_RANDOM_PORT_MIN, 30000);
+    rssBaseConf.set(RssBaseConf.RSS_RANDOM_PORT_MAX, 39999);
+    // zero port to get random port
+    MockServer mockServer = new MockServer();
+    int port = 0;
+    try {
+      int actualPort = RssUtils.startServiceOnPort(mockServer, "MockServer", 
port, rssBaseConf);
+      assertTrue(actualPort >= 30000 && actualPort < 39999 + 
rssBaseConf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES));
+    } finally {
+      if (mockServer != null) {
+        mockServer.stop();
+      }
+    }
+    // error port test
+    try {
+      port = -1;
+      RssUtils.startServiceOnPort(mockServer, "MockServer", port, rssBaseConf);
+    } catch (RuntimeException e) {
+      assertTrue(e.toString().startsWith("java.lang.IllegalArgumentException: 
Bad service"));
+    }
+    // a specific port to start
+    try {
+      mockServer = new MockServer();
+      port = 10000;
+      rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 100);
+      int actualPort = RssUtils.startServiceOnPort(mockServer, "MockServer", 
port, rssBaseConf);
+      assertTrue(actualPort >= port && actualPort < port + 
rssBaseConf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES));
+    } finally {
+      if (mockServer != null) {
+        mockServer.stop();
+      }
+    }
+
+    // bind exception
+    MockServer toStartSockServer = new MockServer();
+    try {
+      mockServer = new MockServer();
+      port = 10000;
+      int actualPort1 = RssUtils.startServiceOnPort(mockServer, "MockServer", 
port, rssBaseConf);
+      rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 10);
+      int actualPort2 = RssUtils.startServiceOnPort(toStartSockServer, 
"MockServer", actualPort1, rssBaseConf);
+      assertTrue(actualPort1 < actualPort2);
+      toStartSockServer.stop();
+      rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 0);
+      RssUtils.startServiceOnPort(toStartSockServer, "MockServer", 
actualPort1, rssBaseConf);
+      assertFalse(false);
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().startsWith("Failed to start service"));
+    } finally {
+      if (mockServer != null) {
+        mockServer.stop();
+      }
+      if (toStartSockServer != null) {
+        toStartSockServer.stop();
+      }
+    }
+
+  }
+
+
   @Test
   public void testSerializeBitmap() throws Exception {
     Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100, 
10000);
@@ -235,5 +304,47 @@ public class RssUtilsTest {
     }
   }
 
+  public static class MockServer implements ServerInterface {
+
+    ServerSocket serverSocket;
+
+    @Override
+    public int start() throws IOException {
+      // not implement
+      return -1;
+    }
+
+    @Override
+    public void startOnPort(int port) throws IOException {
+      serverSocket = ServerSocketFactory.getDefault().createServerSocket(
+          port, 1, InetAddress.getByName("localhost"));
+      new Thread(() -> {
+        Socket accept;
+        try {
+          accept = serverSocket.accept();
+          accept.close();
+        } catch (IOException e) {
+          //e.printStackTrace();
+        }
+      }).start();
+    }
+
+    @Override
+    public void stop() throws InterruptedException {
+      if (serverSocket != null && !serverSocket.isClosed()) {
+        try {
+          serverSocket.close();
+        } catch (IOException e) {
+          //e.printStackTrace();
+        }
+      }
+    }
+
+    @Override
+    public void blockUntilShutdown() throws InterruptedException {
+      // not implement
+    }
+  }
+
 
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index bfae4405..4b3dbedd 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,6 +69,7 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.coordinator.quorum                                | -       | 
Coordinator quorum                                                              
                                                                                
                                                                                
                                                                                
                                                         |
 | rss.rpc.server.port                                   | -       | RPC port 
for Shuffle server                                                              
                                                                                
                                                                                
                                                                                
                                                |
 | rss.jetty.http.port                                   | -       | Http port 
for Shuffle server                                                              
                                                                                
                                                                                
                                                                                
                                               |
+| rss.server.netty.port                                 | -1      | Netty port 
for Shuffle server, if set zero, netty server start on random port.             
                                                                                
                                                                                
                                                                                
                                              |
 | rss.server.buffer.capacity                            | -1      | Max memory 
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio 
is used                                                                         
                                                                                
                                                                                
                                              |
 | rss.server.buffer.capacity.ratio                      | 0.8     | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * 
ratio                                                                           
                                                                                
                                                                                
                                                       |
 | rss.server.memory.shuffle.highWaterMark.percentage    | 75.0    | Threshold 
of spill data to storage, percentage of rss.server.buffer.capacity              
                                                                                
                                                                                
                                                                                
                                               |
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
new file mode 100644
index 00000000..f98af1a4
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    coordinatorConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
+    
coordinatorConf.setString(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY.key(),
 "BASIC");
+    coordinatorConf.setLong("rss.coordinator.app.expired", 2000);
+    coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setInteger("rss.server.netty.port", 0);
+    shuffleServerConf.setInteger("rss.random.port.min", 30000);
+    shuffleServerConf.setInteger("rss.random.port.max", 40000);
+    createShuffleServer(shuffleServerConf);
+    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
1);
+    shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @Test
+  public void startStreamServerOnRandomPort() throws Exception {
+    CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
+    Thread.sleep(5000);
+    int actualPort = shuffleServers.get(0).getNettyPort();
+    assertTrue(actualPort >= 30000 && actualPort < 40000);
+    actualPort = shuffleServers.get(1).getNettyPort();
+    assertTrue(actualPort >= 30000 && actualPort <= 40000);
+
+    int maxRetries = 100;
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
+    shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
+    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
2);
+    shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
+    ShuffleServer ss = new ShuffleServer(shuffleServerConf);
+    ss.start();
+    assertTrue(ss.getNettyPort() > actualPort && actualPort <= actualPort + 
maxRetries);
+    ss.stopServer();
+  }
+
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index bde07d2b..0a1d2cd2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -129,7 +129,7 @@ public class ShuffleServer {
     jettyServer.start();
     server.start();
     if (nettyServerEnabled) {
-      streamServer.start();
+      nettyPort = streamServer.start();
     }
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -195,7 +195,8 @@ public class ShuffleServer {
     }
     grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
     nettyPort = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
-    if (nettyPort > 0) {
+    if (nettyPort >= 0) {
+      // when nettyPort is zero,actual netty port will be changed,but id can't 
be change.
       id = ip + "-" + grpcPort + "-" + nettyPort;
     } else {
       id = ip + "-" + grpcPort;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index f4d26749..5f7b21e1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -400,7 +400,8 @@ public class ShuffleServerConf extends RssBaseConf {
   public static final ConfigOption<Integer> NETTY_SERVER_PORT = ConfigOptions
       .key("rss.server.netty.port")
       .intType()
-      .checkValue(ConfigUtils.POSITIVE_INTEGER_VALIDATOR_2, "netty port must 
be positive")
+      .checkValue(ConfigUtils.SERVER_PORT_VALIDATOR, "check server port value 
is 0 "
+          + "or value >= 1024 && value <= 65535")
       .defaultValue(-1)
       .withDescription("Shuffle netty server port");
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java 
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index 8423515b..57f91338 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.netty;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -35,12 +36,15 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.netty.decoder.StreamServerInitDecoder;
 
-public class StreamServer {
+public class StreamServer implements ServerInterface {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamServer.class);
 
@@ -103,7 +107,20 @@ public class StreamServer {
     return serverBootstrap;
   }
 
-  public void start() {
+  @Override
+  public int start() throws IOException {
+    int port = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
+    try {
+      port = RssUtils.startServiceOnPort(this,
+          Constants.NETTY_STREAM_SERVICE_NAME, port, shuffleServerConf);
+    } catch (Exception e) {
+      ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+    }
+    return port;
+  }
+
+  @Override
+  public void startOnPort(int port) throws Exception {
     Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[]{
         new StreamServerInitDecoder()
     };
@@ -116,14 +133,13 @@ public class StreamServer {
 
     // Bind the ports and save the results so that the channels can be closed 
later.
     // If the second bind fails, the first one gets cleaned up in the shutdown.
-    int port = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
     try {
       channelFuture =  serverBootstrap.bind(port);
       channelFuture.syncUninterruptibly();
       LOG.info("bind localAddress is " + 
channelFuture.channel().localAddress());
       LOG.info("Start stream server successfully with port " + port);
     } catch (Exception e) {
-      ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+      throw e;
     }
   }
 
@@ -139,4 +155,9 @@ public class StreamServer {
       shuffleWorkerGroup = null;
     }
   }
+
+  @Override
+  public void blockUntilShutdown() throws InterruptedException {
+
+  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 0dc57338..7fe77fa2 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -138,6 +138,7 @@ public class ShuffleServerTest {
     ExitUtils.disableSystemExit();
     serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19997);
     serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19996);
+    serverConf.set(ShuffleServerConf.SERVER_PORT_MAX_RETRIES, 1);
     ShuffleServer ss2 = new ShuffleServer(serverConf);
     String expectMessage = "Fail to start stream server";
     final int expectStatus = 1;

Reply via email to