This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3bb5291341e [fix](colocate join) fix wrong use of colocate join
(#37361)(#37729) (#37937)
3bb5291341e is described below
commit 3bb5291341e68e8455f86284865db240719cf027
Author: camby <[email protected]>
AuthorDate: Wed Jul 17 10:23:11 2024 +0800
[fix](colocate join) fix wrong use of colocate join (#37361)(#37729)
(#37937)
cherry-pick from master #37361 #37729
---
.../properties/ChildOutputPropertyDeriver.java | 2 +-
.../properties/ChildrenPropertiesRegulator.java | 2 +-
.../LogicalOlapScanToPhysicalOlapScan.java | 12 +--
.../org/apache/doris/nereids/util/JoinUtils.java | 50 ++++++++--
.../test_colocate_join_of_column_order.groovy | 111 +++++++++++++++++++++
5 files changed, 163 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 41cdf78cf89..d7695850d2a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -278,7 +278,7 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
- if (JoinUtils.couldColocateJoin(leftHashSpec,
rightHashSpec)) {
+ if (JoinUtils.couldColocateJoin(leftHashSpec,
rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use
left most node to schedule fragment
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 0fa2d87f92f..d17d7485cce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -249,7 +249,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
Optional<PhysicalProperties> updatedForRight = Optional.empty();
- if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
+ if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec,
hashJoin.getHashJoinConjuncts())) {
// check colocate join with scan
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 43436355ae1..472d2e169db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -83,8 +83,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends
OneImplementationRuleFact
List<Slot> output = olapScan.getOutput();
List<Slot> baseOutput =
olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
List<ExprId> hashColumns = Lists.newArrayList();
- for (Slot slot : output) {
- for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Slot slot : output) {
if (((SlotReference)
slot).getColumn().get().getNameWithoutMvPrefix()
.equals(column.getName())) {
hashColumns.add(slot.getExprId());
@@ -92,8 +92,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends
OneImplementationRuleFact
}
}
if (hashColumns.size() !=
hashDistributionInfo.getDistributionColumns().size()) {
- for (Slot slot : baseOutput) {
- for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Slot slot : baseOutput) {
// If the length of the column in the bucket key
changes after DDL, the length cannot be
// determined. As a result, some bucket fields are
lost in the query execution plan.
// So here we use the column name to avoid this
problem
@@ -109,8 +109,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends
OneImplementationRuleFact
HashDistributionInfo hashDistributionInfo =
(HashDistributionInfo) distributionInfo;
List<Slot> output = olapScan.getOutput();
List<ExprId> hashColumns = Lists.newArrayList();
- for (Slot slot : output) {
- for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Column column :
hashDistributionInfo.getDistributionColumns()) {
+ for (Slot slot : output) {
// If the length of the column in the bucket key
changes after DDL, the length cannot be
// determined. As a result, some bucket fields are
lost in the query execution plan.
// So here we use the column name to avoid this problem
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 25f84c096c8..9a5e56b41af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -44,6 +45,7 @@ import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -214,13 +216,14 @@ public class JoinUtils {
return false;
}
return couldColocateJoin((DistributionSpecHash) leftDistributionSpec,
- (DistributionSpecHash) rightDistributionSpec);
+ (DistributionSpecHash) rightDistributionSpec,
join.getHashJoinConjuncts());
}
/**
* could do colocate join with left and right child distribution spec.
*/
- public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec,
DistributionSpecHash rightHashSpec) {
+ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec,
DistributionSpecHash rightHashSpec,
+ List<Expression> conjuncts) {
if (ConnectContext.get() == null
||
ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
return false;
@@ -242,12 +245,47 @@ public class JoinUtils {
boolean noNeedCheckColocateGroup = hitSameIndex &&
(leftTablePartitions.equals(rightTablePartitions))
&& (leftTablePartitions.size() <= 1);
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
- if (noNeedCheckColocateGroup
- || (colocateIndex.isSameGroup(leftTableId, rightTableId)
- &&
!colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
+ if (noNeedCheckColocateGroup) {
return true;
}
- return false;
+ if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
+ ||
colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
+ return false;
+ }
+
+ Set<Integer> equalIndices = new HashSet<>();
+ for (Expression expr : conjuncts) {
+ // only simple equal predicate can use colocate join
+ if (!(expr instanceof EqualPredicate)) {
+ return false;
+ }
+ Expression leftChild = ((EqualPredicate) expr).left();
+ Expression rightChild = ((EqualPredicate) expr).right();
+ if (!(leftChild instanceof SlotReference) || !(rightChild
instanceof SlotReference)) {
+ return false;
+ }
+
+ SlotReference leftSlot = (SlotReference) leftChild;
+ SlotReference rightSlot = (SlotReference) rightChild;
+ Integer leftIndex =
leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+ Integer rightIndex =
rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+ if (leftIndex == null) {
+ leftIndex =
rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+ rightIndex =
leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+ }
+ if (!Objects.equals(leftIndex, rightIndex)) {
+ return false;
+ }
+ if (leftIndex != null) {
+ equalIndices.add(leftIndex);
+ }
+ }
+ // on conditions must contain all distributed columns
+ if
(equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) {
+ return true;
+ } else {
+ return false;
+ }
}
public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) {
diff --git
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
new file mode 100644
index 00000000000..efef9969506
--- /dev/null
+++
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_colocate_join_of_column_order") {
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+ // distributed by k1,k2
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` (
+ `k1` varchar(64) NULL,
+ `k2` varchar(64) NULL,
+ `v` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`,`k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "colocate_with" = "group_column_order"
+ );
+ """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+ // distributed by k2,k1
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` (
+ `k1` varchar(64) NULL,
+ `k2` varchar(64) NULL,
+ `v` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`,`k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "colocate_with" = "group_column_order"
+ );
+ """
+ sql """insert into test_colocate_join_of_column_order_t1
values('k1','k2',11);"""
+ sql """insert into test_colocate_join_of_column_order_t2
values('k1','k2',11);"""
+
+ sql """set enable_nereids_planner=true; """
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;")
+ notContains "COLOCATE"
+ }
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k2;")
+ notContains "COLOCATE"
+ }
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k1;")
+ notContains "COLOCATE"
+ }
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
+ notContains "COLOCATE"
+ }
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
+ contains "COLOCATE"
+ }
+ explain {
+ sql("select * from test_colocate_join_of_column_order_t1 a join
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and
a.v=b.v;")
+ contains "COLOCATE"
+ }
+
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+
+ // multi tables
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_ta` (
`c1` bigint NULL, `c2` bigint NULL)
+ DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1",
"colocate_with" = "group_column_order3");
+ """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tb` (
`c1` bigint NULL, `c2` bigint NULL)
+ DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1",
"colocate_with" = "group_column_order3");
+ """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tc` (
`c1` bigint NULL, `c2` bigint NULL)
+ DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1",
"colocate_with" = "group_column_order3");
+ """
+ sql """insert into test_colocate_join_of_column_order_ta values(1,1);"""
+ sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
+ sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""
+
+ explain {
+ sql("""select /*+ set_var(disable_join_reorder=true) */ * from
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as
bigint) c2 from test_colocate_join_of_column_order_tb)
test_colocate_join_of_column_order_tb on
test_colocate_join_of_column_order_ta.c1 =
test_colocate_join_of_column_order_tb.c2 join [shuffle]
test_colocate_join_of_column_order_tc on
test_colocate_join_of_column_order_tb.c2 =
test_colocate_join_of_column_order_tc.c1;""");
+ contains "COLOCATE"
+ }
+
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
+ sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]