This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 249691ced288fd3e5ec5a63d1be50b628cbc53b5
Author: Chen YZ <[email protected]>
AuthorDate: Sat Feb 22 16:06:03 2025 +0800

    done
---
 .../apache/iotdb/udf/table/HOPTableFunction.java   | 158 +++++++++++
 .../it/query/recent/IoTDBFillTableIT.java          |  12 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  11 +
 .../plan/planner/plan/node/PlanNodeType.java       |   4 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/relational/planner/OrderingScheme.java    |  12 +
 .../plan/relational/planner/QueryPlanner.java      |   8 +-
 .../distribute/TableDistributedPlanGenerator.java  | 158 +++++++++--
 .../TableModelTypeProviderExtractor.java           |   7 +
 .../rule/ImplementTableFunctionSource.java         |  31 ++-
 .../plan/relational/planner/node/AuxSortNode.java  | 139 ++++++++++
 .../optimizations/LogicalOptimizeFactory.java      |   3 +-
 .../planner/optimizations/ParallelizeAuxSort.java  | 289 +++++++++++++++++++++
 .../PushLimitOffsetIntoTableScan.java              |   6 +
 .../optimizations/TransformSortToStreamSort.java   |   6 +
 .../optimizations/UnaliasSymbolReferences.java     |  21 ++
 16 files changed, 834 insertions(+), 36 deletions(-)

diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java 
b/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java
new file mode 100644
index 00000000000..fb9f7beac33
--- /dev/null
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.table;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Example table function that tags every input row with the hopping (sliding) windows that
+ * contain its timestamp.
+ *
+ * <p>Parameters:
+ *
+ * <ul>
+ *   <li>{@code DATA}: the input table; its columns are passed through to the output.
+ *   <li>{@code TIMECOL}: name of the column of {@code DATA} that holds the row timestamp
+ *       (matched case-insensitively).
+ *   <li>{@code SLIDE}: distance between the starts of two consecutive windows, in the same unit
+ *       as the timestamp values; must be positive.
+ *   <li>{@code SIZE}: length of each window, in the same unit as the timestamp values; must be
+ *       positive.
+ *   <li>{@code START}: start of the first window; when omitted, the timestamp of the first row
+ *       seen by a processor is used.
+ * </ul>
+ *
+ * <p>For every window {@code [window_start, window_end)} that contains the timestamp of an input
+ * row, one output row is produced with the proper columns {@code window_start} and {@code
+ * window_end} plus the pass-through columns of that input row.
+ */
+public class HOPTableFunction implements TableFunction {
+
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
+  private static final String SIZE_PARAMETER_NAME = "SIZE";
+  private static final String START_PARAMETER_NAME = "START";
+
+  /** Declares the table argument {@code DATA} and the scalar arguments of this function. */
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .keepWhenEmpty()
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build(),
+        ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
+        ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
+        ScalarParameterSpecification.builder()
+            .name(START_PARAMETER_NAME)
+            .type(Type.TIMESTAMP)
+            // Long.MIN_VALUE is the "not specified" sentinel checked in HOPDataProcessor#process.
+            .defaultValue(Long.MIN_VALUE)
+            .build());
+  }
+
+  /**
+   * Returns the index of the first field of {@code tableArgument} whose name equals {@code
+   * expectedFieldName} ignoring case, or -1 when no field matches.
+   */
+  private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) {
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
+      if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Checks that the scalar argument {@code name} holds a positive {@code Long}.
+   *
+   * @throws UDFException if the value is not a {@code Long} or is zero or negative
+   */
+  private static void requirePositive(Map<String, Argument> arguments, String name) {
+    Object value = ((ScalarArgument) arguments.get(name)).getValue();
+    if (!(value instanceof Long) || (Long) value <= 0) {
+      throw new UDFException(
+          String.format("The %s parameter must be a positive integer, but got %s", name, value));
+    }
+  }
+
+  /**
+   * Resolves the time column, validates the window parameters and declares the two proper
+   * output columns.
+   *
+   * @throws UDFException if the time column is not part of the input table, or if {@code SLIDE}
+   *     or {@code SIZE} is not positive
+   */
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
+    TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
+    String expectedFieldName =
+        (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
+    if (requiredIndex == -1) {
+      throw new UDFException("The required field is not found in the input table");
+    }
+    // Fail fast: a zero SLIDE would divide by zero or never advance the window loop in
+    // HOPDataProcessor#process, and a non-positive SIZE describes a window that contains nothing.
+    requirePositive(arguments, SLIDE_PARAMETER_NAME);
+    requirePositive(arguments, SIZE_PARAMETER_NAME);
+
+    DescribedSchema properColumnSchema =
+        new DescribedSchema.Builder()
+            .addField("window_start", Type.TIMESTAMP)
+            .addField("window_end", Type.TIMESTAMP)
+            .build();
+
+    // Only the time column is required by the processor; it is therefore column 0 of its input.
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(properColumnSchema)
+        .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex))
+        .build();
+  }
+
+  /** Returns a provider that creates a fresh {@link HOPDataProcessor} from the arguments. */
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) {
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new HOPDataProcessor(
+            (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(),
+            (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
+            (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue());
+      }
+    };
+  }
+
+  /**
+   * Row-by-row processor that emits one output row per window containing the row's timestamp.
+   *
+   * <p>The earliest window start still considered is kept in {@code curTime} and only ever moves
+   * forward.
+   * NOTE(review): this presumably relies on rows arriving in ascending time order - confirm.
+   */
+  private static class HOPDataProcessor implements TableFunctionDataProcessor {
+
+    private final long slide;
+    private final long size;
+    // Start of the earliest window still considered; Long.MIN_VALUE means "not initialised yet".
+    private long curTime;
+    // Index of the row currently processed, written as the pass-through index.
+    private long curIndex = 0;
+
+    public HOPDataProcessor(long startTime, long slide, long size) {
+      this.slide = slide;
+      this.size = size;
+      this.curTime = startTime;
+    }
+
+    @Override
+    public void process(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      long timeValue = input.getLong(0);
+      if (curTime == Long.MIN_VALUE) {
+        // START was not specified: anchor the first window at the first timestamp seen.
+        curTime = timeValue;
+      }
+      if (curTime + size <= timeValue) {
+        // jump to appropriate window
+        long move = (timeValue - curTime - size) / slide + 1;
+        curTime += move * slide;
+      }
+      // Emit every window [slideTime, slideTime + size) that contains timeValue.
+      long slideTime = curTime;
+      while (slideTime <= timeValue && slideTime + size > timeValue) {
+        properColumnBuilders.get(0).writeLong(slideTime);
+        properColumnBuilders.get(1).writeLong(slideTime + size);
+        passThroughIndexBuilder.writeLong(curIndex);
+        slideTime += slide;
+      }
+      curIndex++;
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java
index 3884b1f2be1..12e9129f49d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java
@@ -298,14 +298,14 @@ public class IoTDBFillTableIT {
           "1970-01-01T00:00:00.002Z,shanghai,d1,212,2111,",
         };
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
 
     // case11: all without time filter using previous fill with FILL_GROUP and 
TIME_COLUMN
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_COLUMN 1 FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -326,7 +326,7 @@ public class IoTDBFillTableIT {
           "1970-01-01T00:00:00.002Z,shanghai,d1,212,2111,",
         };
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_BOUND 2ms FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_BOUND 2ms FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -334,7 +334,7 @@ public class IoTDBFillTableIT {
     // case13: all without time filter using previous fill with TIME_BOUND, 
FILL_GROUP and
     // TIME_COLUMN
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS 
TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -503,14 +503,14 @@ public class IoTDBFillTableIT {
           "1970-01-01T00:00:00.002Z,shanghai,d1,212,null,",
         };
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR 
FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR 
FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
 
     // case11: all without time filter using linear fill with FILL_GROUP and 
TIME_COLUMN
     tableResultSetEqualTest(
-        "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR 
TIME_COLUMN 1 FILL_GROUP 2,3",
+        "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR 
TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3",
         expectedHeader,
         retArray,
         DATABASE_NAME);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index a0144a59e38..0ad7ab155ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -67,6 +67,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewInt
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
@@ -926,6 +927,16 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  /**
+   * Renders an {@link AuxSortNode} as a box listing its id, parallel flag, partition key count
+   * and ordering scheme.
+   */
+  @Override
+  public List<String> visitAuxSort(AuxSortNode node, GraphContext context) {
+    final String title = String.format("AuxSort-%s", node.getPlanNodeId().getId());
+    final String parallelLine = String.format("EnableParalleled: %s", node.isEnableParalleled());
+    final String partitionLine =
+        String.format("PartitionKeyCount: %s", node.getPartitionKeyCount());
+    final String orderingLine = String.format("OrderingScheme: %s", node.getOrderingScheme());
+
+    List<String> boxLines = new ArrayList<>();
+    boxLines.add(title);
+    boxLines.add(parallelLine);
+    boxLines.add(partitionLine);
+    boxLines.add(orderingLine);
+    return render(node, boxLines, context);
+  }
+
   @Override
   public List<String> visitMergeSort(
       
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode node,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 38145ce0194..04308a71323 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -117,6 +117,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
@@ -296,6 +297,7 @@ public enum PlanNodeType {
   MARK_DISTINCT_NODE((short) 1026),
   TABLE_FUNCTION_NODE((short) 1027),
   TABLE_FUNCTION_PROCESSOR_NODE((short) 1028),
+  TABLE_AUX_SORT_NODE((short) 1029),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -669,6 +671,8 @@ public enum PlanNodeType {
         return TableFunctionNode.deserialize(buffer);
       case 1028:
         return TableFunctionProcessorNode.deserialize(buffer);
+      case 1029:
+        return AuxSortNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 0100cbbaeef..da861abf15f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -748,6 +748,11 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
+  /**
+   * Visits an {@code AuxSortNode}. The default implementation delegates to {@code
+   * visitSingleChildProcess}.
+   */
+  public R visitAuxSort(
+      org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode 
node, C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitTopK(
       org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode 
node, C context) {
     return visitMultiChildProcess(node, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
index 508408571bc..97ad4a78d8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
@@ -53,6 +53,18 @@ public class OrderingScheme {
     this.orderings = ImmutableMap.copyOf(orderings);
   }
 
+  /**
+   * Concatenates two ordering schemes: all symbols of {@code a} in their order, followed by the
+   * symbols of {@code b} that {@code a} does not already order. For a symbol present in both
+   * schemes the sort order of {@code a} is kept.
+   */
+  public static OrderingScheme combine(OrderingScheme a, OrderingScheme b) {
+    List<Symbol> mergedOrderBy = new ArrayList<>(a.orderBy);
+    Map<Symbol, SortOrder> mergedOrderings = new HashMap<>(a.orderings);
+    for (Symbol candidate : b.orderBy) {
+      if (mergedOrderings.containsKey(candidate)) {
+        // Already ordered (by a, or by an earlier entry of b): keep the first definition.
+        continue;
+      }
+      mergedOrderBy.add(candidate);
+      mergedOrderings.put(candidate, b.orderings.get(candidate));
+    }
+    return new OrderingScheme(mergedOrderBy, mergedOrderings);
+  }
+
   public List<Symbol> getOrderBy() {
     return orderBy;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index d0da64317b4..26e57030fe1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GapFillStartAndEndTimeExtractVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -821,8 +822,11 @@ public class QueryPlanner {
     OrderingScheme orderingScheme = new OrderingScheme(orderBySymbols.build(), 
orderings);
     analysis.setSortNode(true);
     return subPlan.withNewRoot(
-        new SortNode(
-            queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), 
orderingScheme, false, false));
+        new AuxSortNode(
+            queryIdAllocator.genPlanNodeId(),
+            subPlan.getRoot(),
+            orderingScheme,
+            groupingKeys.size()));
   }
 
   private PlanBuilder distinct(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 91f357168ef..49b1bf790fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -100,7 +101,6 @@ import java.util.stream.IntStream;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN;
-import static 
org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
@@ -189,14 +189,20 @@ public class TableDistributedPlanGenerator
     if (!(node instanceof ValueFillNode)) {
       context.clearExpectedOrderingScheme();
     }
+    boolean parallel =
+        (node.getChild() instanceof AuxSortNode)
+            && ((AuxSortNode) node.getChild()).isEnableParalleled();
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
     if (childOrdering != null) {
       nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
     }
-
-    node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
-    return Collections.singletonList(node);
+    if (!parallel) {
+      node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
childrenNodes));
+      return Collections.singletonList(node);
+    } else {
+      return splitForEachChild(node, childrenNodes);
+    }
   }
 
   @Override
@@ -272,9 +278,7 @@ public class TableDistributedPlanGenerator
       ProjectNode subProjectNode =
           new ProjectNode(queryId.genPlanNodeId(), child, 
node.getAssignments());
       resultNodeList.add(subProjectNode);
-      if (i == 0) {
-        nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering);
-      }
+      nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering);
     }
     return resultNodeList;
   }
@@ -310,6 +314,36 @@ public class TableDistributedPlanGenerator
     return Collections.singletonList(newTopKNode);
   }
 
+  /**
+   * Distributes an {@link AuxSortNode}.
+   *
+   * <p>When the node is not enabled for parallel execution it is planned exactly like a regular
+   * sort. Otherwise every distributed child keeps its own branch: a child whose known ordering
+   * already makes the sort unnecessary is returned as is, any other child is wrapped in its own
+   * {@link SortNode}. While the subtree is visited, {@code context.pushDownAuxSort} reflects the
+   * node's parallel flag; the previous value is restored afterwards.
+   */
+  @Override
+  public List<PlanNode> visitAuxSort(AuxSortNode node, PlanContext context) {
+    final boolean previousPushDown = context.pushDownAuxSort;
+    try {
+      final boolean parallel = node.isEnableParalleled();
+      context.setPushDownAuxSort(parallel);
+      if (!parallel) {
+        return visitSort(node, context);
+      }
+
+      context.setExpectedOrderingScheme(node.getOrderingScheme());
+      final List<PlanNode> sortedBranches = new ArrayList<>();
+      for (PlanNode branch : node.getChild().accept(this, context)) {
+        final OrderingScheme branchOrdering = nodeOrderingMap.get(branch.getPlanNodeId());
+        if (canSortEliminated(node.getOrderingScheme(), branchOrdering)) {
+          sortedBranches.add(branch);
+          continue;
+        }
+        final SortNode branchSort =
+            new SortNode(queryId.genPlanNodeId(), branch, node.getOrderingScheme(), false, false);
+        sortedBranches.add(branchSort);
+        nodeOrderingMap.put(branchSort.getPlanNodeId(), branchSort.getOrderingScheme());
+      }
+      return sortedBranches;
+    } finally {
+      context.pushDownAuxSort = previousPushDown;
+    }
+  }
+
   @Override
   public List<PlanNode> visitSort(SortNode node, PlanContext context) {
     context.setExpectedOrderingScheme(node.getOrderingScheme());
@@ -433,9 +467,7 @@ public class TableDistributedPlanGenerator
       FilterNode subFilterNode =
           new FilterNode(queryId.genPlanNodeId(), child, node.getPredicate());
       resultNodeList.add(subFilterNode);
-      if (i == 0) {
-        nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering);
-      }
+      nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering);
     }
     return resultNodeList;
   }
