This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f1bf64ef81f [feat](nereids)set actual row count in physical plan
according to merged profile (branch-3.0) (#42254)
f1bf64ef81f is described below
commit f1bf64ef81ffc2a2b0627354324382e6a9c30cdd
Author: minghong <[email protected]>
AuthorDate: Tue Oct 22 19:01:49 2024 +0800
[feat](nereids)set actual row count in physical plan according to merged
profile (branch-3.0) (#42254)
## Proposed changes
Cherry-pick of #40361 to branch-3.0.
Issue Number: close #xxx
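<!--Describe your changes.-->
While merging pipeline profiles, record each operator's summed RowsProduced
counter keyed by the nereids_id parsed from the profile name, then set it as
the actual row count on the matching physical plan node. The physical plan is
no longer stored in the summary info; it is printed in the profile body as a
"Physical Plan" section, where each node's stats show "actualRows=<n>" next
to the estimated row count once an actual count has been set.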
---
.../doris/common/profile/ExecutionProfile.java | 1 +
.../org/apache/doris/common/profile/Profile.java | 78 +++++++++++++++++-----
.../doris/common/profile/SummaryProfile.java | 2 -
.../apache/doris/common/util/RuntimeProfile.java | 15 +++++
.../doris/nereids/trees/plans/AbstractPlan.java | 4 ++
.../trees/plans/physical/AbstractPhysicalJoin.java | 3 +-
.../trees/plans/physical/PhysicalCTEProducer.java | 1 +
.../plans/physical/PhysicalHashAggregate.java | 2 +-
.../trees/plans/physical/PhysicalQuickSort.java | 4 +-
.../nereids/trees/plans/physical/PhysicalTopN.java | 1 +
.../trees/plans/physical/PhysicalUnion.java | 4 +-
.../trees/plans/physical/PhysicalWindow.java | 3 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 4 ++
.../org/apache/doris/statistics/Statistics.java | 44 +++++++-----
14 files changed, 126 insertions(+), 40 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d4a00939fe7..a7a05ee12fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -143,6 +143,7 @@ public class ExecutionProfile {
RuntimeProfile.mergeProfiles(allPipelineTask,
mergedpipelineProfile, planNodeMap);
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
+
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
}
return fragmentsProfile;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 25c6979be7f..1fbc0fdb1d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -23,8 +23,11 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.Planner;
@@ -46,6 +49,8 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
@@ -108,6 +113,10 @@ public class Profile {
// Profile size is the size of profile file
private long profileSize = 0;
+ private PhysicalPlan physicalPlan;
+ public Map<String, Long> rowsProducedMap = new HashMap<>();
+ private List<PhysicalRelation> physicalRelations = new ArrayList<>();
+
private String changedSessionVarCache = "";
// Need default constructor for read from storage
@@ -276,20 +285,8 @@ public class Profile {
if (planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
- StringBuilder builder = new StringBuilder();
- builder.append("\n");
- builder.append(nereidsPlanner.getPhysicalPlan()
- .treeString());
- builder.append("\n");
- for (PhysicalRelation relation :
nereidsPlanner.getPhysicalRelations()) {
- if (relation.getStats() != null) {
- builder.append(relation).append("\n")
-
.append(relation.getStats().printColumnStats());
- }
- }
- summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
- builder.toString().replace("\n", "\n "));
-
+ physicalPlan = nereidsPlanner.getPhysicalPlan();
+
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
FragmentIdMapping<DistributedPlan> distributedPlans =
nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
@@ -382,15 +379,43 @@ public class Profile {
// Only generate merged profile for select, insert into select.
// Not support broker load now.
+ RuntimeProfile mergedProfile = null;
if (this.profileLevel == MergedProfileLevel &&
this.executionProfiles.size() == 1) {
try {
- builder.append("\n MergedProfile \n");
-
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
" ");
+ mergedProfile =
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap);
+ this.rowsProducedMap.putAll(mergedProfile.rowsProducedMap);
+ if (physicalPlan != null) {
+ updateActualRowCountOnPhysicalPlan(physicalPlan);
+ }
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile {} failed", this.id,
aggProfileException);
+ }
+ }
+
+ if (physicalPlan != null) {
+ builder.append("\nPhysical Plan \n");
+ StringBuilder physcialPlanBuilder = new StringBuilder();
+ physcialPlanBuilder.append(physicalPlan.treeString());
+ physcialPlanBuilder.append("\n");
+ for (PhysicalRelation relation : physicalRelations) {
+ if (relation.getStats() != null) {
+ physcialPlanBuilder.append(relation).append("\n")
+ .append(relation.getStats().printColumnStats());
+ }
+ }
+ builder.append(
+ physcialPlanBuilder.toString().replace("\n", "\n "));
+ }
+
+ if (this.profileLevel == MergedProfileLevel &&
this.executionProfiles.size() == 1) {
+ builder.append("\nMergedProfile \n");
+ if (mergedProfile != null) {
+ mergedProfile.prettyPrint(builder, " ");
+ } else {
builder.append("build merged simple profile failed");
}
}
+
try {
// For load task, they will have multiple execution_profiles.
for (ExecutionProfile executionProfile : executionProfiles) {
@@ -681,4 +706,25 @@ public class Profile {
return;
}
+
+ public PhysicalPlan getPhysicalPlan() {
+ return physicalPlan;
+ }
+
+ public void setPhysicalPlan(PhysicalPlan physicalPlan) {
+ this.physicalPlan = physicalPlan;
+ }
+
+ private void updateActualRowCountOnPhysicalPlan(Plan plan) {
+ if (plan == null || rowsProducedMap.isEmpty()) {
+ return;
+ }
+ Long actualRowCount =
rowsProducedMap.get(String.valueOf(((AbstractPlan) plan).getId()));
+ if (actualRowCount != null) {
+ ((AbstractPlan) plan).updateActualRowCount(actualRowCount);
+ }
+ for (Plan child : plan.children()) {
+ updateActualRowCountOnPhysicalPlan(child);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 5728f79f1e6..8356bc34a13 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -66,7 +66,6 @@ public class SummaryProfile {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel
Fragment Exec Instance Num";
public static final String TRACE_ID = "Trace ID";
public static final String WORKLOAD_GROUP = "Workload Group";
- public static final String PHYSICAL_PLAN = "Physical Plan";
public static final String DISTRIBUTED_PLAN = "Distributed Plan";
public static final String SYSTEM_MESSAGE = "System Message";
public static final String EXECUTED_BY_FRONTEND = "Executed By Frontend";
@@ -129,7 +128,6 @@ public class SummaryProfile {
START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER,
DEFAULT_CATALOG, DEFAULT_DB, SQL_STATEMENT);
public static final ImmutableList<String> SUMMARY_KEYS = new
ImmutableList.Builder<String>()
.addAll(SUMMARY_CAPTIONS)
- .add(PHYSICAL_PLAN)
.add(DISTRIBUTED_PLAN)
.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 60207b49172..3ffc303a6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -40,12 +40,15 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* It is accessed by two kinds of thread, one is to create this RuntimeProfile
@@ -100,6 +103,8 @@ public class RuntimeProfile {
@SerializedName(value = "nodeid")
private int nodeid = -1;
+ public Map<String, Long> rowsProducedMap = new HashMap<>();
+
public RuntimeProfile() {
init();
}
@@ -494,6 +499,7 @@ public class RuntimeProfile {
// RuntimeProfile has at least one counter named TotalTime, should
exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile,
planNodeMap);
+
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
}
@@ -504,6 +510,12 @@ public class RuntimeProfile {
return;
}
RuntimeProfile templateProfile = profiles.get(0);
+ Pattern pattern = Pattern.compile("nereids_id=(\\d+)");
+ Matcher matcher = pattern.matcher(templateProfile.getName());
+ String nereidsId = null;
+ if (matcher.find()) {
+ nereidsId = matcher.group(1);
+ }
Set<String> childCounterSet =
templateProfile.childCounterMap.get(parentCounterName);
if (childCounterSet == null) {
return;
@@ -517,6 +529,9 @@ public class RuntimeProfile {
Counter orgCounter =
profile.counterMap.get(childCounterName);
aggCounter.addCounter(orgCounter);
}
+ if (nereidsId != null &&
childCounterName.equals("RowsProduced")) {
+ simpleProfile.rowsProducedMap.put(nereidsId,
aggCounter.sum.getValue());
+ }
if (simpleProfile.counterMap.containsKey(parentCounterName)) {
simpleProfile.addCounter(childCounterName, aggCounter,
parentCounterName);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index 9dfca3195d6..eb65048050f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -226,4 +226,8 @@ public abstract class AbstractPlan extends
AbstractTreeNode<Plan> implements Pla
}
return ancestors;
}
+
+ public void updateActualRowCount(long actualRowCount) {
+ statistics.setActualRowCount(actualRowCount);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index 5f4bb5aa5f4..194f6356045 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -267,8 +267,9 @@ public abstract class AbstractPhysicalJoin<
@Override
public String toString() {
- List<Object> args = Lists.newArrayList("type", joinType,
+ List<Object> args = Lists.newArrayList(
"stats", statistics,
+ "type", joinType,
"hashCondition", hashJoinConjuncts,
"otherCondition", otherJoinConjuncts,
"markCondition", markJoinConjuncts);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java
index 53ff3e30257..568b8e6660a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java
@@ -89,6 +89,7 @@ public class PhysicalCTEProducer<CHILD_TYPE extends Plan>
extends PhysicalUnary<
@Override
public String toString() {
return Utils.toSqlString("PhysicalCTEProducer[" + id.asInt() + "]",
+ "stats", statistics,
"cteId", cteId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 72c69a18bee..404c30fe379 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -199,9 +199,9 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan>
extends PhysicalUnar
TopnPushInfo topnPushInfo = (TopnPushInfo) getMutableState(
MutableState.KEY_PUSH_TOPN_TO_AGG).orElseGet(() -> null);
return Utils.toSqlString("PhysicalHashAggregate[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
+ "stats", statistics,
"aggPhase", aggregateParam.aggPhase,
"aggMode", aggregateParam.aggMode,
- "stats", statistics,
"maybeUseStreaming", maybeUsingStream,
"groupByExpr", groupByExpressions,
"outputExpr", outputExpressions,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
index c1973668c7d..0e377b46d23 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
@@ -107,8 +107,8 @@ public class PhysicalQuickSort<CHILD_TYPE extends Plan>
extends AbstractPhysical
@Override
public String toString() {
return Utils.toSqlString("PhysicalQuickSort[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
- "orderKeys", orderKeys,
- "phase", phase.toString(), "stats", statistics
+ "stats", statistics, "orderKeys", orderKeys,
+ "phase", phase.toString()
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 96dc709bbde..c387a58dd0c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -143,6 +143,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends
AbstractPhysicalSort<
@Override
public String toString() {
return Utils.toSqlString("PhysicalTopN[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
+ "stats", statistics,
"limit", limit,
"offset", offset,
"orderKeys", orderKeys,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java
index ba20c926705..2a81698812a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java
@@ -89,11 +89,11 @@ public class PhysicalUnion extends PhysicalSetOperation
implements Union {
@Override
public String toString() {
return Utils.toSqlString("PhysicalUnion" + "[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
+ "stats", statistics,
"qualifier", qualifier,
"outputs", outputs,
"regularChildrenOutputs", regularChildrenOutputs,
- "constantExprsList", constantExprsList,
- "stats", statistics);
+ "constantExprsList", constantExprsList);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java
index b1703f47496..7e6fd48f02d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java
@@ -105,8 +105,9 @@ public class PhysicalWindow<CHILD_TYPE extends Plan>
extends PhysicalUnary<CHILD
@Override
public String toString() {
return Utils.toSqlString("PhysicalWindow[" + id.asInt() + "]" +
getGroupIdWithPrefix(),
+ "stats", statistics,
"windowFrameGroup", windowFrameGroup,
- "requiredProperties", requireProperties, "stats", statistics
+ "requiredProperties", requireProperties
);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 70da91bc66d..16600536aac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1223,6 +1223,10 @@ public class StmtExecutor {
// failed, the insert stmt should be success
try {
profile.updateSummary(getSummaryInfo(isFinished), isFinished,
this.planner);
+ if (planner instanceof NereidsPlanner) {
+ NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
+ profile.setPhysicalPlan(nereidsPlanner.getPhysicalPlan());
+ }
} catch (Throwable t) {
LOG.warn("failed to update profile, ignore this error", t);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
index 162dab5d136..f4552a2560d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
@@ -46,12 +46,19 @@ public class Statistics {
private double deltaRowCount = 0.0;
+ private long actualRowCount = -1L;
+
+ public Statistics(double rowCount, Map<Expression, ColumnStatistic>
expressionToColumnStats) {
+ this(rowCount, 1, expressionToColumnStats);
+ }
+
public Statistics(Statistics another) {
this.rowCount = another.rowCount;
this.widthInJoinCluster = another.widthInJoinCluster;
this.expressionToColumnStats = new
HashMap<>(another.expressionToColumnStats);
this.tupleSize = another.tupleSize;
this.deltaRowCount = another.getDeltaRowCount();
+ this.actualRowCount = another.actualRowCount;
}
public Statistics(double rowCount, int widthInJoinCluster,
@@ -62,10 +69,6 @@ public class Statistics {
this.deltaRowCount = deltaRowCount;
}
- public Statistics(double rowCount, Map<Expression, ColumnStatistic>
expressionToColumnStats) {
- this(rowCount, 1, expressionToColumnStats, 0);
- }
-
public Statistics(double rowCount, int widthInJoinCluster,
Map<Expression, ColumnStatistic> expressionToColumnStats) {
this(rowCount, widthInJoinCluster, expressionToColumnStats, 0);
@@ -193,21 +196,24 @@ public class Statistics {
@Override
public String toString() {
+ StringBuilder builder = new StringBuilder();
if (Double.isNaN(rowCount)) {
- return "NaN";
- }
- if (Double.POSITIVE_INFINITY == rowCount) {
- return "Infinite";
- }
- if (Double.NEGATIVE_INFINITY == rowCount) {
- return "-Infinite";
+ builder.append("NaN");
+ } else if (Double.POSITIVE_INFINITY == rowCount) {
+ builder.append("Infinite");
+ } else if (Double.NEGATIVE_INFINITY == rowCount) {
+ builder.append("-Infinite");
+ } else {
+ DecimalFormat format = new DecimalFormat("#,###.##");
+ builder.append(format.format(rowCount));
}
- DecimalFormat format = new DecimalFormat("#,###.##");
- String rows = format.format(rowCount);
if (deltaRowCount > 0) {
- rows = rows + "(" + format.format(deltaRowCount) + ")";
+ builder.append("(").append((long) deltaRowCount).append(")");
}
- return rows;
+ if (actualRowCount != -1) {
+ builder.append(" actualRows=").append(actualRowCount);
+ }
+ return builder.toString();
}
public String printColumnStats() {
@@ -292,4 +298,12 @@ public class Statistics {
public void setDeltaRowCount(double deltaRowCount) {
this.deltaRowCount = deltaRowCount;
}
+
+ public long getActualRowCount() {
+ return actualRowCount;
+ }
+
+ public void setActualRowCount(long actualRowCount) {
+ this.actualRowCount = actualRowCount;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]