This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b0ae6db3 [#779] feat: Grpc server support random port (#820)
b0ae6db3 is described below
commit b0ae6db371edfce13268d8fc2eeddb6fb3950130
Author: xumanbu <[email protected]>
AuthorDate: Thu Apr 13 10:47:09 2023 +0800
[#779] feat: Grpc server support random port (#820)
### What changes were proposed in this pull request?
1. grpc server port supports random port
2. grpc server will retry binding with an increased port number if the configured port
is already occupied
### Why are the changes needed?
1. improve robustness for server startup
2. sync with netty server functionality
Fixes #779
### Does this PR introduce any user-facing change?
Yes. `rss.rpc.server.port` can now be set to zero.
### How was this patch tested?
UT
Co-authored-by: jam.xu <[email protected]>
---
.../org/apache/uniffle/common/rpc/GrpcServer.java | 51 ++++++++++++++--------
.../org/apache/uniffle/common/util/Constants.java | 1 +
.../org/apache/uniffle/common/util/RssUtils.java | 3 ++
docs/server_guide.md | 4 +-
...est.java => ShuffleServerOnRandomPortTest.java} | 27 ++++++++++--
.../org/apache/uniffle/server/ShuffleServer.java | 2 +-
6 files changed, 65 insertions(+), 23 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index d96ee40b..132ec04d 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -40,24 +40,32 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
public class GrpcServer implements ServerInterface {
private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class);
- private final Server server;
+ private Server server;
private final int port;
private int listenPort;
private final ExecutorService pool;
+ private List<Pair<BindableService, List<ServerInterceptor>>>
servicesWithInterceptors;
+ private GRPCMetrics grpcMetrics;
+ private RssBaseConf rssConf;
protected GrpcServer(
RssBaseConf conf,
List<Pair<BindableService, List<ServerInterceptor>>>
servicesWithInterceptors,
GRPCMetrics grpcMetrics) {
- this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
- long maxInboundMessageSize =
conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
+ this.rssConf = conf;
+ this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT);
+ this.servicesWithInterceptors = servicesWithInterceptors;
+ this.grpcMetrics = grpcMetrics;
+
int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
pool = new GrpcThreadPoolExecutor(
rpcExecutorSize,
@@ -68,12 +76,15 @@ public class GrpcServer implements ServerInterface {
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics
);
+ }
- boolean isMetricsEnabled =
conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
+ private Server buildGrpcServer(int serverPort) {
+ boolean isMetricsEnabled =
rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
+ long maxInboundMessageSize =
rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
ServerBuilder<?> builder = ServerBuilder
- .forPort(port)
+ .forPort(serverPort)
.executor(pool)
- .maxInboundMessageSize((int)maxInboundMessageSize);
+ .maxInboundMessageSize((int) maxInboundMessageSize);
if (isMetricsEnabled) {
builder.addTransportFilter(new
MonitoringServerTransportFilter(grpcMetrics));
}
@@ -88,7 +99,7 @@ public class GrpcServer implements ServerInterface {
}
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(),
interceptors));
});
- this.server = builder.build();
+ return builder.build();
}
public static class Builder {
@@ -155,21 +166,27 @@ public class GrpcServer implements ServerInterface {
}
}
+ @Override
public int start() throws IOException {
try {
- server.start();
- listenPort = server.getPort();
- } catch (IOException e) {
- ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
+ this.listenPort = RssUtils.startServiceOnPort(this,
+ Constants.GRPC_SERVICE_NAME, port, rssConf);
+ } catch (Exception e) {
+ ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port,
e, LOG);
}
- LOG.info("Grpc server started, configured port: {}, listening on {}.",
port, listenPort);
- return port;
+ return listenPort;
}
@Override
- public void startOnPort(int port) {
- ExitUtils.terminate(1, "Fail to start grpc server",
- new RuntimeException("GRpcServer not implement now"), LOG);
+ public void startOnPort(int startPort) throws Exception {
+ this.server = buildGrpcServer(startPort);
+ try {
+ server.start();
+ listenPort = server.getPort();
+ } catch (Exception e) {
+ throw e;
+ }
+ LOG.info("Grpc server started, configured port: {}, listening on {}.",
port, listenPort);
}
public void stop() throws InterruptedException {
@@ -189,7 +206,7 @@ public class GrpcServer implements ServerInterface {
}
public int getPort() {
- return port <= 0 ? listenPort : port;
+ return listenPort;
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 5e946866..514cf0e7 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -72,4 +72,5 @@ public final class Constants {
public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on
device";
public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
+ public static final String GRPC_SERVICE_NAME = "grpc.server";
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 888e96fa..92688083 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -204,6 +204,9 @@ public class RssUtils {
} else if (e instanceof Errors.NativeIoException) {
return (e.getMessage() != null && e.getMessage().startsWith("bind()
failed: "))
|| isServerPortBindCollision(e.getCause());
+ } else if (e instanceof IOException) {
+ return (e.getMessage() != null && e.getMessage().startsWith("Failed to
bind to address"))
+ || isServerPortBindCollision(e.getCause());
} else {
return false;
}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 0fb7af4e..48143a22 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -67,7 +67,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| Property Name | Default |
Description
|
|-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| rss.coordinator.quorum | - |
Coordinator quorum
|
-| rss.rpc.server.port | - | RPC port
for Shuffle server
|
+| rss.rpc.server.port | - | RPC port
for Shuffle server, if set zero, grpc server start on random port.
|
| rss.jetty.http.port | - | Http port
for Shuffle server
|
| rss.server.netty.port | -1 | Netty port
for Shuffle server, if set zero, netty server start on random port.
|
| rss.server.buffer.capacity | -1 | Max memory
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio
is used
|
@@ -156,4 +156,4 @@ rss.server.single.buffer.flush.threshold 129m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.huge-partition.size.threshold 20g
rss.server.huge-partition.memory.limit.ratio 0.2
-```
\ No newline at end of file
+```
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
similarity index 72%
rename from
integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
rename to
integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
index f98af1a4..67664a68 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java
@@ -27,7 +27,7 @@ import org.apache.uniffle.server.ShuffleServerConf;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
+public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase {
@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
@@ -38,10 +38,10 @@ public class ShuffleServerEnableStreamServerTest extends
CoordinatorTestBase {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setInteger("rss.server.netty.port", 0);
+ shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setInteger("rss.random.port.min", 30000);
shuffleServerConf.setInteger("rss.random.port.max", 40000);
createShuffleServer(shuffleServerConf);
- shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT +
1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
startServers();
@@ -58,9 +58,9 @@ public class ShuffleServerEnableStreamServerTest extends
CoordinatorTestBase {
int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ // start netty server with already bind port
shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
- shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT +
2);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
@@ -68,4 +68,25 @@ public class ShuffleServerEnableStreamServerTest extends
CoordinatorTestBase {
ss.stopServer();
}
+ @Test
+ public void startGrpcServerOnRandomPort() throws Exception {
+ CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
+ Thread.sleep(5000);
+ int actualPort = shuffleServers.get(0).getGrpcPort();
+ assertTrue(actualPort >= 30000 && actualPort < 40000);
+ actualPort = shuffleServers.get(1).getGrpcPort();
+ assertTrue(actualPort >= 30000 && actualPort <= 40000);
+
+ int maxRetries = 100;
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ // start grpc server with already bind port
+ shuffleServerConf.setInteger("rss.rpc.server.port", actualPort);
+ shuffleServerConf.setInteger("rss.jetty.http.port", 18083);
+ shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
+ ShuffleServer ss = new ShuffleServer(shuffleServerConf);
+ ss.start();
+ assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort +
maxRetries);
+ ss.stopServer();
+ }
+
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index e713649a..c23dbb0a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -126,7 +126,7 @@ public class ShuffleServer {
public void start() throws Exception {
jettyServer.start();
- server.start();
+ grpcPort = server.start();
if (nettyServerEnabled) {
nettyPort = streamServer.start();
}