This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a4f7c6e7ca [fix](Nereids) fix new coordinator compute a wrong
scanRangeNum (#43850)
5a4f7c6e7ca is described below
commit 5a4f7c6e7ca460cce36d6b85f3eb56b30eedc788
Author: 924060929 <[email protected]>
AuthorDate: Wed Nov 13 18:49:21 2024 +0800
[fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850)
Fix the new coordinator computing a wrong scanRangeNum, a regression introduced by #41730.
This bug causes S3 load to report incorrect progress, for example:
```
Progress: 0.00%(73/2147483647)
```
---
.../org/apache/doris/qe/CoordinatorContext.java | 62 +++++++++++++++++-
.../org/apache/doris/nereids/util/PlanChecker.java | 11 ++++
.../apache/doris/qe/NereidsCoordinatorTest.java | 73 ++++++++++++++++++++++
3 files changed, 144 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index 2ab94808f27..aed0fd9c98c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -30,6 +30,10 @@ import
org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
@@ -40,12 +44,18 @@ import org.apache.doris.qe.runtime.QueryProcessor;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Suppliers;
@@ -418,9 +428,57 @@ public class CoordinatorContext {
private int getScanRangeNum() {
int scanRangeNum = 0;
- for (ScanNode scanNode : scanNodes) {
- scanRangeNum += scanNode.getScanRangeNum();
+ for (PipelineDistributedPlan distributedPlan : distributedPlans) {
+ for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) {
+ ScanSource scanSource = instanceJob.getScanSource();
+ if (scanSource instanceof BucketScanSource) {
+ BucketScanSource bucketScanSource = (BucketScanSource)
scanSource;
+ for (Map<ScanNode, ScanRanges> kv :
bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
+ for (ScanRanges scanRanges : kv.values()) {
+ for (TScanRangeParams param : scanRanges.params) {
+ scanRangeNum +=
computeScanRangeNumByScanRange(param);
+ }
+ }
+ }
+ } else {
+ DefaultScanSource defaultScanSource = (DefaultScanSource)
scanSource;
+ for (ScanRanges scanRanges :
defaultScanSource.scanNodeToScanRanges.values()) {
+ for (TScanRangeParams param : scanRanges.params) {
+ scanRangeNum +=
computeScanRangeNumByScanRange(param);
+ }
+ }
+ }
+ }
+ }
+ return scanRangeNum;
+ }
+
+ private int computeScanRangeNumByScanRange(TScanRangeParams param) {
+ int scanRangeNum = 0;
+ TScanRange scanRange = param.getScanRange();
+ if (scanRange == null) {
+ return scanRangeNum;
+ }
+ TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
+ if (brokerScanRange != null) {
+ scanRangeNum += brokerScanRange.getRanges().size();
+ }
+ TExternalScanRange externalScanRange = scanRange.getExtScanRange();
+ if (externalScanRange != null) {
+ TFileScanRange fileScanRange =
externalScanRange.getFileScanRange();
+ if (fileScanRange != null) {
+ if (fileScanRange.isSetRanges()) {
+ scanRangeNum += fileScanRange.getRanges().size();
+ } else if (fileScanRange.isSetSplitSource()) {
+ scanRangeNum +=
fileScanRange.getSplitSource().getNumSplits();
+ }
+ }
+ }
+ TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
+ if (paloScanRange != null) {
+ scanRangeNum += 1;
}
+ // TODO: more ranges?
return scanRangeNum;
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index b95027a1385..f0a45d1e7bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -270,6 +270,17 @@ public class PlanChecker {
return this;
}
+ public NereidsPlanner plan(String sql) {
+ StatementContext statementContext = new
StatementContext(connectContext, new OriginStatement(sql, 0));
+ connectContext.setStatementContext(statementContext);
+ NereidsPlanner planner = new NereidsPlanner(statementContext);
+ LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
+ LogicalPlanAdapter parsedPlanAdaptor = new
LogicalPlanAdapter(parsedPlan, statementContext);
+ statementContext.setParsedStatement(parsedPlanAdaptor);
+ planner.planWithLock(parsedPlanAdaptor);
+ return planner;
+ }
+
public PlanChecker dpHypOptimize() {
double now = System.currentTimeMillis();
cascadesContext.getStatementContext().setDpHyp(true);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java
new file mode 100644
index 00000000000..23326d94310
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class NereidsCoordinatorTest extends TestWithFeService {
+ @BeforeAll
+ public void init() throws Exception {
+ FeConstants.runningUnitTest = true;
+
+ createDatabase("test");
+ useDatabase("test");
+
+ createTable("create table tbl(id int) distributed by hash(id) buckets
10 properties('replication_num' = '1');");
+ }
+
+ @Test
+ public void testNereidsCoordinatorScanRangeNum() throws IOException {
+ NereidsPlanner planner = plan("select * from test.tbl");
+ NereidsCoordinator coordinator = (NereidsCoordinator)
EnvFactory.getInstance()
+ .createCoordinator(connectContext, null, planner, null);
+ int scanRangeNum = coordinator.getScanRangeNum();
+ Assertions.assertEquals(10, scanRangeNum);
+ }
+
+ @Test
+ public void testNereidsCoordinatorScanRangeNum2() throws IOException {
+ NereidsPlanner planner = plan("select * from
information_schema.columns");
+ NereidsCoordinator coordinator = (NereidsCoordinator)
EnvFactory.getInstance()
+ .createCoordinator(connectContext, null, planner, null);
+ int scanRangeNum = coordinator.getScanRangeNum();
+ Assertions.assertEquals(0, scanRangeNum);
+ }
+
+ private NereidsPlanner plan(String sql) throws IOException {
+ ConnectContext connectContext = createDefaultCtx();
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
+ connectContext.setThreadLocalInfo();
+
+ UUID uuid = UUID.randomUUID();
+ connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
+ NereidsPlanner planner = PlanChecker.from(connectContext).plan(sql);
+ return planner;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]