This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e6e070236 [#2354] feat(client): Explicitly setting grpc netty based 
event loop threads to avoid too much threads (#2355)
e6e070236 is described below

commit e6e07023627cbf27fc3e0283ebdd0a5b636a7ed4
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 31 19:44:20 2025 +0800

    [#2354] feat(client): Explicitly setting grpc netty based event loop 
threads to avoid too much threads (#2355)
    
    ### What changes were proposed in this pull request?
    
    Introduing the config option to set the grpc netty based network impl event 
loop threads
    
    ### Why are the changes needed?
    
    Fix: #2354
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Tests in the spark jobs
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../apache/uniffle/common/config/RssClientConf.java  |  6 ++++++
 .../apache/uniffle/client/impl/grpc/GrpcClient.java  | 10 ++++++++--
 .../client/impl/grpc/ShuffleServerGrpcClient.java    | 20 ++++++++++++++++----
 .../impl/grpc/ShuffleServerGrpcNettyClient.java      | 13 ++++++++++++-
 4 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 36d80b6e5..63b4666d1 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -317,4 +317,10 @@ public class RssClientConf {
           .asList()
           .noDefaultValue()
           .withDescription("the report include properties could be configured 
by this option");
+
+  public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS 
=
+      ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
+          .intType()
+          .defaultValue(-1)
+          .withDescription("the event loop threads of netty impl for grpc");
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
index 2f5b09c78..1d5d167f1 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java
@@ -37,7 +37,7 @@ public abstract class GrpcClient {
   protected ManagedChannel channel;
 
   protected GrpcClient(String host, int port, int maxRetryAttempts, boolean 
usePlaintext) {
-    this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0);
+    this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1);
   }
 
   protected GrpcClient(
@@ -47,12 +47,18 @@ public abstract class GrpcClient {
       boolean usePlaintext,
       int pageSize,
       int maxOrder,
-      int smallCacheSize) {
+      int smallCacheSize,
+      int nettyEventLoopThreads) {
     this.host = host;
     this.port = port;
     this.maxRetryAttempts = maxRetryAttempts;
     this.usePlaintext = usePlaintext;
 
+    if (nettyEventLoopThreads > 0) {
+      System.setProperty(
+          "io.grpc.netty.shaded.io.netty.eventLoopThreads", 
String.valueOf(nettyEventLoopThreads));
+    }
+
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(host, port)
             .withOption(
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 399c21547..cb477df34 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -118,6 +118,7 @@ import 
org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
 import org.apache.uniffle.proto.ShuffleServerGrpc;
 import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
 import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER;
 
 public class ShuffleServerGrpcClient extends GrpcClient implements 
ShuffleServerClient {
@@ -155,7 +156,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         true,
         0,
         0,
-        0);
+        0,
+        -1);
   }
 
   public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
@@ -171,7 +173,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         true,
         0,
         0,
-        0);
+        0,
+        rssConf == null ? -1 : 
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
   }
 
   public ShuffleServerGrpcClient(
@@ -182,8 +185,17 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       boolean usePlaintext,
       int pageSize,
       int maxOrder,
-      int smallCacheSize) {
-    super(host, port, maxRetryAttempts, usePlaintext, pageSize, maxOrder, 
smallCacheSize);
+      int smallCacheSize,
+      int nettyEventLoopThreads) {
+    super(
+        host,
+        port,
+        maxRetryAttempts,
+        usePlaintext,
+        pageSize,
+        maxOrder,
+        smallCacheSize,
+        nettyEventLoopThreads);
     blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
     rpcTimeout = rpcTimeoutMs;
   }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index fbc4e363b..78732733b 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -64,6 +64,8 @@ import 
org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.RetryUtils;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
+
 public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class);
   private int nettyPort;
@@ -107,7 +109,16 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
       int pageSize,
       int maxOrder,
       int smallCacheSize) {
-    super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs, true, pageSize, 
maxOrder, smallCacheSize);
+    super(
+        host,
+        grpcPort,
+        maxRetryAttempts,
+        rpcTimeoutMs,
+        true,
+        pageSize,
+        maxOrder,
+        smallCacheSize,
+        rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
     this.nettyPort = nettyPort;
     TransportContext transportContext = new TransportContext(new 
TransportConf(rssConf));
     this.clientFactory = new TransportClientFactory(transportContext);

Reply via email to