This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e6e070236 [#2354] feat(client): Explicitly setting grpc netty based
event loop threads to avoid too much threads (#2355)
e6e070236 is described below
commit e6e07023627cbf27fc3e0283ebdd0a5b636a7ed4
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 31 19:44:20 2025 +0800
[#2354] feat(client): Explicitly setting grpc netty based event loop
threads to avoid too much threads (#2355)
### What changes were proposed in this pull request?
Introducing a config option to set the number of event loop threads used by
the grpc netty based network impl
### Why are the changes needed?
Fix: #2354
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Tested by running Spark jobs
Co-authored-by: Junfan Zhang <[email protected]>
---
.../apache/uniffle/common/config/RssClientConf.java | 6 ++++++
.../apache/uniffle/client/impl/grpc/GrpcClient.java | 10 ++++++++--
.../client/impl/grpc/ShuffleServerGrpcClient.java | 20 ++++++++++++++++----
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 13 ++++++++++++-
4 files changed, 42 insertions(+), 7 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 36d80b6e5..63b4666d1 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -317,4 +317,10 @@ public class RssClientConf {
.asList()
.noDefaultValue()
.withDescription("the report include properties could be configured
by this option");
+
+ public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS
=
+ ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
+ .intType()
+ .defaultValue(-1)
+ .withDescription("the event loop threads of netty impl for grpc");
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
index 2f5b09c78..1d5d167f1 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
@@ -37,7 +37,7 @@ public abstract class GrpcClient {
protected ManagedChannel channel;
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean
usePlaintext) {
- this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0);
+ this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1);
}
protected GrpcClient(
@@ -47,12 +47,18 @@ public abstract class GrpcClient {
boolean usePlaintext,
int pageSize,
int maxOrder,
- int smallCacheSize) {
+ int smallCacheSize,
+ int nettyEventLoopThreads) {
this.host = host;
this.port = port;
this.maxRetryAttempts = maxRetryAttempts;
this.usePlaintext = usePlaintext;
+ if (nettyEventLoopThreads > 0) {
+ System.setProperty(
+ "io.grpc.netty.shaded.io.netty.eventLoopThreads",
String.valueOf(nettyEventLoopThreads));
+ }
+
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port)
.withOption(
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 399c21547..cb477df34 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -118,6 +118,7 @@ import
org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER;
public class ShuffleServerGrpcClient extends GrpcClient implements
ShuffleServerClient {
@@ -155,7 +156,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
true,
0,
0,
- 0);
+ 0,
+ -1);
}
public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
@@ -171,7 +173,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
true,
0,
0,
- 0);
+ 0,
+ rssConf == null ? -1 :
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
}
public ShuffleServerGrpcClient(
@@ -182,8 +185,17 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
boolean usePlaintext,
int pageSize,
int maxOrder,
- int smallCacheSize) {
- super(host, port, maxRetryAttempts, usePlaintext, pageSize, maxOrder,
smallCacheSize);
+ int smallCacheSize,
+ int nettyEventLoopThreads) {
+ super(
+ host,
+ port,
+ maxRetryAttempts,
+ usePlaintext,
+ pageSize,
+ maxOrder,
+ smallCacheSize,
+ nettyEventLoopThreads);
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
rpcTimeout = rpcTimeoutMs;
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index fbc4e363b..78732733b 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -64,6 +64,8 @@ import
org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
+
public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class);
private int nettyPort;
@@ -107,7 +109,16 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
int pageSize,
int maxOrder,
int smallCacheSize) {
- super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs, true, pageSize,
maxOrder, smallCacheSize);
+ super(
+ host,
+ grpcPort,
+ maxRetryAttempts,
+ rpcTimeoutMs,
+ true,
+ pageSize,
+ maxOrder,
+ smallCacheSize,
+ rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
this.nettyPort = nettyPort;
TransportContext transportContext = new TransportContext(new
TransportConf(rssConf));
this.clientFactory = new TransportClientFactory(transportContext);