This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b38b5d5415 Optimize sort properties of JoinNode in case of JOIN USING
6b38b5d5415 is described below

commit 6b38b5d5415848d12a825a52116f39154445224b
Author: Weihao Li <[email protected]>
AuthorDate: Mon May 26 21:18:34 2025 +0800

    Optimize sort properties of JoinNode in case of JOIN USING
---
 .../distribute/TableDistributedPlanGenerator.java  |  79 +++++++++++--
 .../plan/relational/analyzer/JoinTest.java         | 124 +++++++++++++++++++++
 .../planner/assertions/PlanMatchPattern.java       |   4 +
 3 files changed, 200 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 095ab695ff6..0680de2873c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -84,6 +84,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -104,6 +105,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -231,10 +233,51 @@ public class TableDistributedPlanGenerator
           
ImmutableSet.copyOf(node.getOutputSymbols()).containsAll(childOrdering.getOrderBy());
     }
     if (childrenNodes.size() == 1) {
+      PlanNode child = childrenNodes.get(0);
       if (containAllSortItem) {
         nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
       }
-      node.setChild(childrenNodes.get(0));
+
+      // Currently every join implementation except CROSS is MergeSortJoin, so the join keeps order
+      if (child instanceof JoinNode) {
+        JoinNode joinNode = (JoinNode) child;
+
+        // Only FULL joins are processed here; other join types are processed in visitJoinNode()
+        if (joinNode.getJoinType() == JoinNode.JoinType.FULL
+            && !joinNode.getAsofCriteria().isPresent()) {
+          Map<Symbol, Expression> assignmentsMap = 
node.getAssignments().getMap();
+          // If all of these Coalesces appear in the ProjectNode, the ProjectNode is ordered
+          int coalescesSize = joinNode.getCriteria().size();
+
+          // Use a map to record the Symbol of each Coalesce; it is linked (insertion-ordered) so
+          // that constructOrderingSchema does not need to query this Map a second time
+          Map<Expression, Symbol> orderedCoalesces = new 
LinkedHashMap<>(coalescesSize);
+          for (JoinNode.EquiJoinClause clause : joinNode.getCriteria()) {
+            orderedCoalesces.put(
+                new CoalesceExpression(
+                    ImmutableList.of(
+                        clause.getLeft().toSymbolReference(),
+                        clause.getRight().toSymbolReference())),
+                null);
+          }
+
+          for (Map.Entry<Symbol, Expression> assignment : 
assignmentsMap.entrySet()) {
+            if (orderedCoalesces.containsKey(assignment.getValue())) {
+              coalescesSize--;
+              orderedCoalesces.put(assignment.getValue(), assignment.getKey());
+            }
+          }
+
+          // All Coalesces appear in ProjectNode
+          if (coalescesSize == 0) {
+            nodeOrderingMap.put(
+                node.getPlanNodeId(),
+                constructOrderingSchema(new 
ArrayList<>(orderedCoalesces.values())));
+          }
+        }
+      }
+
+      node.setChild(child);
       return Collections.singletonList(node);
     }
 
@@ -481,14 +524,36 @@ public class TableDistributedPlanGenerator
           rightChildrenNodes.size() == 1,
           "The size of right children node of JoinNode should be 1");
     }
+
+    OrderingScheme leftChildOrdering = 
nodeOrderingMap.get(node.getLeftChild().getPlanNodeId());
+    OrderingScheme rightChildOrdering = 
nodeOrderingMap.get(node.getRightChild().getPlanNodeId());
+
     // For CrossJoinNode, we need to merge children nodes (it's safe for other JoinNodes here since
     // the size of their children is always 1).
-    node.setLeftChild(
-        mergeChildrenViaCollectOrMergeSort(
-            nodeOrderingMap.get(node.getLeftChild().getPlanNodeId()), 
leftChildrenNodes));
-    node.setRightChild(
-        mergeChildrenViaCollectOrMergeSort(
-            nodeOrderingMap.get(node.getRightChild().getPlanNodeId()), 
rightChildrenNodes));
+    node.setLeftChild(mergeChildrenViaCollectOrMergeSort(leftChildOrdering, 
leftChildrenNodes));
+    node.setRightChild(mergeChildrenViaCollectOrMergeSort(rightChildOrdering, 
rightChildrenNodes));
+
+    // Currently every join implementation except CROSS is MergeSortJoin, so the join keeps order
+    if (!node.isCrossJoin() && !node.getAsofCriteria().isPresent()) {
+      switch (node.getJoinType()) {
+        case FULL:
+          // For a FULL join, the sort properties are processed in the ProjectNode above this
+          // node.
+          break;
+        case INNER:
+        case LEFT:
+          if (ImmutableSet.copyOf(node.getLeftOutputSymbols())
+              .containsAll(leftChildOrdering.getOrderBy())) {
+            nodeOrderingMap.put(node.getPlanNodeId(), leftChildOrdering);
+          }
+          break;
+        case RIGHT:
+          throw new IllegalStateException(
+              "RIGHT Join should be transformed to LEFT Join in previous 
process");
+        default:
+          throw new UnsupportedOperationException("Unsupported Join Type: " + 
node.getJoinType());
+      }
+    }
     return Collections.singletonList(node);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index 2b0806c857c..eb5091bc878 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -71,8 +71,10 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationTableScan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.join;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
@@ -635,4 +637,126 @@ public class JoinTest {
             + "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
         false);
   }
