This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d77bfa09d85 [Improvement](shuffle) Use a knob to decide whether a
serial exchange… (#44676)
d77bfa09d85 is described below
commit d77bfa09d85d9759965c109cc50ee89e8fbde499
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 28 16:33:16 2024 +0800
[Improvement](shuffle) Use a knob to decide whether a serial exchange…
(#44676)
… should be used
This improvement was completed in #43199 and reverted by #44075 due to a
performance regression. After fixing it, this improvement is re-submitted.
Add a new knob to control whether an exchange node should be serial or not.
For example, a partitioned hash join should be executed like below:
```
┌────────────────────────────┐                  ┌────────────────────────────┐
│                            │                  │                            │
│Exchange(HASH PARTITIONED N)│                  │Exchange(HASH PARTITIONED N)│
│                            │                  │                            │
└────────────────────────────┴─────────┬────────┴────────────────────────────┘
                                       │
                                       │
                                       │
                                       │
                                       │
                                       │
                                ┌──────▼──────┐
                                │             │
                                │  HASH JOIN  │
                                │             │
                                └─────────────┘
```
After turning on this knob, the real plan should be:
```
  ┌──────────────────────────────┐                        ┌──────────────────────────────┐
  │                              │                        │                              │
  │ Exchange (HASH PARTITIONED 1)│                        │ Exchange (HASH PARTITIONED 1)│
  │                              │                        │                              │
  └────────────┬─────────────────┘                        └────────────┬─────────────────┘
               │                                                       │
               │                                                       │
               │                                                       │
               │                                                       │
               │                                                       │
┌──────────────▼─────────────────────┐                  ┌──────────────▼─────────────────────┐
│                                    │                  │                                    │
│  Local Exchange(HASH PARTITIONED N)│                  │  Local Exchange(HASH PARTITIONED N)│
│               1 -> N               │                  │               1 -> N               │
└────────────────────────────────────┴─────────┬────────┴────────────────────────────────────┘
                                               │
                                               │
                                               │
                                               │
                                               │
                                               │
                                        ┌──────▼──────┐
                                        │             │
                                        │  HASH JOIN  │
                                        │             │
                                        └─────────────┘
```
For a large cluster, the X (mappers) * Y (reducers) RPC channels can be
reduced to X (mappers) * Z (BEs).
---
.../main/java/org/apache/doris/planner/ExchangeNode.java | 13 ++++++++++++-
.../main/java/org/apache/doris/planner/PlanFragment.java | 14 ++++++--------
.../src/main/java/org/apache/doris/planner/PlanNode.java | 7 +++++++
.../src/main/java/org/apache/doris/planner/ScanNode.java | 5 +++++
.../src/main/java/org/apache/doris/qe/Coordinator.java | 10 ++--------
.../src/main/java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++
6 files changed, 43 insertions(+), 17 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 1ca1db56bfc..cb6628b01c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
@@ -169,6 +170,10 @@ public class ExchangeNode extends PlanNode {
@Override
protected void toThrift(TPlanNode msg) {
+ // If this fragment has another scan node, this exchange node is
serial or not should be decided by the scan
+ // node.
+ msg.setIsSerialOperator((isSerialOperator() ||
fragment.hasSerialScanNode())
+ && fragment.useSerialSource(ConnectContext.get()));
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid : tupleIds) {
@@ -228,11 +233,17 @@ public class ExchangeNode extends PlanNode {
*/
@Override
public boolean isSerialOperator() {
- return partitionType == TPartitionType.UNPARTITIONED && mergeInfo !=
null;
+ return (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+ || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo
!= null;
}
@Override
public boolean hasSerialChildren() {
return isSerialOperator();
}
+
+ @Override
+ public boolean hasSerialScanChildren() {
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index fef3de9b696..ab5307c07e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -515,15 +515,13 @@ public class PlanFragment extends TreeNode<PlanFragment> {
&& !hasNullAwareLeftAntiJoin()
// If planRoot is not a serial operator and has serial
children, we can use serial source and improve
// parallelism of non-serial operators.
- && sink instanceof DataStreamSink &&
!planRoot.isSerialOperator()
- && planRoot.hasSerialChildren();
+ // For bucket shuffle / colocate join fragment, always use
serial source if the bucket scan nodes are
+ // serial.
+ && (hasSerialScanNode() || (sink instanceof DataStreamSink &&
!planRoot.isSerialOperator()
+ && planRoot.hasSerialChildren()));
}
- public int getNumBackends() {
- return numBackends;
- }
-
- public void setNumBackends(int numBackends) {
- this.numBackends = numBackends;
+ public boolean hasSerialScanNode() {
+ return planRoot.hasSerialScanChildren();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 14bd34e93e1..73768435154 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1388,4 +1388,11 @@ public abstract class PlanNode extends
TreeNode<PlanNode> implements PlanStats {
}
return children.stream().allMatch(PlanNode::hasSerialChildren);
}
+
+ public boolean hasSerialScanChildren() {
+ if (children.isEmpty()) {
+ return false;
+ }
+ return children.stream().anyMatch(PlanNode::hasSerialScanChildren);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a2583868346..b4033a0535e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -861,4 +861,9 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
<
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() *
numScanBackends()
|| (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
}
+
+ @Override
+ public boolean hasSerialScanChildren() {
+ return isSerialOperator();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3a6f6e4f840..262b5836689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1886,17 +1886,11 @@ public class Coordinator implements CoordInterface {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();
- /**
- * Ignore storage data distribution iff:
- * 1. `parallelExecInstanceNum * numBackends` is
larger than scan ranges.
- * 2. Use Nereids planner.
- */
boolean sharedScan = true;
int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
- boolean ignoreStorageDataDistribution =
node.isPresent()
- && fragment.useSerialSource(context);
- if (node.isPresent() && ignoreStorageDataDistribution)
{
+ boolean ignoreStorageDataDistribution =
fragment.useSerialSource(context);
+ if (ignoreStorageDataDistribution) {
expectedInstanceNum =
Math.max(expectedInstanceNum, 1);
// if have limit and no conjuncts, only need 1
instance to save cpu and
// mem resource
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 71b746c7907..115614a4187 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -268,6 +268,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String IGNORE_STORAGE_DATA_DISTRIBUTION =
"ignore_storage_data_distribution";
+ public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
+
public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";
// Limit the max count of scanners to prevent generate too many scanners.
@@ -1112,6 +1114,10 @@ public class SessionVariable implements Serializable,
Writable {
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;
+ @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true,
+ varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
+ private boolean useSerialExchange = false;
+
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType =
VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
@@ -2353,6 +2359,7 @@ public class SessionVariable implements Serializable,
Writable {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
+ this.useSerialExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
@@ -4563,6 +4570,10 @@ public class SessionVariable implements Serializable,
Writable {
return enableCooldownReplicaAffinity;
}
+ public boolean isUseSerialExchange() {
+ return useSerialExchange && getEnableLocalExchange();
+ }
+
public void setDisableInvertedIndexV1ForVaraint(boolean
disableInvertedIndexV1ForVaraint) {
this.disableInvertedIndexV1ForVaraint =
disableInvertedIndexV1ForVaraint;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]