@@ -483,8 +515,79 @@ public class TableDistributedPlanGenerator
 
   public List<PlanNode> visitDeviceTableScan(
       final DeviceTableScanNode node, final PlanContext context) {
+    // Under a parallel AuxSortNode (flag set in visitAuxSort) the scan is split by
+    // constructDeviceTableScanByTags; otherwise it is split per region replica set.
+    return context.isPushDownAuxSort()
+        ? constructDeviceTableScanByTags(node, context)
+        : constructDeviceTableScanByRegionReplicaSet(node, context);
+  }
+
+  /**
+   * Splits a device table scan for the parallel AuxSort case.
+   *
+   * <p>Devices whose data (under the node's time filter) maps to exactly one region replica set
+   * are grouped into one new DeviceTableScanNode per replica set. All other devices (zero or
+   * several replica sets) are planned through constructDeviceTableScanByRegionReplicaSet and
+   * gathered under a single CollectNode that is appended to the result.
+   *
+   * <p>Side effects: sets {@code context.mostUsedRegion}, and replaces the device entries of
+   * {@code node} when cross-region devices exist.
+   */
+  private List<PlanNode> constructDeviceTableScanByTags(
+      final DeviceTableScanNode node, final PlanContext context) {
+    List<PlanNode> result = new ArrayList<>();
+    List<DeviceEntry> crossRegionDevices = new ArrayList<>();
+
     final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new 
HashMap<>();
+    // Number of devices per replica set, counted over ALL devices (cross-region ones included).
+    final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>();
+    for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
+      final List<TRegionReplicaSet> regionReplicaSets =
+          analysis.getDataRegionReplicaSetWithTimeFilter(
+              node.getQualifiedObjectName().getDatabaseName(),
+              deviceEntry.getDeviceID(),
+              node.getTimeFilter());
+      regionReplicaSets.forEach(
+          regionReplicaSet ->
+              regionDeviceCount.put(
+                  regionReplicaSet, 
regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1));
+      // Anything other than exactly one replica set is deferred to the cross-region path below.
+      if (regionReplicaSets.size() != 1) {
+        crossRegionDevices.add(deviceEntry);
+        continue;
+      }
+      // Lazily create the scan node of this replica set; it copies the settings of the original
+      // node but starts with an empty device list.
+      final DeviceTableScanNode deviceTableScanNode =
+          tableScanNodeMap.computeIfAbsent(
+              regionReplicaSets.get(0),
+              k -> {
+                final DeviceTableScanNode scanNode =
+                    new DeviceTableScanNode(
+                        queryId.genPlanNodeId(),
+                        node.getQualifiedObjectName(),
+                        node.getOutputSymbols(),
+                        node.getAssignments(),
+                        new ArrayList<>(),
+                        node.getIdAndAttributeIndexMap(),
+                        node.getScanOrder(),
+                        node.getTimePredicate().orElse(null),
+                        node.getPushDownPredicate(),
+                        node.getPushDownLimit(),
+                        node.getPushDownOffset(),
+                        node.isPushLimitToEachDevice(),
+                        node.containsNonAlignedDevice());
+                scanNode.setRegionReplicaSet(regionReplicaSets.get(0));
+                return scanNode;
+              });
+      deviceTableScanNode.appendDeviceEntry(deviceEntry);
+    }
+    result.addAll(tableScanNodeMap.values());
+    // Only the single-region scans are handed to processSortProperty; the CollectNode for
+    // cross-region devices is added afterwards.
+    if (context.hasSortProperty) {
+      processSortProperty(node, result, context);
+    }
+    // The replica set touched by the most devices; null when no device maps to any replica set.
+    context.mostUsedRegion =
+        regionDeviceCount.entrySet().stream()
+            .max(Comparator.comparingInt(Map.Entry::getValue))
+            .map(Map.Entry::getKey)
+            .orElse(null);
+    if(!crossRegionDevices.isEmpty()) {
+      // NOTE(review): this mutates the incoming node so that the delegate only sees the
+      // cross-region devices. If constructDeviceTableScanByRegionReplicaSet also assigns
+      // context.mostUsedRegion, it would overwrite the value computed above - confirm.
+      node.setDeviceEntries(crossRegionDevices);
+      result.add(
+              new CollectNode(queryId.genPlanNodeId(), 
constructDeviceTableScanByRegionReplicaSet(node, context), 
node.getOutputSymbols()));
+    }
+    return result;
+  }
 
