This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cad584a2f [#1651] improvement(netty): Set Netty as the default server
type (#1919)
cad584a2f is described below
commit cad584a2fb7f3536ebcd32d70537c7e943e2d9c4
Author: RickyMa <[email protected]>
AuthorDate: Mon Mar 24 11:41:42 2025 +0800
[#1651] improvement(netty): Set Netty as the default server type (#1919)
### What changes were proposed in this pull request?
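Set Netty (`GRPC_NETTY`) as the default RPC type for both the shuffle server (`rss.rpc.server.type`) and the client (`rss.client.type`), and give `rss.server.netty.port` a default of `17000` (previously `-1`). The related documentation and unit tests are updated accordingly.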
### Why are the changes needed?
For: https://github.com/apache/incubator-uniffle/issues/1651.
### Does this PR introduce _any_ user-facing change?
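Yes. The default values of `rss.rpc.server.type` and `rss.client.type` change from `GRPC` to `GRPC_NETTY`, and `rss.server.netty.port` now defaults to `17000` instead of `-1`. To keep the previous behavior, set these options explicitly.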
### How was this patch tested?
Existing unit tests, updated to expect `GRPC_NETTY` as the default server type.
---
.../apache/uniffle/common/config/RssBaseConf.java | 4 ++--
.../uniffle/common/config/RssClientConf.java | 4 ++--
.../uniffle/coordinator/CoordinatorConfTest.java | 2 +-
coordinator/src/test/resources/coordinator.conf | 2 +-
docs/client_guide/client_guide.md | 26 +++++++++++-----------
docs/server_guide.md | 10 +++++----
.../apache/uniffle/server/ShuffleServerConf.java | 2 +-
.../uniffle/server/ShuffleServerConfTest.java | 4 ++--
8 files changed, 28 insertions(+), 26 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 8d4adf42c..d5be88f97 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -39,9 +39,9 @@ public class RssBaseConf extends RssConf {
public static final ConfigOption<ServerType> RPC_SERVER_TYPE =
ConfigOptions.key("rss.rpc.server.type")
.enumType(ServerType.class)
- .defaultValue(ServerType.GRPC)
+ .defaultValue(ServerType.GRPC_NETTY)
.withDescription(
- "Shuffle server type, supports GRPC_NETTY, GRPC. The default
value is GRPC for now. But we recommend using GRPC_NETTY to enable Netty on the
server side for better stability and performance.");
+ "Shuffle server type, supports GRPC_NETTY, GRPC. The default
value is GRPC_NETTY. We recommend using GRPC_NETTY to enable Netty on the
server side for better stability and performance.");
public static final ConfigOption<Integer> RPC_SERVER_PORT =
ConfigOptions.key("rss.rpc.server.port")
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 1f803f167..14fd2a6d3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -252,9 +252,9 @@ public class RssClientConf {
public static final ConfigOption<ClientType> RSS_CLIENT_TYPE =
ConfigOptions.key("rss.client.type")
.enumType(ClientType.class)
- .defaultValue(ClientType.GRPC)
+ .defaultValue(ClientType.GRPC_NETTY)
.withDescription(
- "Supports GRPC_NETTY, GRPC. The default value is GRPC. But we
recommend using GRPC_NETTY to enable Netty on the client side for better
stability and performance.");
+ "Supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We
recommend using GRPC_NETTY to enable Netty on the client side for better
stability and performance.");
public static final ConfigOption<Boolean>
RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED =
ConfigOptions.key("rss.client.remote.storage.useLocalConfAsDefault")
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index d5c5c369f..d0c85efa5 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -34,7 +34,7 @@ public class CoordinatorConfTest {
// test base conf
assertEquals(9527, conf.getInteger(CoordinatorConf.RPC_SERVER_PORT));
- assertEquals("GRPC", conf.get(CoordinatorConf.RPC_SERVER_TYPE).name());
+ assertEquals("GRPC_NETTY",
conf.get(CoordinatorConf.RPC_SERVER_TYPE).name());
assertEquals(9526, conf.getInteger(CoordinatorConf.JETTY_HTTP_PORT));
// test coordinator specific conf
diff --git a/coordinator/src/test/resources/coordinator.conf
b/coordinator/src/test/resources/coordinator.conf
index d597ebb72..8d944df14 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -17,7 +17,7 @@
# base conf
rss.rpc.server.port 9527
-rss.rpc.server.type GRPC
+rss.rpc.server.type GRPC_NETTY
rss.jetty.http.port 9526
# coordinator specific conf
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 18c11e46e..acd2c5da4 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -108,16 +108,16 @@ spark.rss.data.replica.read 2
```
### Netty Setting
-| Property Name | Default |
Description
|
-|----------------------------------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| <client_type>.rss.client.type | GRPC |
Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
-| <client_type>.rss.client.netty.io.mode | NIO |
Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
-| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 |
Connection active timeout.
|
-| <client_type>.rss.client.netty.client.threads | 0 |
Number of threads used in the client thread pool. Default is 0, Netty will use
the number of (available logical cores * 2) as the number of threads.
|
-| <client_type>.rss.client.netty.client.threads.ratio | 2.0 |
The number of threads used in the client thread pool will be
(`<client_type>.rss.client.netty.client.connections.per.peer` *
`<client_type>.rss.client.netty.client.threads.ratio`). This is only effective
when `<client_type>.rss.client.netty.client.threads` is not explicitly set.
| [...]
-| <client_type>.rss.client.netty.client.prefer.direct.bufs | true |
If true, we will prefer allocating off-heap byte buffers within Netty.
|
-| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true |
If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
-| <client_type>.rss.client.netty.client.shared.allocator.enabled | true | A
flag indicating whether to share the ByteBuf allocators between the different
Netty channels when enabling Netty. If enabled then only three ByteBuf
allocators are created: one PooledByteBufAllocator where caching is allowed,
one PooledByteBufAllocator where not and one UnpooledByteBufAllocator. When
disabled, a new allocator is created for each transport client. |
-| <client_type>.rss.client.netty.client.connections.per.peer | 2 |
Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
-| <client_type>.rss.client.netty.client.receive.buffer | 0 |
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and
send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
-| <client_type>.rss.client.netty.client.send.buffer | 0 |
Send buffer size (SO_SNDBUF).
|
+| Property Name | Default
| Description
|
+|----------------------------------------------------------------|------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <client_type>.rss.client.type | GRPC_NETTY
| Supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We recommend
using GRPC_NETTY to enable Netty on the client side for better stability and
performance.
|
+| <client_type>.rss.client.netty.io.mode | NIO
| Netty EventLoopGroup backend, available options: NIO, EPOLL.
|
+| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000
| Connection active timeout.
|
+| <client_type>.rss.client.netty.client.threads | 0
| Number of threads used in the client thread pool. Default is 0, Netty will
use the number of (available logical cores * 2) as the number of threads.
|
+| <client_type>.rss.client.netty.client.threads.ratio | 2.0
| The number of threads used in the client thread pool will be
(`<client_type>.rss.client.netty.client.connections.per.peer` *
`<client_type>.rss.client.netty.client.threads.ratio`). This is only effective
when `<client_type>.rss.client.netty.client.threads` is not explicitly set.
| [...]
+| <client_type>.rss.client.netty.client.prefer.direct.bufs | true
| If true, we will prefer allocating off-heap byte buffers within Netty.
|
+| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true
| If true, we will use PooledByteBufAllocator to allocate byte buffers within
Netty, otherwise we'll use UnpooledByteBufAllocator.
|
+| <client_type>.rss.client.netty.client.shared.allocator.enabled | true
| A flag indicating whether to share the ByteBuf allocators between the
different Netty channels when enabling Netty. If enabled then only three
ByteBuf allocators are created: one PooledByteBufAllocator where caching is
allowed, one PooledByteBufAllocator where not and one UnpooledByteBufAllocator.
When disabled, a new allocator is created for each transport client. |
+| <client_type>.rss.client.netty.client.connections.per.peer | 2
| Suppose there are 100 executors,
spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer
will establish a total of (100 * 2) connections with multiple clients.
|
+| <client_type>.rss.client.netty.client.receive.buffer | 0
| Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer
and send buffer should be latency * network_bandwidth. Assuming latency = 1ms,
network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the
operating system automatically estimates the receive buffer size based on
default settings. |
+| <client_type>.rss.client.netty.client.send.buffer | 0
| Send buffer size (SO_SNDBUF).
|
diff --git a/docs/server_guide.md b/docs/server_guide.md
index af2901235..a07134d8b 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -73,7 +73,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
|----------------------------------------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| rss.coordinator.quorum | -
| Coordinator quorum
[...]
| rss.coordinator.rpc.client.type | GRPC
| The client type for
coordinator rpc client.
[...]
-| rss.rpc.server.type | GRPC
| Shuffle server type,
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using
GRPC_NETTY to enable Netty on the server side for better stability and
performance.
[...]
+| rss.rpc.server.type | GRPC_NETTY
| Shuffle server type,
supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We recommend using
GRPC_NETTY to enable Netty on the server side for better stability and
performance.
[...]
| rss.rpc.server.port | 19999
| RPC port for Shuffle
server, if set zero, grpc server start on random port.
[...]
| rss.rpc.netty.pageSize | 4096
| The value of pageSize
for PooledByteBufAllocator when using gRPC internal Netty on the server-side.
This configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
| rss.rpc.netty.maxOrder | 3
| The value of maxOrder
for PooledByteBufAllocator when using gRPC internal Netty on the server-side.
This configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
@@ -88,11 +88,11 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.netty.receive.buf | 0
| Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system
automatically estimates the receive buffer size based on default settings.
[...]
| rss.server.netty.send.buf | 0
| Send buffer size
(SO_SNDBUF).
[...]
| rss.server.buffer.capacity | -1
| Max memory of buffer
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used
[...]
-| rss.server.buffer.capacity.ratio | 0.6
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or
off-heap size(when enabling Netty) * ratio
[...]
+| rss.server.buffer.capacity.ratio | 0.7
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or
off-heap size(when enabling Netty) * ratio
[...]
| rss.server.memory.shuffle.highWaterMark.percentage | 75.0
| Threshold of spill data
to storage, percentage of rss.server.buffer.capacity
[...]
| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0
| Threshold of keep data
in memory, percentage of rss.server.buffer.capacity
[...]
| rss.server.read.buffer.capacity | -1
| Max size of buffer for
reading data. If negative, JVM heap size * read.buffer.ratio is used
[...]
-| rss.server.read.buffer.capacity.ratio | 0.2
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size or off-heap size(when enabling Netty) * ratio
[...]
+| rss.server.read.buffer.capacity.ratio | 0.1
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size or off-heap size(when enabling Netty) * ratio
[...]
| rss.server.heartbeat.interval | 10000
| Heartbeat interval to
Coordinator (ms)
[...]
| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000
| How often to collect
Netty pending tasks number metrics (in milliseconds)
[...]
| rss.server.flush.localfile.threadPool.size | 10
| Thread pool for flush
data to local file
[...]
@@ -159,7 +159,9 @@ Once the huge partition reaches the hard limit size, which
is set by the configu
For example, if the hard limit is set to 50g, the server will reject the
request if the partition size is greater than 50g, causing the job to
eventually fail.
### Netty
-In version 0.8.0, we introduced Netty. Enabling Netty on ShuffleServer can
significantly reduce GC time in high-throughput scenarios. We can enable Netty
through the parameters `rss.server.netty.port` and `rss.rpc.server.type`. Note:
After setting the parameter `rss.rpc.server.type` to `GRPC_NETTY`,
ShuffleServer will be tagged with `GRPC_NETTY`, that is, the node can only be
assigned to clients with `spark.rss.client.type=GRPC_NETTY`.
+In version 0.8.0, we introduced Netty. Enabling Netty on ShuffleServer can
significantly reduce GC time in high-throughput scenarios. We can enable Netty
through the parameters `rss.server.netty.port` and `rss.rpc.server.type`. Note:
After setting the parameter `rss.rpc.server.type` to `GRPC_NETTY`,
ShuffleServer will be tagged with `GRPC_NETTY`, that is, the node can only be
assigned to clients with `spark.rss.client.type=GRPC_NETTY`.
+
+In version 0.10.0, Netty is enabled by default.
When enabling Netty, we should also consider memory related configurations.
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index d197b8286..495288286 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -555,7 +555,7 @@ public class ShuffleServerConf extends RssBaseConf {
.checkValue(
ConfigUtils.SERVER_PORT_VALIDATOR,
"check server port value is 0 or value >= 1024 && value <=
65535")
- .defaultValue(-1)
+ .defaultValue(17000)
.withDescription("Shuffle netty server port");
public static final ConfigOption<Boolean> NETTY_SERVER_EPOLL_ENABLE =
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index 5a8a7a09e..3684d9d74 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -35,7 +35,7 @@ public class ShuffleServerConfTest {
public void defaultConfTest() {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
assertFalse(shuffleServerConf.loadConfFromFile(null));
- assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
+ assertEquals("GRPC_NETTY",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals(256,
shuffleServerConf.getInteger(ShuffleServerConf.JETTY_CORE_POOL_SIZE));
assertEquals(0,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD));
}
@@ -68,7 +68,7 @@ public class ShuffleServerConfTest {
assertEquals(2,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1",
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b",
""));
- assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
+ assertEquals("GRPC_NETTY",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}