This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 802f0a9b1ab [Fix](Cloud)decouple min pipeline executor size from
ConnectContext (#60958)
802f0a9b1ab is described below
commit 802f0a9b1abb379548938326bc4332a26cd2dbd0
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Mar 9 12:30:13 2026 +0800
[Fix](Cloud)decouple min pipeline executor size from ConnectContext (#60958)
#60648
getMinPipelineExecutorSize in cloud paths implicitly depended on
ConnectContext, which caused two issues: (1) unstable behavior when no
thread-local context is available (e.g. internal/async paths), and
(2) unclear API semantics, since callers could not explicitly specify
the target cluster. This PR makes the API explicit by requiring
clusterName.
What Changed
Removed the no-arg getMinPipelineExecutorSize() API and kept only
getMinPipelineExecutorSize(String clusterName). Unified
SystemInfoService and CloudSystemInfoService implementations to the
string-arg API. Updated SessionVariable#getParallelExecInstanceNum() to
call the string-arg API, with the cluster resolved from session/auth
information (instead of directly depending on ConnectContext). Added
synchronization in ConnectContext.
---
.../doris/cloud/system/CloudSystemInfoService.java | 12 +----
.../apache/doris/datasource/ExternalScanNode.java | 5 +-
.../apache/doris/datasource/FileQueryScanNode.java | 7 +--
.../org/apache/doris/datasource/FileScanNode.java | 6 ++-
.../doris/source/RemoteDorisScanNode.java | 5 +-
.../doris/datasource/es/source/EsScanNode.java | 5 +-
.../doris/datasource/hive/source/HiveScanNode.java | 12 ++---
.../doris/datasource/hudi/source/HudiScanNode.java | 5 +-
.../iceberg/rewrite/RewriteGroupTask.java | 3 +-
.../datasource/iceberg/source/IcebergScanNode.java | 12 +++--
.../doris/datasource/jdbc/source/JdbcScanNode.java | 16 ++++--
.../lakesoul/source/LakeSoulScanNode.java | 7 +--
.../maxcompute/source/MaxComputeScanNode.java | 10 ++--
.../doris/datasource/odbc/source/OdbcScanNode.java | 11 ++--
.../datasource/paimon/source/PaimonScanNode.java | 9 ++--
.../source/TrinoConnectorScanNode.java | 5 +-
.../datasource/tvf/source/MetadataScanNode.java | 6 ++-
.../doris/datasource/tvf/source/TVFScanNode.java | 8 +--
.../org/apache/doris/nereids/cost/CostModel.java | 3 +-
.../glue/translator/PhysicalPlanTranslator.java | 40 +++++++++------
.../glue/translator/PlanTranslatorContext.java | 21 ++++++++
.../nereids/load/NereidsLoadingTaskPlanner.java | 6 ++-
.../nereids/load/NereidsStreamLoadPlanner.java | 6 ++-
.../properties/ChildrenPropertiesRegulator.java | 7 +--
.../rewrite/DecomposeRepeatWithPreAggregation.java | 4 +-
.../doris/nereids/rules/rewrite/SaltJoin.java | 4 +-
.../worker/job/UnassignedShuffleJob.java | 5 +-
.../planner/BackendPartitionedSchemaScanNode.java | 5 +-
.../java/org/apache/doris/planner/CTEScanNode.java | 4 +-
.../org/apache/doris/planner/DataGenScanNode.java | 10 ++--
.../org/apache/doris/planner/FileLoadScanNode.java | 4 +-
.../apache/doris/planner/GroupCommitScanNode.java | 4 +-
.../org/apache/doris/planner/OlapScanNode.java | 9 ++--
.../org/apache/doris/planner/PlanFragment.java | 6 ++-
.../java/org/apache/doris/planner/ScanContext.java | 58 ++++++++++++++++++++++
.../java/org/apache/doris/planner/ScanNode.java | 22 ++++++--
.../org/apache/doris/planner/SchemaScanNode.java | 5 +-
.../java/org/apache/doris/qe/SessionVariable.java | 37 ++++++++++++--
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
.../org/apache/doris/system/SystemInfoService.java | 4 +-
.../tablefunction/DataGenTableValuedFunction.java | 4 +-
.../ExternalFileTableValuedFunction.java | 5 +-
.../GroupCommitTableValuedFunction.java | 4 +-
.../tablefunction/JdbcQueryTableValueFunction.java | 4 +-
.../tablefunction/MetadataTableValuedFunction.java | 4 +-
.../cloud/system/CloudSystemInfoServiceTest.java | 30 +++++------
.../doris/datasource/FileQueryScanNodeTest.java | 3 +-
.../datasource/hive/source/HiveScanNodeTest.java | 5 +-
.../iceberg/source/IcebergScanNodeTest.java | 3 +-
.../maxcompute/source/MaxComputeScanNodeTest.java | 3 +-
.../paimon/source/PaimonScanNodeTest.java | 8 +--
.../tvf/source/MetadataScanNodeTest.java | 7 +--
.../datasource/tvf/source/TVFScanNodeTest.java | 3 +-
.../worker/job/UnassignedShuffleJobTest.java | 15 +++---
.../org/apache/doris/qe/HmsQueryCacheTest.java | 3 +-
.../org/apache/doris/qe/OlapQueryCacheTest.java | 5 +-
.../apache/doris/system/SystemInfoServiceTest.java | 12 ++---
57 files changed, 366 insertions(+), 164 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 4f01a4546f5..34865f06e61 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -760,16 +760,8 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
@Override
- public int getMinPipelineExecutorSize() {
- String clusterName = "";
- try {
- clusterName = ConnectContext.get().getCloudCluster(false);
- } catch (ComputeGroupException e) {
- LOG.warn("failed to get cluster name", e);
- return 1;
- }
- if (ConnectContext.get() != null
- && Strings.isNullOrEmpty(clusterName)) {
+ public int getMinPipelineExecutorSize(String clusterName) {
+ if (Strings.isNullOrEmpty(clusterName)) {
return 1;
}
List<Backend> currentBackends = getBackendsByClusterName(clusterName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index 141a3753a22..04e1829fbe3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPlanNode;
@@ -50,8 +51,8 @@ public abstract class ExternalScanNode extends ScanNode {
: new FederationBackendPolicy();
public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- boolean needCheckColumnPriv) {
- super(id, desc, planNodeName);
+ ScanContext scanContext, boolean needCheckColumnPriv) {
+ super(id, desc, planNodeName, scanContext);
this.needCheckColumnPriv = needCheckColumnPriv;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 26b78ad52ee..f1a37fbeff0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
@@ -104,8 +105,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- boolean needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, planNodeName, needCheckColumnPriv);
+ ScanContext scanContext, boolean needCheckColumnPriv,
SessionVariable sv) {
+ super(id, desc, planNodeName, scanContext, needCheckColumnPriv);
this.sessionVariable = sv;
}
@@ -537,7 +538,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
@Override
public int getNumInstances() {
if (sessionVariable.isIgnoreStorageDataDistribution()) {
- return sessionVariable.getParallelExecInstanceNum();
+ return
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
}
return scanRangeLocations.size();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 1c1d6ac6720..4b80078b1d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileRangeDesc;
@@ -70,8 +71,9 @@ public abstract class FileScanNode extends ExternalScanNode {
// For display pushdown agg result
protected long tableLevelRowCount = -1;
- public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, boolean needCheckColumnPriv) {
- super(id, desc, planNodeName, needCheckColumnPriv);
+ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ ScanContext scanContext, boolean needCheckColumnPriv) {
+ super(id, desc, planNodeName, scanContext, needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
index 056961eb2b7..644c00028e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -79,8 +80,8 @@ public class RemoteDorisScanNode extends FileQueryScanNode {
private RemoteDorisSource source;
public RemoteDorisScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "REMOTE_DORIS_SCAN_NODE", needCheckColumnPriv, sv);
+ SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "REMOTE_DORIS_SCAN_NODE", scanContext,
needCheckColumnPriv, sv);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
index 44201b4ecf7..8d18854f62a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
@@ -40,6 +40,7 @@ import
org.apache.doris.datasource.es.QueryBuilders.QueryBuilder;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RangePartitionPrunerV2;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
@@ -82,8 +83,8 @@ public class EsScanNode extends ExternalScanNode {
/**
* For multicatalog es.
**/
- public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean
esExternalTable) {
- super(id, desc, "EsScanNode", false);
+ public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean
esExternalTable, ScanContext scanContext) {
+ super(id, desc, "EsScanNode", scanContext, false);
if (esExternalTable) {
EsExternalTable externalTable = (EsExternalTable)
(desc.getTable());
table = externalTable.getEsTable();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 2e8f3735f6b..27f4df2f593 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -48,6 +48,7 @@ import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -111,14 +112,14 @@ public class HiveScanNode extends FileQueryScanNode {
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
- DirectoryLister directoryLister) {
- this(id, desc, "HIVE_SCAN_NODE", needCheckColumnPriv, sv,
directoryLister);
+ DirectoryLister directoryLister, ScanContext scanContext) {
+ this(id, desc, "HIVE_SCAN_NODE", needCheckColumnPriv, sv,
directoryLister, scanContext);
}
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
boolean needCheckColumnPriv, SessionVariable sv,
- DirectoryLister directoryLister) {
- super(id, desc, planNodeName, needCheckColumnPriv, sv);
+ DirectoryLister directoryLister, ScanContext scanContext) {
+ super(id, desc, planNodeName, scanContext, needCheckColumnPriv, sv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
this.directoryLister = directoryLister;
@@ -320,7 +321,7 @@ public class HiveScanNode extends FileQueryScanNode {
totalFileNum += fileCacheValue.getFiles().size();
}
}
- int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ int parallelNum =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
@@ -637,4 +638,3 @@ public class HiveScanNode extends FileQueryScanNode {
return compressType;
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 36240722335..8653d0c6378 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TExplainLevel;
@@ -130,8 +131,8 @@ public class HudiScanNode extends HiveScanNode {
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation,
- SessionVariable sv, DirectoryLister directoryLister) {
- super(id, desc, "HUDI_SCAN_NODE", needCheckColumnPriv, sv,
directoryLister);
+ SessionVariable sv, DirectoryLister directoryLister, ScanContext
scanContext) {
+ super(id, desc, "HUDI_SCAN_NODE", needCheckColumnPriv, sv,
directoryLister, scanContext);
isCowTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
if (isCowTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
index cd623528c76..13b7ee3ff68 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
@@ -288,7 +288,8 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
"availableBeCount must be greater than 0 for rewrite task");
// 3. Get default parallelism from session variable (pipeline task num)
- int defaultParallelism =
connectContext.getSessionVariable().getParallelExecInstanceNum();
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int defaultParallelism =
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName);
// 4. Determine strategy based on expected file count
boolean useGather = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f0b9766c7b7..5831d3d00d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -47,6 +47,7 @@ import
org.apache.doris.datasource.iceberg.source.IcebergDeleteFileFilter.Equali
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -151,8 +152,8 @@ public class IcebergScanNode extends FileQueryScanNode {
// for test
@VisibleForTesting
- public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- super(id, desc, "ICEBERG_SCAN_NODE", false, sv);
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "ICEBERG_SCAN_NODE", scanContext, false, sv);
}
/**
@@ -161,8 +162,9 @@ public class IcebergScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "ICEBERG_SCAN_NODE", needCheckColumnPriv, sv);
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "ICEBERG_SCAN_NODE", scanContext, needCheckColumnPriv,
sv);
ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
@@ -752,7 +754,7 @@ public class IcebergScanNode extends FileQueryScanNode {
try (CloseableIterable<FileScanTask> fileScanTasks =
planFileScanTask(scan)) {
if (tableLevelPushDownCount) {
int needSplitCnt = countFromSnapshot <
COUNT_WITH_PARALLEL_SPLITS
- ? 1 : sessionVariable.getParallelExecInstanceNum() *
numBackends;
+ ? 1 :
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName()) *
numBackends;
for (FileScanTask next : fileScanTasks) {
splits.add(createIcebergSplit(next));
if (splits.size() >= needSplitCnt) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index be6a415f61d..2322fac5d30 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.datasource.ExternalFunctionRules;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TJdbcScanNode;
@@ -72,8 +73,8 @@ public class JdbcScanNode extends ExternalScanNode {
private JdbcTable tbl;
private long catalogId;
- public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isJdbcExternalTable) {
- super(id, desc, "JdbcScanNode", false);
+ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isJdbcExternalTable, ScanContext scanContext) {
+ super(id, desc, "JdbcScanNode", scanContext, false);
if (isJdbcExternalTable) {
JdbcExternalTable jdbcExternalTable = (JdbcExternalTable)
(desc.getTable());
tbl = jdbcExternalTable.getJdbcTable();
@@ -84,8 +85,9 @@ public class JdbcScanNode extends ExternalScanNode {
tableName = tbl.getProperRemoteFullTableName(jdbcType);
}
- public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isTableValuedFunction, String query) {
- super(id, desc, "JdbcScanNode", false);
+ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isTableValuedFunction, String query,
+ ScanContext scanContext) {
+ super(id, desc, "JdbcScanNode", scanContext, false);
this.isTableValuedFunction = isTableValuedFunction;
this.query = query;
tbl = (JdbcTable) desc.getTable();
@@ -271,7 +273,11 @@ public class JdbcScanNode extends ExternalScanNode {
@Override
public int getNumInstances() {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return 1;
+ }
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
private static boolean shouldPushDownConjunct(TOdbcTableType tableType,
Expr expr) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index bb4b232ae06..2662dc55ff7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileFormatType;
@@ -82,8 +83,9 @@ public class LakeSoulScanNode extends FileQueryScanNode {
String readType;
- public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "planNodeName", needCheckColumnPriv, sv);
+ public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "planNodeName", scanContext, needCheckColumnPriv, sv);
}
@Override
@@ -284,4 +286,3 @@ public class LakeSoulScanNode extends FileQueryScanNode {
return splits;
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 898022b2dcf..1925ca7a0ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -43,6 +43,7 @@ import
org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileFormatType;
@@ -113,13 +114,14 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
// For new planner
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
- SessionVariable sv) {
- this(id, desc, "MCScanNode", selectedPartitions, needCheckColumnPriv,
sv);
+ SessionVariable sv, ScanContext scanContext) {
+ this(id, desc, "MCScanNode", selectedPartitions, needCheckColumnPriv,
sv, scanContext);
}
private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- SelectedPartitions selectedPartitions, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, planNodeName, needCheckColumnPriv, sv);
+ SelectedPartitions selectedPartitions, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, planNodeName, scanContext, needCheckColumnPriv, sv);
table = (MaxComputeExternalTable) desc.getTable();
this.selectedPartitions = selectedPartitions;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index abe1ff6f072..219d4d097c7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TOdbcScanNode;
@@ -66,8 +67,8 @@ public class OdbcScanNode extends ExternalScanNode {
/**
* Constructs node to scan given data files of table 'tbl'.
*/
- public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl) {
- super(id, desc, "SCAN ODBC", false);
+ public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl,
ScanContext scanContext) {
+ super(id, desc, "SCAN ODBC", scanContext, false);
connectString = tbl.getConnectString();
odbcType = tbl.getOdbcTableType();
tblName = JdbcTable.databaseProperName(odbcType,
tbl.getOdbcTableName());
@@ -210,7 +211,11 @@ public class OdbcScanNode extends ExternalScanNode {
@Override
public int getNumInstances() {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return 1;
+ }
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
public static boolean shouldPushDownConjunct(TOdbcTableType tableType,
Expr expr) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 6e274dd98a2..9cf75eae4ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -38,6 +38,7 @@ import
org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TExplainLevel;
@@ -150,8 +151,9 @@ public class PaimonScanNode extends FileQueryScanNode {
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "PAIMON_SCAN_NODE", needCheckColumnPriv, sv);
+ SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "PAIMON_SCAN_NODE", scanContext, needCheckColumnPriv,
sv);
}
@Override
@@ -442,7 +444,8 @@ public class PaimonScanNode extends FileQueryScanNode {
// if applyCountPushdown is true, calcute row count for count pushdown
if (applyCountPushdown && !pushDownCountSplits.isEmpty()) {
if (pushDownCountSum > COUNT_WITH_PARALLEL_SPLITS) {
- int minSplits = sessionVariable.getParallelExecInstanceNum() *
numBackends;
+ int minSplits =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName())
+ * numBackends;
pushDownCountSplits = pushDownCountSplits.subList(0,
Math.min(pushDownCountSplits.size(), minSplits));
} else {
pushDownCountSplits =
Collections.singletonList(pushDownCountSplits.get(0));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 922be806d6d..279a71ded44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileAttributes;
@@ -98,8 +99,8 @@ public class TrinoConnectorScanNode extends FileQueryScanNode
{
private Constraint constraint;
public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", needCheckColumnPriv, sv);
+ SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", scanContext,
needCheckColumnPriv, sv);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index 87fd0e58aa9..796090b0099 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
@@ -44,8 +45,9 @@ public class MetadataScanNode extends ExternalScanNode {
private boolean initedScanRangeLocations = false;
private final List<TScanRangeLocations> scanRangeLocations =
Lists.newArrayList();
- public MetadataScanNode(PlanNodeId id, TupleDescriptor desc,
MetadataTableValuedFunction tvf) {
- super(id, desc, "METADATA_SCAN_NODE", false);
+ public MetadataScanNode(PlanNodeId id, TupleDescriptor desc,
MetadataTableValuedFunction tvf,
+ ScanContext scanContext) {
+ super(id, desc, "METADATA_SCAN_NODE", scanContext, false);
this.tvf = tvf;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index d3c891c1b4c..16037395dc3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.FileSplit.FileSplitCreator;
import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
@@ -67,8 +68,9 @@ public class TVFScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "TVF_SCAN_NODE", needCheckColumnPriv, sv);
+ public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "TVF_SCAN_NODE", scanContext, needCheckColumnPriv, sv);
table = (FunctionGenTable) this.desc.getTable();
tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
}
@@ -141,7 +143,7 @@ public class TVFScanNode extends FileQueryScanNode {
// Push down count optimization.
boolean needSplit = true;
if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
- int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ int parallelNum =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
int totalFileNum = fileStatuses.size();
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
index 34bb9dacddf..e942e71d064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
@@ -99,7 +99,8 @@ class CostModel extends PlanVisitor<Cost, PlanContext> {
parallelInstance = 8;
} else {
beNumber = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
- parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum());
+ String clusterName =
sessionVariable.resolveCloudClusterName(connectContext);
+ parallelInstance = Math.max(1,
sessionVariable.getParallelExecInstanceNum(clusterName));
}
this.hboPlanStatisticsProvider =
Objects.requireNonNull(Env.getCurrentEnv().getHboPlanStatisticsManager()
.getHboPlanStatisticsProvider(), "HboPlanStatisticsProvider is
null");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 892441752c5..0b226ff2738 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -677,10 +677,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
switch (((HMSExternalTable) table).getDlaType()) {
case ICEBERG:
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
break;
case HIVE:
- scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv, directoryLister);
+ scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv, directoryLister,
+ context.getScanContext());
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSample().isPresent()) {
@@ -699,18 +701,23 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
throw new RuntimeException("do not support DLA type " +
((HMSExternalTable) table).getDlaType());
}
} else if (table instanceof IcebergExternalTable) {
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table.getType() == TableIf.TableType.PAIMON_EXTERNAL_TABLE)
{
- scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof TrinoConnectorExternalTable) {
- scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- fileScan.getSelectedPartitions(), false, sv);
+ fileScan.getSelectedPartitions(), false, sv,
context.getScanContext());
} else if (table instanceof LakeSoulExternalTable) {
- scanNode = new LakeSoulScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new LakeSoulScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof RemoteDorisExternalTable) {
- scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
@@ -749,7 +756,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = esScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
EsScanNode esScanNode = new EsScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- table instanceof EsExternalTable);
+ table instanceof EsExternalTable, context.getScanContext());
esScanNode.setNereidsId(esScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(esScan.getId(),
esScanNode.getId());
Utils.execWithUncheckedException(esScanNode::init);
@@ -794,7 +801,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
hudiScan.getScanParams(), hudiScan.getIncrementalRelation(),
ConnectContext.get().getSessionVariable(),
- directoryLister);
+ directoryLister, context.getScanContext());
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode)
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
@@ -835,7 +842,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = jdbcScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- table instanceof JdbcExternalTable);
+ table instanceof JdbcExternalTable, context.getScanContext());
jdbcScanNode.setNereidsId(jdbcScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(),
jdbcScanNode.getId());
Utils.execWithUncheckedException(jdbcScanNode::init);
@@ -854,7 +861,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = odbcScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- (OdbcTable) table);
+ (OdbcTable) table, context.getScanContext());
odbcScanNode.setNereidsId(odbcScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(),
odbcScanNode.getId());
Utils.execWithUncheckedException(odbcScanNode::init);
@@ -895,7 +902,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
}
- OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(),
tupleDescriptor, "OlapScanNode");
+ OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(),
tupleDescriptor, "OlapScanNode",
+ context.getScanContext());
olapScanNode.setNereidsId(olapScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(),
olapScanNode.getId());
@@ -1079,12 +1087,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
scanNode = new
BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), table,
tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null),
schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null),
- translateToExprs(schemaScan.getFrontendConjuncts(),
context));
+ translateToExprs(schemaScan.getFrontendConjuncts(),
context), context.getScanContext());
} else {
scanNode = new SchemaScanNode(context.nextPlanNodeId(),
tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null),
schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null),
translateToExprs(schemaScan.getFrontendConjuncts(),
- context));
+ context), context.getScanContext());
}
scanNode.setNereidsId(schemaScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(),
scanNode.getId());
@@ -1426,7 +1434,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.addExprIdSlotRefPair(consumerSlot.getExprId(),
slotRef);
}
}
- CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
+ CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor,
context.getScanContext());
translateRuntimeFilter(cteConsumer, cteScanNode, context);
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(),
cteScanNode);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 07017d814e3..9c66e56ef72 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -44,6 +44,7 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -67,6 +68,7 @@ import javax.annotation.Nullable;
public class PlanTranslatorContext {
private final ConnectContext connectContext;
private final StatementContext statementContext;
+ private final ScanContext scanContext;
private final List<PlanFragment> planFragments = Lists.newArrayList();
private DescriptorTable descTable;
@@ -122,6 +124,11 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx) {
this.connectContext = ctx.getConnectContext();
this.statementContext = ctx.getStatementContext();
+ this.scanContext = connectContext == null ||
connectContext.getSessionVariable() == null
+ ? ScanContext.EMPTY
+ : ScanContext.builder()
+
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+ .build();
this.translator = new
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -132,6 +139,11 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx, DescriptorTable
descTable) {
this.connectContext = ctx.getConnectContext();
this.statementContext = ctx.getStatementContext();
+ this.scanContext = connectContext == null ||
connectContext.getSessionVariable() == null
+ ? ScanContext.EMPTY
+ : ScanContext.builder()
+
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+ .build();
this.translator = new
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -145,6 +157,7 @@ public class PlanTranslatorContext {
public PlanTranslatorContext() {
this.connectContext = null;
this.statementContext = new StatementContext();
+ this.scanContext = ScanContext.EMPTY;
this.translator = null;
this.topnFilterContext = new TopnFilterContext();
IdGenerator<RuntimeFilterId> runtimeFilterIdGen =
RuntimeFilterId.createGenerator();
@@ -278,6 +291,14 @@ public class PlanTranslatorContext {
physicalRelations.add(physicalRelation);
}
+ public String getClusterName() {
+ return scanContext.getClusterName();
+ }
+
+ public ScanContext getScanContext() {
+ return scanContext;
+ }
+
public List<PhysicalRelation> getPhysicalRelations() {
return physicalRelations;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index cda9b93c98d..ac8cc4b93b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -36,6 +36,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TBrokerFileStatus;
@@ -195,7 +196,10 @@ public class NereidsLoadingTaskPlanner {
}
// Create a single FileLoadScanNode for all file groups
- FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+ String clusterName = ConnectContext.get() == null ? ""
+ :
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple(),
+ ScanContext.builder().clusterName(clusterName).build());
fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts,
loadPlanInfos);
scanNodes.add(fileScanNode);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index d4cc7c01afe..8300f6a759e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
@@ -253,7 +254,10 @@ public class NereidsStreamLoadPlanner {
scanTupleDesc.setTable(destTable);
NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo =
planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
descriptorTable, scanTupleDesc);
- FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfo.getDestTuple());
+ String clusterName = ConnectContext.get() == null ? ""
+ :
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfo.getDestTuple(),
+ ScanContext.builder().clusterName(clusterName).build());
fileScanNode.finalizeForNereids(loadId,
Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
Lists.newArrayList(loadPlanInfo));
scanNode = fileScanNode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index aa30308e38e..588960b4801 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -291,9 +291,10 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
int prunedPartNum = candidate.getSelectedPartitionIds().size();
int bucketNum =
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
- int backEndNum = Math.max(1,
ConnectContext.get().getEnv().getClusterInfo()
- .getBackendsNumber(true));
- int paraNum = Math.max(1,
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
+ ConnectContext connectContext = ConnectContext.get();
+ int backEndNum = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int paraNum = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
return totalBucketNum < backEndNum * paraNum * 0.8;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
index 0751f76c0ee..a3101afb58b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecomposeRepeatWithPreAggregation.java
@@ -515,7 +515,9 @@ public class DecomposeRepeatWithPreAggregation extends
DefaultPlanRewriter<Disti
return Optional.empty();
}
int beNumber = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
- int parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum());
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int parallelInstance = Math.max(1,
+
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
int totalInstanceNum = beNumber * parallelInstance;
Optional<Expression> chosen;
switch (repeat.getRepeatType()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
index 1227d63ebc8..dd04b5ceee8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
@@ -340,7 +340,9 @@ public class SaltJoin extends OneRewriteRuleFactory {
.getSessionVariable().skewRewriteJoinSaltExplodeFactor;
if (factor == 0) {
int beNumber = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
- int parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum());
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int parallelInstance = Math.max(1,
+
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
factor = (int) Math.min((long) beNumber * parallelInstance *
SALT_FACTOR, Integer.MAX_VALUE);
}
return Math.max(factor, 1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index b60282d20a0..27792eb288e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -91,11 +91,12 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
expectInstanceNum =
connectContext.getSessionVariable().getExchangeInstanceParallel();
}
// If child fragment uses query cache, limit instance num to avoid too
many instances
- if (childInstanceNum > 0 && connectContext != null) {
+ if (childInstanceNum > 0 && connectContext != null &&
connectContext.getSessionVariable() != null) {
boolean childHasQueryCacheParam = inputJobs.values().stream()
.anyMatch(job ->
job.unassignedJob().getFragment().queryCacheParam != null);
if (childHasQueryCacheParam) {
- int maxInstanceNum =
connectContext.getSessionVariable().getParallelExecInstanceNum()
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int maxInstanceNum =
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName)
* Env.getCurrentSystemInfo().getBackendsNumber(false);
expectInstanceNum = expectInstanceNum > 0
? Math.min(expectInstanceNum,
Math.min(childInstanceNum, maxInstanceNum))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index 16e1798483e..4338ecbabd9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -98,8 +98,9 @@ public class BackendPartitionedSchemaScanNode extends
SchemaScanNode {
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
public BackendPartitionedSchemaScanNode(PlanNodeId id, TableIf table,
TupleDescriptor desc,
- String schemaCatalog, String schemaDatabase, String schemaTable,
List<Expr> frontendConjuncts) {
- super(id, desc, schemaCatalog, schemaDatabase, schemaTable,
frontendConjuncts);
+ String schemaCatalog, String schemaDatabase, String schemaTable,
List<Expr> frontendConjuncts,
+ ScanContext scanContext) {
+ super(id, desc, schemaCatalog, schemaDatabase, schemaTable,
frontendConjuncts, scanContext);
this.tableIf = table;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
index d6e67d32f90..ac202cfa34c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
@@ -31,8 +31,8 @@ import java.util.List;
public class CTEScanNode extends ScanNode {
private static final PlanNodeId UNINITIAL_PLANNODEID = new PlanNodeId(-1);
- public CTEScanNode(TupleDescriptor desc) {
- super(UNINITIAL_PLANNODEID, desc, "CTEScanNode");
+ public CTEScanNode(TupleDescriptor desc, ScanContext scanContext) {
+ super(UNINITIAL_PLANNODEID, desc, "CTEScanNode", scanContext);
}
public void setPlanNodeId(PlanNodeId id) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index afb7e372751..d5f5f079f0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -44,8 +44,9 @@ public class DataGenScanNode extends ExternalScanNode {
private DataGenTableValuedFunction tvf;
- public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
DataGenTableValuedFunction tvf) {
- super(id, desc, "DataGenScanNode", false);
+ public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
DataGenTableValuedFunction tvf,
+ ScanContext scanContext) {
+ super(id, desc, "DataGenScanNode", scanContext, false);
this.tvf = tvf;
}
@@ -94,8 +95,9 @@ public class DataGenScanNode extends ExternalScanNode {
// by multi-processes or multi-threads. So we assign instance number to 1.
@Override
public int getNumInstances() {
- if
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null &&
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
return 1;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index b9d0f36e3cf..3944b57b211 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -58,8 +58,8 @@ public class FileLoadScanNode extends FileScanNode {
* External file scan node for load from file
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) {
- super(id, desc, "FILE_LOAD_SCAN_NODE", false);
+ public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc, ScanContext
scanContext) {
+ super(id, desc, "FILE_LOAD_SCAN_NODE", scanContext, false);
}
public void finalizeForNereids(TUniqueId loadId,
List<NereidsFileGroupInfo> fileGroupInfos,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
index b74d143c63d..da068c286e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
@@ -34,8 +34,8 @@ public class GroupCommitScanNode extends ExternalScanNode {
long tableId;
- public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long
tableId) {
- super(id, desc, "GROUP_COMMIT_SCAN_NODE", false);
+ public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long
tableId, ScanContext scanContext) {
+ super(id, desc, "GROUP_COMMIT_SCAN_NODE", scanContext, false);
this.tableId = tableId;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 06c9a4d7e4c..0cdb9206a9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -203,8 +203,8 @@ public class OlapScanNode extends ScanNode {
private Column globalRowIdColumn;
// Constructs node to scan given data files of table 'tbl'.
- public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName) {
- super(id, desc, planNodeName);
+ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, ScanContext scanContext) {
+ super(id, desc, planNodeName, scanContext);
olapTable = (OlapTable) desc.getTable();
distributionColumnIds = Sets.newTreeSet();
@@ -1071,8 +1071,9 @@ public class OlapScanNode extends ScanNode {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel
instance.
// so here we need count distinct be_num to do the work. make sure get
right instance
- if
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null &&
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
return scanRangeLocations.size();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 7d6137aec23..bd0ccbddbdf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -237,8 +237,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
* Assign ParallelExecNum by default value for Asynchronous request
*/
public void setParallelExecNumIfExists() {
- if (ConnectContext.get() != null) {
- parallelExecNum =
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null) {
+ String clusterName =
context.getSessionVariable().resolveCloudClusterName(context);
+ parallelExecNum =
context.getSessionVariable().getParallelExecInstanceNum(clusterName);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
new file mode 100644
index 00000000000..875bbffa8e3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+/**
+ * Shared context for scan planning/runtime decisions.
+ * <p>
+ * Keep this object immutable so scan nodes can safely cache it and
+ * we can evolve fields incrementally in the future.
+ */
+public final class ScanContext {
+ public static final ScanContext EMPTY = new ScanContext("");
+
+ private final String clusterName;
+
+ private ScanContext(String clusterName) {
+ this.clusterName = clusterName == null ? "" : clusterName;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public static final class Builder {
+ private String clusterName = "";
+
+ public Builder clusterName(String clusterName) {
+ this.clusterName = clusterName;
+ return this;
+ }
+
+ public ScanContext build() {
+ if (clusterName == null || clusterName.isEmpty()) {
+ return ScanContext.EMPTY;
+ }
+ return new ScanContext(clusterName);
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 26c0024af8e..d52977bf2a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -71,6 +71,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -106,10 +107,13 @@ public abstract class ScanNode extends PlanNode
implements SplitGenerator {
// This is also important for local shuffle logic.
// Now only OlapScanNode and FileQueryScanNode implement this.
protected HashSet<Long> scanBackendIds = new HashSet<>();
+ // Immutable scan context used for evolving scan-related metadata.
+ protected final ScanContext scanContext;
- public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
+ public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
ScanContext scanContext) {
super(id, desc.getId().asList(), planNodeName);
this.desc = desc;
+ this.scanContext = Objects.requireNonNull(scanContext, "scanContext
can not be null");
}
protected List<Column> getColumns() {
@@ -692,11 +696,21 @@ public abstract class ScanNode extends PlanNode
implements SplitGenerator {
return selectedSplitNum;
}
+ public ScanContext getScanContext() {
+ return scanContext;
+ }
+
@Override
public boolean isSerialOperator() {
- return numScanBackends() <= 0 || getScanRangeNum()
- <
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() *
numScanBackends()
- || (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return numScanBackends() <= 0;
+ }
+ int parallelExecInstanceNum = context.getSessionVariable()
+ .getParallelExecInstanceNum(scanContext.getClusterName());
+ return numScanBackends() <= 0
+ || getScanRangeNum() < parallelExecInstanceNum *
numScanBackends()
+ || context.getSessionVariable().isForceToLocalShuffle();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 1de7b4ad800..a76b77d6b8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -66,8 +66,9 @@ public class SchemaScanNode extends ScanNode {
* Constructs node to scan given data files of table 'tbl'.
*/
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
- String schemaCatalog, String schemaDb, String schemaTable,
List<Expr> frontendConjuncts) {
- super(id, desc, "SCAN SCHEMA");
+ String schemaCatalog, String schemaDb, String schemaTable,
List<Expr> frontendConjuncts,
+ ScanContext scanContext) {
+ super(id, desc, "SCAN SCHEMA", scanContext);
this.tableName = desc.getTable().getName();
this.schemaCatalog = schemaCatalog;
this.schemaDb = schemaDb;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2c277defeca..d54a128dcc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.VariableAnnotation;
@@ -4230,7 +4231,7 @@ public class SessionVariable implements Serializable,
Writable {
this.debugSkipFoldConstant = debugSkipFoldConstant;
}
- public int getParallelExecInstanceNum() {
+ public int getParallelExecInstanceNum(String clusterName) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null &&
connectContext.getEnv().getAuth() != null) {
int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
@@ -4239,8 +4240,12 @@ public class SessionVariable implements Serializable,
Writable {
return userParallelExecInstanceNum;
}
}
+ String resolvedClusterName = clusterName;
+ if (Config.isCloudMode() &&
Strings.isNullOrEmpty(resolvedClusterName)) {
+ resolvedClusterName = resolveCloudClusterName(connectContext);
+ }
if (parallelPipelineTaskNum == 0) {
- int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+ int size =
Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolvedClusterName);
int autoInstance = (size + 1) / 2;
return Math.min(autoInstance, maxInstanceNum);
} else {
@@ -4248,6 +4253,31 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public String resolveCloudClusterName() {
+ return resolveCloudClusterName(ConnectContext.get());
+ }
+
+ public String resolveCloudClusterName(ConnectContext connectContext) {
+ if (!Config.isCloudMode()) {
+ return "";
+ }
+ if (!Strings.isNullOrEmpty(cloudCluster)) {
+ return cloudCluster;
+ }
+ if (connectContext == null) {
+ return "";
+ }
+ try {
+ String clusterName = connectContext.getCloudCluster(false);
+ return clusterName == null ? "" : clusterName;
+ } catch (ComputeGroupException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("failed to resolve cloud cluster for parallel
instance num", e);
+ }
+ return "";
+ }
+ }
+
public boolean getEnablePreferCachedRowset() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null &&
connectContext.getEnv().getAuth() != null) {
@@ -5168,7 +5198,8 @@ public class SessionVariable implements Serializable,
Writable {
}
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnableLocalShuffle(enableLocalShuffle);
- tResult.setParallelInstance(getParallelExecInstanceNum());
+ String clusterName = resolveCloudClusterName();
+ tResult.setParallelInstance(getParallelExecInstanceNum(clusterName));
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5005ad0759c..33029cc9b37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -339,7 +339,9 @@ public class StmtExecutor {
builder.instancesNumPerBe(
beToInstancesNum.entrySet().stream().map(entry ->
entry.getKey() + ":" + entry.getValue())
.collect(Collectors.joining(",")));
-
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
+ String clusterName =
context.sessionVariable.resolveCloudClusterName(context);
+ builder.parallelFragmentExecInstance(
+
String.valueOf(context.sessionVariable.getParallelExecInstanceNum(clusterName)));
builder.traceId(context.getSessionVariable().getTraceId());
builder.isNereids(context.getState().isNereids() ? "Yes" : "No");
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 203a5d969df..0a5abcf9516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1079,7 +1079,9 @@ public class SystemInfoService {
return idToBackendRef;
}
- public int getMinPipelineExecutorSize() {
+ // CloudSystemInfoService override.
+ // Non-cloud ignores clusterName and calculates from all backends.
+ public int getMinPipelineExecutorSize(String clusterName) {
List<Backend> currentBackends = null;
try {
currentBackends = getAllBackendsByAllCluster().values().asList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 66f344f03be..c4be29b1617 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.DataGenScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TDataGenFunctionName;
@@ -34,6 +35,7 @@ public abstract class DataGenTableValuedFunction extends
TableValuedFunctionIf {
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new DataGenScanNode(id, desc, this);
+ return new DataGenScanNode(id, desc, this,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index df1a6bd6a10..a3b75f40903 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -53,6 +53,7 @@ import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
@@ -241,7 +242,8 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new TVFScanNode(id, desc, false, sv);
+ return new TVFScanNode(id, desc, false, sv,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
@Override
@@ -556,4 +558,3 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
}
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 111c3b9370d..074308bea32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -91,7 +92,8 @@ public class GroupCommitTableValuedFunction extends
ExternalFileTableValuedFunct
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new GroupCommitScanNode(id, desc, tableId);
+ return new GroupCommitScanNode(id, desc, tableId,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index 040d47c7b41..e39857e4588 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
@@ -54,6 +55,7 @@ public class JdbcQueryTableValueFunction extends
QueryTableValueFunction {
desc.getTable().getFullSchema(), TableType.JDBC);
catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
desc.setTable(jdbcTable);
- return new JdbcScanNode(id, desc, true, query);
+ return new JdbcScanNode(id, desc, true, query,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 32be139657c..39fde6a5615 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.tvf.source.MetadataScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TMetaScanRange;
@@ -64,6 +65,7 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new MetadataScanNode(id, desc, this);
+ return new MetadataScanNode(id, desc, this,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 85d14585f21..85bd4677a0c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -369,7 +369,7 @@ public class CloudSystemInfoServiceTest {
try {
// Since there are no backends in the cluster, should return 1
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -403,7 +403,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the pipeline executor size of the single backend
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -454,7 +454,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the minimum pipeline executor size (6)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(6, result);
} finally {
ConnectContext.remove();
@@ -505,7 +505,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the minimum positive pipeline executor size (4)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(4, result);
} finally {
ConnectContext.remove();
@@ -549,7 +549,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 1 when no valid pipeline executor sizes are
// found
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -565,7 +565,7 @@ public class CloudSystemInfoServiceTest {
createTestConnectContext(null);
try {
// Should return 1 when no cluster is set in ConnectContext
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -628,7 +628,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 8 (minimum valid size)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -679,7 +679,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 512 (minimum among large values)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(512, result);
} finally {
ConnectContext.remove();
@@ -715,7 +715,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 32 (consistent across all backends)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(32, result);
} finally {
ConnectContext.remove();
@@ -786,7 +786,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 8 (minimum from current cluster2), not 2 (global
minimum from cluster1)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(cluster2Name);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -860,14 +860,14 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 32 (minimum from virtual cluster's physical
cluster), not 8
// (from other cluster)
- int result = infoService.getMinPipelineExecutorSize();
+ int result =
infoService.getMinPipelineExecutorSize(virtualClusterName);
Assert.assertEquals(32, result);
// Switch to other cluster
ctx.setCloudCluster(otherClusterName);
// Should return 8 (from other cluster)
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize(otherClusterName);
Assert.assertEquals(8, result);
} finally {
@@ -885,7 +885,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 1 because no cluster is set (will catch
AnalysisException)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
} finally {
@@ -958,14 +958,14 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 2 (minimum from cluster1), not 16 (minimum from
cluster2)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(cluster1Name);
Assert.assertEquals(2, result);
// Now switch to cluster2
ctx.setCloudCluster(cluster2Name);
// Should return 16 (minimum from cluster2), not 2 (minimum from
cluster1)
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize(cluster2Name);
Assert.assertEquals(16, result);
} finally {
// Clean up ConnectContext
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
index 7cea54b7127..aee814907bc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TFileFormatType;
@@ -37,7 +38,7 @@ public class FileQueryScanNodeTest {
private static class TestFileQueryScanNode extends FileQueryScanNode {
TestFileQueryScanNode(SessionVariable sv) {
- super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)),
"test", false, sv);
+ super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)),
"test", ScanContext.EMPTY, false, sv);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
index 727ff939003..ba7687157ea 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.junit.Assert;
@@ -46,7 +47,7 @@ public class HiveScanNodeTest {
Mockito.when(table.getCatalog()).thenReturn(catalog);
Mockito.when(catalog.bindBrokerName()).thenReturn("");
desc.setTable(table);
- HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null);
+ HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null, ScanContext.EMPTY);
HiveMetaStoreCache.FileCacheValue fileCacheValue = new
HiveMetaStoreCache.FileCacheValue();
HiveMetaStoreCache.HiveFileStatus status = new
HiveMetaStoreCache.HiveFileStatus();
@@ -71,7 +72,7 @@ public class HiveScanNodeTest {
Mockito.when(table.getCatalog()).thenReturn(catalog);
Mockito.when(catalog.bindBrokerName()).thenReturn("");
desc.setTable(table);
- HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null);
+ HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null, ScanContext.EMPTY);
HiveMetaStoreCache.FileCacheValue fileCacheValue = new
HiveMetaStoreCache.FileCacheValue();
HiveMetaStoreCache.HiveFileStatus status = new
HiveMetaStoreCache.HiveFileStatus();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
index 48031a2303e..e16ee803dca 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.iceberg.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.iceberg.DataFile;
@@ -37,7 +38,7 @@ public class IcebergScanNodeTest {
private static class TestIcebergScanNode extends IcebergScanNode {
TestIcebergScanNode(SessionVariable sv) {
- super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+ super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv,
ScanContext.EMPTY);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
index b00adb1a2e6..cd3f30b0f5d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
@@ -33,6 +33,7 @@ import
org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -95,7 +96,7 @@ public class MaxComputeScanNodeTest {
sv = new SessionVariable();
node = new MaxComputeScanNode(new PlanNodeId(0), desc,
- SelectedPartitions.NOT_PRUNED, false, sv);
+ SelectedPartitions.NOT_PRUNED, false, sv, ScanContext.EMPTY);
}
// ==================== Reflection Helpers ====================
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 07dc64a8ec9..03828278c94 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.paimon.data.BinaryRow;
@@ -62,7 +63,7 @@ public class PaimonScanNodeTest {
public void testSplitWeight() throws UserException {
TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
- PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv);
+ PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv, ScanContext.EMPTY);
paimonScanNode.setSource(new PaimonSource());
@@ -395,7 +396,7 @@ public class PaimonScanNodeTest {
@Test
public void testPaimonDataSystemTableForceJniEvenWhenNativeSupported()
throws UserException {
TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
- PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv);
+ PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv, ScanContext.EMPTY);
PaimonScanNode spyPaimonScanNode = Mockito.spy(paimonScanNode);
DataFileMeta dfm = DataFileMeta.forAppend("f1.parquet", 64L * 1024 *
1024, 1L, SimpleStats.EMPTY_STATS,
@@ -454,7 +455,8 @@ public class PaimonScanNodeTest {
public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws
Exception {
SessionVariable sv = new SessionVariable();
sv.setMaxFileSplitNum(100);
- PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)), false, sv);
+ PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)),
+ false, sv, ScanContext.EMPTY);
PaimonSource source = Mockito.mock(PaimonSource.class);
Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
index a3df5dc8a04..9700c306f51 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.tvf.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
import org.apache.doris.thrift.TMetaScanRange;
@@ -63,7 +64,7 @@ public class MetadataScanNodeTest {
*/
@Test
public void testInitedScanRangeLocationsInitialState() throws Exception {
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
// Use reflection to access the private field
Field field =
MetadataScanNode.class.getDeclaredField("initedScanRangeLocations");
@@ -90,7 +91,7 @@ public class MetadataScanNodeTest {
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
// Mock the backend policy using reflection
mockBackendPolicy(scanNode);
@@ -128,7 +129,7 @@ public class MetadataScanNodeTest {
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
mockBackendPolicy(scanNode);
// Call getScanRangeLocations multiple times
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
index 8d591362376..8cf98daea94 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
@@ -45,7 +46,7 @@ public class TVFScanNodeTest {
ExternalFileTableValuedFunction tvf =
Mockito.mock(ExternalFileTableValuedFunction.class);
Mockito.when(table.getTvf()).thenReturn(tvf);
desc.setTable(table);
- TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+ TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv,
ScanContext.EMPTY);
TBrokerFileStatus status = new TBrokerFileStatus();
status.setSize(10_000L * MB);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
index 403ddfc9e1a..200b18f4a37 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
@@ -62,6 +62,7 @@ public class UnassignedShuffleJobTest {
sessionVariable = Mockito.mock(SessionVariable.class);
connectContext = Mockito.mock(ConnectContext.class);
Mockito.when(connectContext.getSessionVariable()).thenReturn(sessionVariable);
+
Mockito.when(sessionVariable.resolveCloudClusterName(connectContext)).thenReturn("test_cluster");
// nextInstanceId() is called from buildInstances; provide unique IDs
instanceIdCounter = new AtomicLong(0);
@@ -178,7 +179,7 @@ public class UnassignedShuffleJobTest {
// exchangeInstanceParallel not set
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
// parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(8);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
@@ -208,7 +209,7 @@ public class UnassignedShuffleJobTest {
public void testDegreeOfParallelismWithQueryCacheChildSmallerThanMax() {
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
// parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(8);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
@@ -239,7 +240,7 @@ public class UnassignedShuffleJobTest {
// exchangeInstanceParallel = 5
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(5);
// parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(8);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
@@ -269,7 +270,7 @@ public class UnassignedShuffleJobTest {
// exchangeInstanceParallel = 50 (larger than maxInstanceNum)
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(50);
// parallelExecInstanceNum = 4, backendNum = 3 => maxInstanceNum = 12
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(4);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(4);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
@@ -297,7 +298,7 @@ public class UnassignedShuffleJobTest {
@Test
public void testDegreeOfParallelismWithMixedQueryCacheJobs() {
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(8);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
@@ -354,7 +355,7 @@ public class UnassignedShuffleJobTest {
public void testComputeAssignedJobsWithQueryCacheLimitsInstanceCount() {
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
// parallelExecInstanceNum = 2, backendNum = 3 => maxInstanceNum = 6
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(2);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(2);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
@@ -415,7 +416,7 @@ public class UnassignedShuffleJobTest {
public void testComputeAssignedJobsWithQueryCacheTakesIfBranch() {
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
// parallelExecInstanceNum = 1, backendNum = 3 => maxInstanceNum = 3
-
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(1);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum(Mockito.anyString())).thenReturn(1);
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 41e91a11f1a..be7137f7b99 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -42,6 +42,7 @@ import
org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;
@@ -227,7 +228,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
TupleDescriptor desc = new TupleDescriptor(new TupleId(1));
desc.setTable(mgr.getInternalCatalog().getDbNullable("test").getTableNullable("tbl1"));
- olapScanNode = new OlapScanNode(new PlanNodeId(1), desc,
"tb1ScanNode");
+ olapScanNode = new OlapScanNode(new PlanNodeId(1), desc,
"tb1ScanNode", ScanContext.EMPTY);
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index f499caa96f6..e897ddc0f15 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -56,6 +56,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.cache.Cache;
@@ -378,7 +379,7 @@ public class OlapQueryCacheTest {
OlapTable table = createProfileTable();
TupleDescriptor desc = new TupleDescriptor(new TupleId(20004));
desc.setTable(table);
- OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc,
"userprofilenode");
+ OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc,
"userprofilenode", ScanContext.EMPTY);
node.setSelectedPartitionIds(selectedPartitionIds);
return node;
}
@@ -491,7 +492,7 @@ public class OlapQueryCacheTest {
OlapTable table = createEventTable();
TupleDescriptor desc = new TupleDescriptor(new TupleId(30002));
desc.setTable(table);
- OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc,
"appeventnode");
+ OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc,
"appeventnode", ScanContext.EMPTY);
node.setSelectedPartitionIds(selectedPartitionIds);
return node;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 0ad09096626..033568017d9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -447,7 +447,7 @@ public class SystemInfoServiceTest {
@Test
public void testGetMinPipelineExecutorSize() {
// Test case 1: No backends
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
// Test case 2: Single backend with pipeline executor size = 8
@@ -456,7 +456,7 @@ public class SystemInfoServiceTest {
be1.setPipelineExecutorSize(8);
be1.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(8, result);
// Test case 3: Multiple backends with different pipeline executor
sizes
@@ -470,7 +470,7 @@ public class SystemInfoServiceTest {
be3.setPipelineExecutorSize(12);
be3.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(4, result);
// Test case 4: Backends with zero and negative pipeline executor
sizes (should
@@ -485,7 +485,7 @@ public class SystemInfoServiceTest {
be5.setPipelineExecutorSize(-1); // Should be ignored
be5.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(4, result); // Still should be 4 from be2
// Test case 5: All backends have zero or negative pipeline executor
sizes
@@ -493,7 +493,7 @@ public class SystemInfoServiceTest {
be2.setPipelineExecutorSize(-5);
be3.setPipelineExecutorSize(0);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result); // Should return default value 1
// Test case 6: Mix of positive and non-positive values
@@ -501,7 +501,7 @@ public class SystemInfoServiceTest {
be2.setPipelineExecutorSize(0); // ignored
be3.setPipelineExecutorSize(6); // This should be the minimum
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(6, result);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]