This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new caf61f8a21d [fix](http) throw RejectedExecutionException to prevent
http hanging by Future (#29651)
caf61f8a21d is described below
commit caf61f8a21d9c32a383d28406c8a377437a59475
Author: xueweizhang <[email protected]>
AuthorDate: Mon Jan 8 11:17:49 2024 +0800
[fix](http) throw RejectedExecutionException to prevent http hanging by
Future (#29651)
---
docs/en/docs/admin-manual/config/fe-config.md | 12 ++++++++++
docs/zh-CN/docs/admin-manual/config/fe-config.md | 12 ++++++++++
.../main/java/org/apache/doris/common/Config.java | 6 +++++
.../org/apache/doris/common/ThreadPoolManager.java | 28 ++++++++++++++++++++--
.../apache/doris/httpv2/util/LoadSubmitter.java | 4 +++-
.../doris/httpv2/util/StatementSubmitter.java | 3 ++-
6 files changed, 61 insertions(+), 4 deletions(-)
diff --git a/docs/en/docs/admin-manual/config/fe-config.md
b/docs/en/docs/admin-manual/config/fe-config.md
index 91daa268db1..482ab26363e 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -705,6 +705,18 @@ trace export to zipkin like:
`http://127.0.0.1:9411/api/v2/spans`
trace export to collector like: `http://127.0.0.1:4318/v1/traces`
+#### `http_sql_submitter_max_worker_threads`
+
+Default:2
+
+The maximum number of worker threads of the HTTP SQL submitter
+
+#### `http_load_submitter_max_worker_threads`
+
+Default:2
+
+The maximum number of worker threads of the HTTP upload submitter
+
### Query Engine
#### `default_max_query_instances`
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index c0f04e0231a..23a15336506 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -705,6 +705,18 @@ trace导出到 zipkin: `http://127.0.0.1:9411/api/v2/spans`
trace导出到 collector: `http://127.0.0.1:4318/v1/traces`
+#### `http_sql_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/query中sql任务的最大线程数
+
+#### `http_load_submitter_max_worker_threads`
+
+默认值:2
+
+http请求处理/api/upload任务的最大线程数
+
### 查询引擎
#### `default_max_query_instances`
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0bf8da05c27..8bfe39ea990 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2051,5 +2051,11 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int restore_download_task_num_per_be = 3;
+
+ @ConfField(mutable = false, masterOnly = false)
+ public static int http_sql_submitter_max_worker_threads = 2;
+
+ @ConfField(mutable = false, masterOnly = false)
+ public static int http_load_submitter_max_worker_threads = 2;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 8d1b1e7da7b..0f242d422f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -113,6 +113,13 @@ public class ThreadPoolManager {
new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
}
+ public static ThreadPoolExecutor
newDaemonCacheThreadPoolThrowException(int maxNumThread,
+ String poolName,
boolean needRegisterMetric) {
+ return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
+ TimeUnit.SECONDS, new SynchronousQueue(),
+ new LogDiscardPolicyThrowException(poolName), poolName,
needRegisterMetric);
+ }
+
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
@@ -172,8 +179,8 @@ public class ThreadPoolManager {
private static final Logger LOG =
LogManager.getLogger(LogDiscardPolicy.class);
- private String threadPoolName;
- private AtomicLong rejectedNum;
+ public String threadPoolName;
+ public AtomicLong rejectedNum;
public LogDiscardPolicy(String threadPoolName) {
this.threadPoolName = threadPoolName;
@@ -187,6 +194,23 @@ public class ThreadPoolManager {
}
}
+ static class LogDiscardPolicyThrowException extends LogDiscardPolicy {
+
+ private static final Logger LOG =
LogManager.getLogger(LogDiscardPolicyThrowException.class);
+
+ public LogDiscardPolicyThrowException(String threadPoolName) {
+ super(threadPoolName);
+ }
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
+ LOG.warn("Task " + r.toString() + " rejected from " +
threadPoolName + " " + executor.toString());
+ this.rejectedNum.incrementAndGet();
+ throw new RejectedExecutionException("Task " + r.toString() + "
rejected from "
+ + threadPoolName + " " +
executor.toString());
+ }
+ }
+
/**
* A handler for rejected task that try to be blocked until the pool
enqueue task succeed or timeout,
* used for fixed thread pool
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index e971c3bc077..63706af892d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.httpv2.rest.UploadAction;
@@ -52,7 +53,8 @@ import java.util.concurrent.ThreadPoolExecutor;
public class LoadSubmitter {
private static final Logger LOG =
LogManager.getLogger(LoadSubmitter.class);
- private ThreadPoolExecutor executor =
ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true);
+ private ThreadPoolExecutor executor =
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+ Config.http_load_submitter_max_worker_threads,
"load-submitter", true);
private static final String STREAM_LOAD_URL_PATTERN =
"http://%s:%d/api/%s/%s/_stream_load";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
index 5817f1d252e..bfc89324f2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java
@@ -73,7 +73,8 @@ public class StatementSubmitter {
private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver";
private static final String DB_URL_PATTERN =
"jdbc:mariadb://127.0.0.1:%d/%s";
- private ThreadPoolExecutor executor =
ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true);
+ private ThreadPoolExecutor executor =
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
+ Config.http_sql_submitter_max_worker_threads, "SQL
submitter", true);
public Future<ExecutionResultSet> submit(StmtContext queryCtx) {
Worker worker = new Worker(ConnectContext.get(), queryCtx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]