This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 89ff353ebe2 [opt](scanner) Control the degree of parallelism of
scanner when only limit involved #39927 (#42079)
89ff353ebe2 is described below
commit 89ff353ebe220c09f747cd44f7b13e762bc5e2e9
Author: zhiqiang <[email protected]>
AuthorDate: Mon Nov 11 23:13:17 2024 +0800
[opt](scanner) Control the degree of parallelism of scanner when only limit
involved #39927 (#42079)
Cherry-picked from #39927.
---------
Co-authored-by: zhiqiang-hhhh <[email protected]>
---
be/src/pipeline/exec/scan_operator.cpp | 30 ++++++++++++++++++++++
.../java/org/apache/doris/planner/ScanNode.java | 16 ++++++++++--
.../java/org/apache/doris/qe/SessionVariable.java | 24 +++++++++++++++++
3 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 32943c4d44e..a59098c0fee 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,8 @@
namespace doris::pipeline {
+const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT =
10000;
+
#define RETURN_IF_PUSH_DOWN(stmt, status) \
if (pdt == PushDownType::UNACCEPTABLE) { \
status = stmt; \
@@ -1186,6 +1188,34 @@ Status ScanOperatorX<LocalStateType>::init(const
TPlanNode& tnode, RuntimeState*
if (tnode.__isset.topn_filter_source_node_ids) {
topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
}
+
+ // The first branch is kept for compatibility with the old version of the
FE
+ if
(!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) {
+ if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
+ // Which means the request could be fullfilled in a single segment
iterator request.
+ if (tnode.limit > 0 &&
+ tnode.limit <=
ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) {
+ _should_run_serial = true;
+ }
+ }
+ } else {
+
DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit);
+ // The set of enable_adaptive_pipeline_task_serial_read_on_limit
+ // is checked in previous branch.
+ if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) {
+ int32_t adaptive_pipeline_task_serial_read_on_limit =
+ ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT;
+ if
(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) {
+ adaptive_pipeline_task_serial_read_on_limit =
+
query_options.adaptive_pipeline_task_serial_read_on_limit;
+ }
+
+ if (tnode.limit > 0 && tnode.limit <=
adaptive_pipeline_task_serial_read_on_limit) {
+ _should_run_serial = true;
+ }
+ }
+ }
+
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9f28424ccc5..85c8de68b8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -739,8 +739,20 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
}
public boolean shouldUseOneInstance(ConnectContext ctx) {
- long limitRowsForSingleInstance = ctx == null ? 10000 :
ctx.getSessionVariable().limitRowsForSingleInstance;
- return hasLimit() && getLimit() < limitRowsForSingleInstance &&
conjuncts.isEmpty();
+ int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
+ if (ctx != null) {
+ if
(ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
+ adaptivePipelineTaskSerialReadOnLimit =
ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
+ } else {
+ return false;
+ }
+ } else {
+ // No connection context, typically for broker load.
+ }
+
+ // For UniqueKey table, we will use multiple instance.
+ return hasLimit() && getLimit() <=
adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
}
// In cloud mode, meta read lock is not enough to keep a snapshot of the
partition versions.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c745e8913f1..164ec1d1168 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -683,6 +683,11 @@ public class SessionVariable implements Serializable,
Writable {
*/
public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE =
"enable_auto_create_when_overwrite";
+ public static final String
ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+
"enable_adaptive_pipeline_task_serial_read_on_limit";
+ public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+
"adaptive_pipeline_task_serial_read_on_limit";
+
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate
any related resources.
*/
@@ -2273,6 +2278,22 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_COOLDOWN_REPLICA_AFFINITY, needForward
= true)
public boolean enableCooldownReplicaAffinity = true;
+ @VariableMgr.VarAttr(name =
ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true,
description = {
+ "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于"
+ + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan
的并行度将会被设置为 1",
+ "When enabled, the pipeline task concurrency will be adjusted
automatically. When the scan node has no filter "
+ + "conditions and the limit parameter is less than the number of
rows specified in "
+ + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of
the scan will be set to 1."
+ })
+ public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;
+
+ @VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT,
needForward = true, description = {
+ "当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan
的并行度将会被设置为 1 的行数阈值",
+ "When enable_adaptive_pipeline_task_serial_read_on_limit is
enabled, "
+ + "the number of rows at which the parallelism of the scan will be
set to 1."
+ })
+ public int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
@@ -3911,6 +3932,9 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setKeepCarriageReturn(keepCarriageReturn);
tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite);
+
tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
+
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
+
tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]