This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 3bb5291341e [fix](colocate join) fix wrong use of colocate join 
(#37361)(#37729) (#37937)
3bb5291341e is described below

commit 3bb5291341e68e8455f86284865db240719cf027
Author: camby <[email protected]>
AuthorDate: Wed Jul 17 10:23:11 2024 +0800

    [fix](colocate join) fix wrong use of colocate join (#37361)(#37729) 
(#37937)
    
    cherry-pick from master #37361 #37729
---
 .../properties/ChildOutputPropertyDeriver.java     |   2 +-
 .../properties/ChildrenPropertiesRegulator.java    |   2 +-
 .../LogicalOlapScanToPhysicalOlapScan.java         |  12 +--
 .../org/apache/doris/nereids/util/JoinUtils.java   |  50 ++++++++--
 .../test_colocate_join_of_column_order.groovy      | 111 +++++++++++++++++++++
 5 files changed, 163 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 41cdf78cf89..d7695850d2a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -278,7 +278,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
                 case RIGHT_SEMI_JOIN:
                 case RIGHT_ANTI_JOIN:
                 case RIGHT_OUTER_JOIN:
-                    if (JoinUtils.couldColocateJoin(leftHashSpec, 
rightHashSpec)) {
+                    if (JoinUtils.couldColocateJoin(leftHashSpec, 
rightHashSpec, hashJoin.getHashJoinConjuncts())) {
                         return new PhysicalProperties(rightHashSpec);
                     } else {
                         // retain left shuffle type, since coordinator use 
left most node to schedule fragment
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 0fa2d87f92f..d17d7485cce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -249,7 +249,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         Optional<PhysicalProperties> updatedForLeft = Optional.empty();
         Optional<PhysicalProperties> updatedForRight = Optional.empty();
 
-        if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
+        if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, 
hashJoin.getHashJoinConjuncts())) {
             // check colocate join with scan
             return true;
         } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 43436355ae1..472d2e169db 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -83,8 +83,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
                 List<Slot> output = olapScan.getOutput();
                 List<Slot> baseOutput = 
olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
                 List<ExprId> hashColumns = Lists.newArrayList();
-                for (Slot slot : output) {
-                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Slot slot : output) {
                         if (((SlotReference) 
slot).getColumn().get().getNameWithoutMvPrefix()
                                 .equals(column.getName())) {
                             hashColumns.add(slot.getExprId());
@@ -92,8 +92,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
                     }
                 }
                 if (hashColumns.size() != 
hashDistributionInfo.getDistributionColumns().size()) {
-                    for (Slot slot : baseOutput) {
-                        for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                        for (Slot slot : baseOutput) {
                             // If the length of the column in the bucket key 
changes after DDL, the length cannot be
                             // determined. As a result, some bucket fields are 
lost in the query execution plan.
                             // So here we use the column name to avoid this 
problem
@@ -109,8 +109,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
                 HashDistributionInfo hashDistributionInfo = 
(HashDistributionInfo) distributionInfo;
                 List<Slot> output = olapScan.getOutput();
                 List<ExprId> hashColumns = Lists.newArrayList();
-                for (Slot slot : output) {
-                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Slot slot : output) {
                         // If the length of the column in the bucket key 
changes after DDL, the length cannot be
                         // determined. As a result, some bucket fields are 
lost in the query execution plan.
                         // So here we use the column name to avoid this problem
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 25f84c096c8..9a5e56b41af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -44,6 +45,7 @@ import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -214,13 +216,14 @@ public class JoinUtils {
             return false;
         }
         return couldColocateJoin((DistributionSpecHash) leftDistributionSpec,
-                (DistributionSpecHash) rightDistributionSpec);
+                (DistributionSpecHash) rightDistributionSpec, 
join.getHashJoinConjuncts());
     }
 
     /**
      * could do colocate join with left and right child distribution spec.
      */
-    public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, 
DistributionSpecHash rightHashSpec) {
+    public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, 
DistributionSpecHash rightHashSpec,
+            List<Expression> conjuncts) {
         if (ConnectContext.get() == null
                 || 
ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
             return false;
@@ -242,12 +245,47 @@ public class JoinUtils {
         boolean noNeedCheckColocateGroup = hitSameIndex && 
(leftTablePartitions.equals(rightTablePartitions))
                 && (leftTablePartitions.size() <= 1);
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
-        if (noNeedCheckColocateGroup
-                || (colocateIndex.isSameGroup(leftTableId, rightTableId)
-                && 
!colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
+        if (noNeedCheckColocateGroup) {
             return true;
         }
-        return false;
+        if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
+                || 
colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
+            return false;
+        }
+
+        Set<Integer> equalIndices = new HashSet<>();
+        for (Expression expr : conjuncts) {
+            // only simple equal predicate can use colocate join
+            if (!(expr instanceof EqualPredicate)) {
+                return false;
+            }
+            Expression leftChild = ((EqualPredicate) expr).left();
+            Expression rightChild = ((EqualPredicate) expr).right();
+            if (!(leftChild instanceof SlotReference) || !(rightChild 
instanceof SlotReference)) {
+                return false;
+            }
+
+            SlotReference leftSlot = (SlotReference) leftChild;
+            SlotReference rightSlot = (SlotReference) rightChild;
+            Integer leftIndex = 
leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+            Integer rightIndex = 
rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+            if (leftIndex == null) {
+                leftIndex = 
rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+                rightIndex = 
leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+            }
+            if (!Objects.equals(leftIndex, rightIndex)) {
+                return false;
+            }
+            if (leftIndex != null) {
+                equalIndices.add(leftIndex);
+            }
+        }
+        // on conditions must contain all distributed columns
+        if 
(equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) {
+            return true;
+        } else {
+            return false;
+        }
     }
 
     public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) {
diff --git 
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
 
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
new file mode 100644
index 00000000000..efef9969506
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_colocate_join_of_column_order") {
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+    // distributed by k1,k2
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` (
+        `k1` varchar(64) NULL,
+        `k2` varchar(64) NULL,
+        `v` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`,`k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "colocate_with" = "group_column_order"
+        );
+    """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+    // distributed by k2,k1
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` (
+        `k1` varchar(64) NULL,
+        `k2` varchar(64) NULL,
+        `v` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`,`k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "colocate_with" = "group_column_order"
+        );
+    """
+    sql """insert into test_colocate_join_of_column_order_t1 
values('k1','k2',11);"""
+    sql """insert into test_colocate_join_of_column_order_t2 
values('k1','k2',11);"""
+
+    sql """set enable_nereids_planner=true; """
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k1;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
+        contains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and 
a.v=b.v;")
+        contains "COLOCATE"
+    }
+
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+
+    // multi tables
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_ta` ( 
`c1` bigint NULL, `c2` bigint NULL)
+            DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", 
"colocate_with" = "group_column_order3");
+    """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tb` ( 
`c1` bigint NULL, `c2` bigint NULL)
+            DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", 
"colocate_with" = "group_column_order3");
+    """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tc` ( 
`c1` bigint NULL, `c2` bigint NULL)
+            DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", 
"colocate_with" = "group_column_order3");
+    """
+    sql """insert into test_colocate_join_of_column_order_ta values(1,1);"""
+    sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
+    sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""
+
+    explain {
+        sql("""select /*+ set_var(disable_join_reorder=true) */ * from 
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as 
bigint) c2 from test_colocate_join_of_column_order_tb) 
test_colocate_join_of_column_order_tb  on 
test_colocate_join_of_column_order_ta.c1 = 
test_colocate_join_of_column_order_tb.c2 join [shuffle] 
test_colocate_join_of_column_order_tc on 
test_colocate_join_of_column_order_tb.c2 = 
test_colocate_join_of_column_order_tc.c1;""");
+        contains "COLOCATE"
+    }
+
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to