This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch tpc_preview6 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2a6ca57964385931088cc5ab8d00ed3c45ac96e0
Author: happenlee <[email protected]>
AuthorDate: Mon Feb 9 20:53:33 2026 +0800

    change colocate execution parallel num
---
 .../job/UnassignedScanBucketOlapTableJob.java      | 36 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  4 +++
 2 files changed, 40 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index a97315d8d80..6a2315966bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -505,4 +505,40 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
         }
         return workers;
     }
+
+    @Override
+    protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
+        Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
+        if (!fragment.getDataPartition().isPartitioned()) {
+            return 1;
+        }
+        if (fragment.queryCacheParam != null) {
+            return maxParallel;
+        }
+        if (scanNodes.size() == 1 && scanNodes.get(0) instanceof OlapScanNode) {
+            OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0);
+            ConnectContext connectContext = statementContext.getConnectContext();
+            if (connectContext != null && olapScanNode.shouldUseOneInstance(connectContext)) {
+                return 1;
+            }
+        }
+
+        long tabletNum = 0;
+        for (ScanNode scanNode : scanNodes) {
+            if (scanNode instanceof OlapScanNode) {
+                OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+                tabletNum = olapScanNode.getTotalTabletsNum();
+                break;
+            }
+        }
+
+        ConnectContext connectContext = statementContext.getConnectContext();
+        int colocateMaxParallelNum = 128;
+        if (connectContext != null) {
+            colocateMaxParallelNum = connectContext.getSessionVariable().colocateMaxParallelNum;
+        }
+
+        int maxParallelism = (int) Math.max(tabletNum, fragment.getParallelExecNum());
+        return Math.min(maxParallelism, colocateMaxParallelNum);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 02929b38868..de0480ec0b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -168,6 +168,7 @@ public class SessionVariable implements Serializable, Writable {
             "enable_distinct_streaming_agg_force_passthrough";
     public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = "enable_broadcast_join_force_passthrough";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
+    public static final String COLOCATE_MAX_PARALLEL_NUM = "colocate_max_parallel_num";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
     public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
@@ -1291,6 +1292,9 @@ public class SessionVariable implements Serializable, Writable {
             setter = "setFragmentInstanceNum", varType = VariableAnnotation.DEPRECATED)
     public int parallelExecInstanceNum = 8;
 
+    @VariableMgr.VarAttr(name = COLOCATE_MAX_PARALLEL_NUM, needForward = true, fuzzy = false)
+    public int colocateMaxParallelNum = 128;
+
     @VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true,
             setter = "setPipelineTaskNum")
     public int parallelPipelineTaskNum = 0;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