+
+  @Test
+  public void testJoinSortProperties() {
+    // FULL JOIN
+    PlanTester planTester = new PlanTester();
+    sql =
+        "select * from table1 t1 "
+            + "full join table1 t2 using (time, s1)"
+            + "full join table1 t3 using (time, s1)";
+    logicalQueryPlan = planTester.createPlan(sql);
+    assertPlan(
+        logicalQueryPlan.getRootNode(),
+        output(
+            project(
+                join(
+                    sort(
+                        project(
+                            join(
+                                sort(tableScan("testdb.table1")),
+                                sort(tableScan("testdb.table1"))))),
+                    sort(tableScan("testdb.table1"))))));
+
+    assertPlan(planTester.getFragmentPlan(0), output(project(join(exchange(), 
exchange()))));
+
+    // the sort node above JoinNode has been eliminated
+    assertPlan(planTester.getFragmentPlan(1), project(join(exchange(), 
exchange())));
+
+    assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(3), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(4), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(5), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(7), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(8), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(9), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), 
exchange(), exchange()));
+
+    // LEFT
+    sql =
+        "select * from table1 t1 "
+            + "left join table1 t2 using (time, s1)"
+            + "left join table1 t3 using (time, s1)";
+    assertLeftOrInner(planTester);
+
+    // INNER JOIN
+    sql =
+        "select * from table1 t1 "
+            + "inner join table1 t2 using (time, s1)"
+            + "inner join table1 t3 using (time, s1)";
+    assertLeftOrInner(planTester);
+
+    // RIGHT JOIN
+    sql =
+        "select * from table1 t1 "
+            + "right join table1 t2 using (time, s1)"
+            + "right join table1 t3 using (time, s1)";
+    logicalQueryPlan = planTester.createPlan(sql);
+    assertPlan(
+        logicalQueryPlan.getRootNode(),
+        output(
+            join(
+                sort(tableScan("testdb.table1")),
+                sort(join(sort(tableScan("testdb.table1")), 
sort(tableScan("testdb.table1")))))));
+
+    assertPlan(planTester.getFragmentPlan(0), output(join(exchange(), 
exchange())));
+
+    assertPlan(planTester.getFragmentPlan(1), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(2), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(3), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(4), 
sort(tableScan("testdb.table1")));
+
+    // the sort node above JoinNode has been eliminated
+    assertPlan(planTester.getFragmentPlan(5), join(exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), 
exchange(), exchange()));
+  }
+
+  private void assertLeftOrInner(PlanTester planTester) {
+    logicalQueryPlan = planTester.createPlan(sql);
+    assertPlan(
+        logicalQueryPlan.getRootNode(),
+        output(
+            join(
+                sort(join(sort(tableScan("testdb.table1")), 
sort(tableScan("testdb.table1")))),
+                sort(tableScan("testdb.table1")))));
+
+    assertPlan(planTester.getFragmentPlan(0), output(join(exchange(), 
exchange())));
+
+    // the sort node above JoinNode has been eliminated
+    assertPlan(planTester.getFragmentPlan(1), join(exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(3), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(4), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(5), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), 
exchange(), exchange()));
+
+    assertPlan(planTester.getFragmentPlan(7), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(8), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(9), 
sort(tableScan("testdb.table1")));
+
+    assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), 
exchange(), exchange()));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 93662f6082b..66bccb425a4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -516,6 +516,10 @@ public final class PlanMatchPattern {
       return builder.build();
   }*/
 
+  public static PlanMatchPattern join(PlanMatchPattern left, PlanMatchPattern 
right) {
+    return node(JoinNode.class, left, right);
+  }
+
   public static PlanMatchPattern join(
       JoinNode.JoinType type, Consumer<JoinMatcher.Builder> handler) {
     JoinMatcher.Builder builder = new JoinMatcher.Builder(type);

Reply via email to