This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bd12a49baf [feature](Nereids) enable bucket shuffle join on fragment
without scan node (#12891)
bd12a49baf is described below
commit bd12a49bafd68cc60b8615656184a9194539544c
Author: morrySnow <[email protected]>
AuthorDate: Fri Sep 23 15:01:50 2022 +0800
[feature](Nereids) enable bucket shuffle join on fragment without scan node
(#12891)
In the past, with the legacy planner, we could only do a bucket shuffle join on
a join node belonging to a fragment with at least one scan node.
However, a bucket shuffle join should be possible on any join node whose left
child's data distribution satisfies the join's requirement.
In Nereids, we have data distribution info on each node, so we can enable
bucket shuffle join on fragments without a scan node.
---
.../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 11 +++++++++--
.../doris/nereids/properties/ChildOutputPropertyDeriver.java | 2 +-
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 6 +++---
.../nereids/properties/ChildOutputPropertyDeriverTest.java | 2 +-
4 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9d8da28230..dd7204a3c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpecHash;
+import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -969,10 +970,16 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
// assemble fragment
hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+ if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
+ hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED);
+ }
connectChildFragment(hashJoinNode, 1, leftFragment, rightFragment,
context);
leftFragment.setPlanRoot(hashJoinNode);
- // TODO: use left fragment d
- DataPartition rhsJoinPartition = new
DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED,
+ TPartitionType partitionType =
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
+ if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
+ partitionType = TPartitionType.HASH_PARTITIONED;
+ }
+ DataPartition rhsJoinPartition = new DataPartition(partitionType,
rightPartitionExprIds.stream().map(context::findSlotRef).collect(Collectors.toList()));
rightFragment.setOutputPartition(rhsJoinPartition);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 715bf5e5d8..43cedd78a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -87,7 +87,7 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
return PhysicalProperties.GATHER;
}
// TODO: change ENFORCED back to bucketed, when coordinator
could process bucket on agg correctly.
- return PhysicalProperties.createHash(new
DistributionSpecHash(columns, ShuffleType.ENFORCED));
+ return PhysicalProperties.createHash(new
DistributionSpecHash(columns, ShuffleType.BUCKETED));
case DISTINCT_GLOBAL:
default:
throw new RuntimeException("Could not derive output properties
for agg phase: " + agg.getAggPhase());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7755d61ece..f0d6b3e36a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -420,7 +420,6 @@ public class Coordinator {
}
FragmentExecParams params =
fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId());
params.inputFragments.add(fragment.getFragmentId());
-
}
coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
@@ -1017,7 +1016,6 @@ public class Coordinator {
int bucketSeq = 0;
int bucketNum =
bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
- TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0",
0);
// when left table is empty, it's bucketset is empty.
// set right table destination address to the address of left
table
@@ -1026,6 +1024,8 @@ public class Coordinator {
bucketNum = 1;
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
+ // process bucket shuffle join on fragment without scan node
+ TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0",
0);
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new
TPlanFragmentDestination();
@@ -1521,7 +1521,7 @@ public class Coordinator {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode)
scanNode,
idToBackend, addressToBackendID);
}
- if (!(fragmentContainsColocateJoin |
fragmentContainsBucketShuffleJoin)) {
+ if (!(fragmentContainsColocateJoin ||
fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations,
assignment, assignedBytesPerHost);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index 052537c755..6e53265dfb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -311,7 +311,7 @@ public class ChildOutputPropertyDeriverTest {
Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
Assertions.assertTrue(result.getDistributionSpec() instanceof
DistributionSpecHash);
DistributionSpecHash actual = (DistributionSpecHash)
result.getDistributionSpec();
- Assertions.assertEquals(ShuffleType.ENFORCED, actual.getShuffleType());
+ Assertions.assertEquals(ShuffleType.BUCKETED, actual.getShuffleType());
Assertions.assertEquals(Lists.newArrayList(partition).stream()
.map(SlotReference::getExprId).collect(Collectors.toList()),
actual.getOrderedShuffledColumns());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]