This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9f8951fe3 [MINOR] improvement(server,coordinator): Support config grpc
queue size (#2175)
9f8951fe3 is described below
commit 9f8951fe399f616c1c9e5ecb63a1728d70629218
Author: maobaolong <[email protected]>
AuthorDate: Fri Oct 18 14:24:52 2024 +0800
[MINOR] improvement(server,coordinator): Support config grpc queue size
(#2175)
### What changes were proposed in this pull request?
Support configuring the size of the gRPC thread pool's waiting queue.
### Why are the changes needed?
Without this, the server could be flooded with RPC calls that pile up in the effectively unbounded queue and consume all available memory.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
---
.../src/main/java/org/apache/uniffle/common/config/RssBaseConf.java | 5 +++++
common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java | 3 ++-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index cdaf59f19..b2cd4e9d0 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -181,6 +181,11 @@ public class RssBaseConf extends RssConf {
.intType()
.defaultValue(1000)
.withDescription("Thread number for grpc to process request");
+ public static final ConfigOption<Integer> RPC_EXECUTOR_QUEUE_SIZE =
+ ConfigOptions.key("rss.rpc.executor.queue.size")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription("Thread pool waiting queue size");
public static final ConfigOption<Boolean> RSS_JVM_METRICS_VERBOSE_ENABLE =
ConfigOptions.key("rss.jvm.metrics.verbose.enable")
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 2e51cce55..7a93b2b82 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -73,13 +73,14 @@ public class GrpcServer implements ServerInterface {
this.grpcMetrics = grpcMetrics;
int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
+ int queueSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_QUEUE_SIZE);
pool =
new GrpcThreadPoolExecutor(
rpcExecutorSize,
rpcExecutorSize * 2,
10,
TimeUnit.MINUTES,
- Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
+ Queues.newLinkedBlockingQueue(queueSize),
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics);
ThreadPoolManager.registerThreadPool(