+  private List<PlanNode> constructDeviceTableScanByRegionReplicaSet(
+      final DeviceTableScanNode node, final PlanContext context) {
+
+    final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new 
HashMap<>();
     for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
       final List<TRegionReplicaSet> regionReplicaSets =
           analysis.getDataRegionReplicaSetWithTimeFilter(
@@ -497,7 +600,7 @@ public class TableDistributedPlanGenerator
             tableScanNodeMap.computeIfAbsent(
                 regionReplicaSet,
                 k -> {
-                  DeviceTableScanNode scanNode =
+                  final DeviceTableScanNode scanNode =
                       new DeviceTableScanNode(
                           queryId.genPlanNodeId(),
                           node.getQualifiedObjectName(),
@@ -518,7 +621,6 @@ public class TableDistributedPlanGenerator
         deviceTableScanNode.appendDeviceEntry(deviceEntry);
       }
     }
-
     if (tableScanNodeMap.isEmpty()) {
       node.setRegionReplicaSet(NOT_ASSIGNED);
       return Collections.singletonList(node);
@@ -685,7 +787,6 @@ public class TableDistributedPlanGenerator
     return resultTableScanNodeList;
   }
 
-  // TODO(UDF): 参考 visitAggregation 和 visitSort
   @Override
   public List<PlanNode> visitAggregation(AggregationNode node, PlanContext 
context) {
     if (node.isStreamable()) {
@@ -851,22 +952,40 @@ public class TableDistributedPlanGenerator
     return Collections.singletonList(node);
   }
 
+  /**
+   * Builds one copy of {@code node} per element of {@code childrenNodes}: each copy is obtained
+   * via {@code clone()}, gets the element added as a child and receives a freshly generated plan
+   * node id. The copies are returned as an immutable list in the order of {@code childrenNodes}.
+   */
+  private List<PlanNode> splitForEachChild(PlanNode node, List<PlanNode> childrenNodes) {
+    ImmutableList.Builder<PlanNode> copies = ImmutableList.builder();
+    childrenNodes.forEach(
+        child -> {
+          PlanNode copy = node.clone();
+          copy.addChild(child);
+          copy.setPlanNodeId(queryId.genPlanNodeId());
+          copies.add(copy);
+        });
+    return copies.build();
+  }
+
+  /**
+   * Distributes a table function processor node. A leaf node is returned unchanged. With a
+   * single distributed child the node simply adopts it. With several children they are either
+   * gathered under one CollectNode, or - when the direct child is an AuxSortNode enabled for
+   * parallel execution - the processor is replicated once per child via splitForEachChild.
+   */
   @Override
   public List<PlanNode> visitTableFunctionProcessor(
       TableFunctionProcessorNode node, PlanContext context) {
+    context.clearExpectedOrderingScheme();
     if (node.getChildren().isEmpty()) {
       return Collections.singletonList(node);
     }
+    // Must be evaluated before the child is visited and replaced by its distributed form.
+    boolean parallel =
+        (node.getChild() instanceof AuxSortNode)
+            && ((AuxSortNode) node.getChild()).isEnableParalleled();
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
     if (childrenNodes.size() == 1) {
       node.setChild(childrenNodes.get(0));
-    } else {
+      return Collections.singletonList(node);
+    } else if (!parallel) {
       CollectNode collectNode =
           new CollectNode(queryId.genPlanNodeId(), 
node.getChildren().get(0).getOutputSymbols());
       childrenNodes.forEach(collectNode::addChild);
       node.setChild(collectNode);
+      return Collections.singletonList(node);
+    } else {
+      return splitForEachChild(node, childrenNodes);
     }
-    return Collections.singletonList(node);
   }
 
   private void buildRegionNodeMap(
@@ -1265,6 +1384,7 @@ public class TableDistributedPlanGenerator
     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
     boolean hasExchangeNode = false;
     boolean hasSortProperty = false;
+    boolean pushDownAuxSort = false;
     OrderingScheme expectedOrderingScheme;
     TRegionReplicaSet mostUsedRegion;
 
@@ -1285,5 +1405,13 @@ public class TableDistributedPlanGenerator
       this.expectedOrderingScheme = expectedOrderingScheme;
       hasSortProperty = true;
     }
+
+    /**
+     * Sets the flag that makes visitDeviceTableScan split the scan through
+     * constructDeviceTableScanByTags instead of constructDeviceTableScanByRegionReplicaSet.
+     */
+    public void setPushDownAuxSort(boolean pushDownAuxSort) {
+      this.pushDownAuxSort = pushDownAuxSort;
+    }
+
+    /** Returns the current value of the {@code pushDownAuxSort} flag (initially {@code false}). */
+    public boolean isPushDownAuxSort() {
+      return pushDownAuxSort;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 43c52a78282..d2596b45593 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
@@ -203,6 +204,12 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    /**
+     * Recurses into the single child of the given AuxSortNode and returns {@code null}; this
+     * method records nothing for the AuxSortNode itself.
+     */
+    @Override
+    public Void visitAuxSort(AuxSortNode node, Void context) {
+      node.getChild().accept(this, context);
+      return null;
+    }
+
     @Override
     public Void visitMergeSort(MergeSortNode node, Void context) {
       node.getChildren().forEach(c -> c.accept(this, context));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
index 810a008e335..b189d2d5e97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
@@ -24,16 +24,19 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
 import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -125,26 +128,30 @@ public class ImplementTableFunctionSource implements 
Rule<TableFunctionNode> {
           .getDataOrganizationSpecification()
           .ifPresent(
               dataOrganizationSpecification -> {
-                ImmutableList.Builder<Symbol> orderBy = 
ImmutableList.builder();
-                ImmutableMap.Builder<Symbol, SortOrder> orderings = 
ImmutableMap.builder();
+                List<Symbol> sortSymbols = new ArrayList<>();
+                Map<Symbol, SortOrder> sortOrderings = new HashMap<>();
                 for (Symbol symbol : 
dataOrganizationSpecification.getPartitionBy()) {
-                  orderBy.add(symbol);
-                  orderings.put(symbol, ASC_NULLS_LAST);
+                  sortSymbols.add(symbol);
+                  sortOrderings.put(symbol, ASC_NULLS_LAST);
                 }
+                int sortKeyOffset = sortSymbols.size();
                 dataOrganizationSpecification
                     .getOrderingScheme()
                     .ifPresent(
                         orderingScheme -> {
-                          orderBy.addAll(orderingScheme.getOrderBy());
-                          orderings.putAll(orderingScheme.getOrderings());
+                          for (Symbol symbol : orderingScheme.getOrderBy()) {
+                            if (!sortOrderings.containsKey(symbol)) {
+                              sortSymbols.add(symbol);
+                              sortOrderings.put(symbol, 
orderingScheme.getOrdering(symbol));
+                            }
+                          }
                         });
                 child.set(
-                    new SortNode(
+                    new AuxSortNode(
                         context.getIdAllocator().genPlanNodeId(),
                         child.get(),
-                        new OrderingScheme(orderBy.build(), orderings.build()),
-                        false,
-                        false));
+                        new OrderingScheme(sortSymbols, sortOrderings),
+                        sortKeyOffset));
               });
       return Result.ofPlanNode(
           new TableFunctionProcessorNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
new file mode 100644
index 00000000000..3c72a0ee8ba
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Auxiliary node for sorting. It is not generated from user queries; it is used by special
+ * processing such as table functions.
+ *
+ * <p>The ordering scheme of an AuxSortNode may consist of two parts: leading partition keys
+ * followed by sort keys. For example, {@code PARTITION BY device_id, color ORDER BY time}
+ * constructs an AuxSortNode with {@code OrderingScheme[device_id, color, time]} and {@code
+ * partitionKeyCount = 2}.
+ */
+public class AuxSortNode extends SortNode {
+
+  /**
+   * Flag that the ParallelizeAuxSort optimizer switches on when it decides this sort can be
+   * parallelized. Defaults to {@code false}.
+   */
+  private boolean enableParalleled;
+
+  /** Number of leading symbols of the ordering scheme that are partition keys. */
+  private final int partitionKeyCount;
+
+  /**
+   * Creates a non-partial, not-yet-parallelized AuxSortNode.
+   *
+   * @param id plan node id
+   * @param child the single input of the sort, may be {@code null} and attached later
+   * @param scheme partition keys followed by sort keys
+   * @param partitionKeyCount how many leading symbols of {@code scheme} are partition keys
+   */
+  public AuxSortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, int partitionKeyCount) {
+    this(id, child, scheme, false, false, false, partitionKeyCount);
+  }
+
+  /**
+   * Creates an AuxSortNode with every attribute given explicitly.
+   *
+   * @param id plan node id
+   * @param child the single input of the sort, may be {@code null} and attached later
+   * @param scheme partition keys followed by sort keys
+   * @param partial forwarded unchanged to the {@link SortNode} constructor
+   * @param orderByAllIdsAndTime forwarded unchanged to the {@link SortNode} constructor
+   * @param enableParalleled initial value of the parallelization flag
+   * @param partitionKeyCount how many leading symbols of {@code scheme} are partition keys
+   */
+  public AuxSortNode(
+      PlanNodeId id,
+      PlanNode child,
+      OrderingScheme scheme,
+      boolean partial,
+      boolean orderByAllIdsAndTime,
+      boolean enableParalleled,
+      int partitionKeyCount) {
+    super(id, child, scheme, partial, orderByAllIdsAndTime);
+    this.enableParalleled = enableParalleled;
+    this.partitionKeyCount = partitionKeyCount;
+  }
+
+  /** Returns a copy of this node whose only child is the single element of {@code newChildren}. */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new AuxSortNode(
+        id,
+        Iterables.getOnlyElement(newChildren),
+        orderingScheme,
+        partial,
+        orderByAllIdsAndTime,
+        enableParalleled,
+        partitionKeyCount);
+  }
+
+  public boolean isEnableParalleled() {
+    return enableParalleled;
+  }
+
+  public void setEnableParalleled(boolean enableParalleled) {
+    this.enableParalleled = enableParalleled;
+  }
+
+  public int getPartitionKeyCount() {
+    return partitionKeyCount;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAuxSort(this, context);
+  }
+
+  /** Returns a copy of this node with the same attributes but without a child. */
+  @Override
+  public PlanNode clone() {
+    return new AuxSortNode(
+        id,
+        null,
+        orderingScheme,
+        partial,
+        orderByAllIdsAndTime,
+        enableParalleled,
+        partitionKeyCount);
+  }
+
+  /**
+   * Writes the node type marker, the ordering scheme, the parallelization flag and the partition
+   * key count, in that order. {@code partial} and {@code orderByAllIdsAndTime} are not written.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(byteBuffer);
+    orderingScheme.serialize(byteBuffer);
+    ReadWriteIOUtils.write(enableParalleled, byteBuffer);
+    ReadWriteIOUtils.write(partitionKeyCount, byteBuffer);
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same fields, same order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(stream);
+    orderingScheme.serialize(stream);
+    ReadWriteIOUtils.write(enableParalleled, stream);
+    ReadWriteIOUtils.write(partitionKeyCount, stream);
+  }
+
+  /**
+   * Reads the attributes written by {@code serializeAttributes} followed by the plan node id. The
+   * returned node has no child, and {@code partial} / {@code orderByAllIdsAndTime} are always
+   * {@code false} because they are not part of the serialized form.
+   *
+   * <p>NOTE(review): the node type marker is not read here, so it is presumably consumed by the
+   * caller before this method is invoked - confirm against the PlanNodeType dispatcher.
+   */
+  public static AuxSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
+    boolean enableParalleled = ReadWriteIOUtils.readBoolean(byteBuffer);
+    int partitionKeyCount = ReadWriteIOUtils.readInt(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AuxSortNode(
+        planNodeId, null, orderingScheme, false, false, enableParalleled, partitionKeyCount);
+  }
+
+  @Override
+  public String toString() {
+    return "AuxSortNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 3b7b7e6267b..ae3dff9d838 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -260,7 +260,8 @@ public class LogicalOptimizeFactory {
         new IterativeOptimizer(
             plannerContext,
             ruleStats,
-            ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort())));
+            ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort())),
+        new ParallelizeAuxSort());
 
     this.planOptimizers = optimizerBuilder.build();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
new file mode 100644
index 00000000000..d175331533c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.ENABLE;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.PENDING;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.UNABLE;
+
+public class ParallelizeAuxSort implements PlanOptimizer {
+  @Override
+  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+    if (!(context.getAnalysis().isQuery())) {
+      return plan;
+    }
+    System.out.println("before optimize ParallelizeAuxSort 
==========================");
+    PlanGraphPrinter.print(plan);
+    PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new 
Context(null, 0));
+    System.out.println("after optimize ParallelizeAuxSort 
==========================");
+    PlanGraphPrinter.print(res);
+    return res;
+    //        return plan.accept(new Rewriter(context.getAnalysis()), new 
Context());
+  }
+
+  private void print(PlanNode node) {
+    PlanGraphPrinter.print(node);
+    for (PlanNode child : node.getChildren()) {
+      print(child);
+    }
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+    private final Analysis analysis;
+
+    public Rewriter(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    /**
+     * We need to make sure:
+     *
+     * <ul>
+     *   <li>(1) All keys in context#orderKey are used for partition.
+     *   <li>(2) childOrderSchema can match the prefix of context#orderKey, so 
that partition-based
+     *       operation can be pushed down.
+     * </ul>
+     */
+    private void checkPrefixMatch(Context context, List<Symbol> childOrder) {
+      if (context.canSkip()) {
+        return;
+      }
+      OrderingScheme prefix = context.orderKey;
+      if (prefix.getOrderBy().size() != context.partitionKeyCount) {
+        context.canPushDown = UNABLE;
+        return;
+      }
+      if (prefix.getOrderBy().size() > childOrder.size()) {
+        context.canPushDown = UNABLE;
+        return;
+      }
+      for (int i = 0; i < prefix.getOrderBy().size(); i++) {
+        Symbol lhs = prefix.getOrderBy().get(i);
+        Symbol rhs = childOrder.get(i);
+        if (!lhs.equals(rhs)) {
+          context.canPushDown = UNABLE;
+          return;
+        }
+      }
+      context.canPushDown = ENABLE;
+    }
+
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      Context newContext = new Context(node.getOrderingScheme(), 
node.getPartitionKeyCount());
+      AuxSortNode newNode = (AuxSortNode) node.clone();
+      newNode.addChild(node.getChild().accept(this, newContext));
+      if (newContext.canPushDown.equals(ENABLE)) {
+        newNode.setEnableParalleled(true);
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitTopK(TopKNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitJoin(JoinNode node, Context context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context 
context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitSemiJoin(SemiJoinNode node, Context context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitTableFunctionProcessor(TableFunctionProcessorNode 
node, Context context) {
+      if (!context.canSkip()) {
+        if (node.getChildren().isEmpty()) {
+          // leaf node
+          context.canPushDown = UNABLE;
+          return node;
+        }
+        Optional<DataOrganizationSpecification> dataOrganizationSpecification =
+            node.getDataOrganizationSpecification();
+        if (!dataOrganizationSpecification.isPresent()) {
+          context.canPushDown = UNABLE;
+        } else {
+          checkPrefixMatch(context, 
dataOrganizationSpecification.get().getPartitionBy());
+        }
+      }
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitProject(ProjectNode node, Context context) {
+      if (!context.canSkip()) {
+        OrderingScheme orderKey = context.orderKey;
+        for (int i = 0; i < orderKey.getOrderBy().size(); i++) {
+          if (!node.getAssignments().contains(orderKey.getOrderBy().get(i))) {
+            context.canPushDown = UNABLE;
+            break;
+          }
+        }
+      }
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context 
context) {
+      if (!context.canSkip()) {
+        OrderingScheme orderKey = context.orderKey;
+        Map<Symbol, ColumnSchema> tableColumnSchema =
+            analysis.getTableColumnSchema(node.getQualifiedObjectName());
+        // 1. It is possible for the last sort key to be a time column
+        if (orderKey.getOrderBy().size() > context.partitionKeyCount + 1) {
+          context.canPushDown = UNABLE;
+          return node;
+        } else if (orderKey.getOrderBy().size() == context.partitionKeyCount + 
1) {
+          Symbol lastSymbol = 
orderKey.getOrderBy().get(context.partitionKeyCount);
+          if (!tableColumnSchema.containsKey(lastSymbol)
+              || tableColumnSchema.get(lastSymbol).getColumnCategory() != 
TIME) {
+            context.canPushDown = UNABLE;
+            return node;
+          }
+        }
+        // 2. check there are no field in orderKey and all tags in orderKey
+        Set<Symbol> tagSymbols =
+            tableColumnSchema.entrySet().stream()
+                .filter(entry -> entry.getValue().getColumnCategory() == TAG)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+        for (int i = 0; i < context.partitionKeyCount; i++) {
+          Symbol symbol = orderKey.getOrderBy().get(i);
+          if (!tableColumnSchema.containsKey(symbol)) {
+            context.canPushDown = UNABLE;
+            return node;
+          }
+          switch (tableColumnSchema.get(symbol).getColumnCategory()) {
+            case TAG:
+              tagSymbols.remove(symbol);
+              break;
+            case ATTRIBUTE:
+              // If all tags in partition key, attributes must be the same in 
one partition.
+              break;
+            default:
+              context.canPushDown = UNABLE;
+              return node;
+          }
+        }
+        if (!tagSymbols.isEmpty()) {
+          context.canPushDown = UNABLE;
+          return node;
+        }
+        context.canPushDown = ENABLE;
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitAggregation(AggregationNode node, Context context) {
+      return super.visitAggregation(node, context);
+    }
+
+    @Override
+    public PlanNode visitAggregationTableScan(AggregationTableScanNode node, 
Context context) {
+      return super.visitAggregationTableScan(node, context);
+    }
+  }
+
+  private static class Context {
+    private final OrderingScheme orderKey;
+    private final int partitionKeyCount;
+    private CanPushDown canPushDown = PENDING;
+
+    private Context(OrderingScheme orderKey, int sortKeyOffset) {
+      this.orderKey = orderKey;
+      this.partitionKeyCount = sortKeyOffset;
+    }
+
+    private boolean canSkip() {
+      return orderKey == null || canPushDown != PENDING;
+    }
+  }
+
+  protected enum CanPushDown {
+    ENABLE,
+    UNABLE,
+    PENDING
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 7ca40899d00..f3cb9f99776 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
@@ -219,6 +220,11 @@ public class PushLimitOffsetIntoTableScan implements 
PlanOptimizer {
       return visitSort(node, context);
     }
 
+    /** Handles an AuxSortNode exactly like a SortNode by delegating to {@code visitSort}. */
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      return visitSort(node, context);
+    }
+
     @Override
     public PlanNode visitAggregation(AggregationNode node, Context context) {
       context.enablePushDown = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index c514a46746b..f66bd9f25f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
@@ -120,6 +121,11 @@ public class TransformSortToStreamSort implements 
PlanOptimizer {
       return node;
     }
 
+    /**
+     * Handles an AuxSortNode through {@code visitSingleChildProcess}.
+     *
+     * <p>NOTE(review): presumably this keeps an AuxSortNode from being rewritten the way a plain
+     * SortNode is by this optimizer - confirm against {@code visitSort} in this class.
+     */
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      return visitSingleChildProcess(node, context);
+    }
+
     @Override
     public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context 
context) {
       context.setTableScanNode(node);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index ce6e5e50457..341add0ab31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -423,6 +424,26 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
           mapping);
     }
 
+    /**
+     * Rewrites the child first, then rebuilds the AuxSortNode on top of the rewritten child with
+     * its ordering scheme translated through the symbol mapping obtained from the child. All other
+     * attributes of the node are carried over unchanged.
+     */
+    @Override
+    public PlanAndMappings visitAuxSort(AuxSortNode node, UnaliasContext context) {
+      PlanAndMappings childResult = node.getChild().accept(this, context);
+      Map<Symbol, Symbol> symbolMapping = new HashMap<>(childResult.getMappings());
+      OrderingScheme remappedOrdering = symbolMapper(symbolMapping).map(node.getOrderingScheme());
+
+      AuxSortNode rewrittenNode =
+          new AuxSortNode(
+              node.getPlanNodeId(),
+              childResult.getRoot(),
+              remappedOrdering,
+              node.isPartial(),
+              node.isOrderByAllIdsAndTime(),
+              node.isEnableParalleled(),
+              node.getPartitionKeyCount());
+      return new PlanAndMappings(rewrittenNode, symbolMapping);
+    }
+
     @Override
     public PlanAndMappings visitFilter(FilterNode node, UnaliasContext 
context) {
       PlanAndMappings rewrittenSource = node.getChild().accept(this, context);

Reply via email to