This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bc073a0d63217b2515df97d8de22693cfdce2161
Author: Chen YZ <[email protected]>
AuthorDate: Sun Feb 23 17:50:07 2025 +0800

    save
---
 a                                                  |  0
 .../org/apache/iotdb/udf/table/RepeatExample.java  |  5 ++
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  2 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  4 +-
 .../plan/planner/plan/node/PlanNodeType.java       |  6 +-
 .../plan/planner/plan/node/PlanVisitor.java        |  4 +-
 .../plan/relational/planner/QueryPlanner.java      |  4 +-
 .../distribute/TableDistributedPlanGenerator.java  | 17 +++--
 .../TableModelTypeProviderExtractor.java           |  4 +-
 .../rule/ImplementTableFunctionSource.java         |  6 +-
 .../{AuxSortNode.java => SortBasedGroupNode.java}  | 41 +++++-----
 .../planner/node/TableFunctionProcessorNode.java   | 16 ++++
 .../optimizations/LogicalOptimizeFactory.java      |  2 +-
 ...lelizeAuxSort.java => ParallelizeGrouping.java} | 88 ++++++++++++++--------
 .../PushLimitOffsetIntoTableScan.java              |  4 +-
 .../TransformAggregationToStreamable.java          | 19 +++++
 .../optimizations/TransformSortToStreamSort.java   |  4 +-
 .../optimizations/UnaliasSymbolReferences.java     |  8 +-
 .../relational/TableBuiltinTableFunction.java      |  4 +
 .../udf/builtin/relational/tvf}/RepeatExample.java |  7 +-
 20 files changed, 166 insertions(+), 79 deletions(-)

diff --git a/a b/a
new file mode 100644
index 00000000000..e69de29bb2d
diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java 
b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
index 8b260c59acb..7d6bab4ef4d 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
@@ -111,6 +111,11 @@ public class RepeatExample implements TableFunction {
           @Override
           public void finish(
               List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
             for (int i = 1; i < n; i++) {
               for (int j = 0; j < recordIndex; j++) {
                 columnBuilders.get(0).writeInt(i);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 183f41294e2..61077dde18c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -104,7 +104,7 @@ public class ConfigNodeConfig {
    * DataRegionGroups for each Database. When set 
data_region_group_extension_policy=AUTO, this
    * parameter is the default minimal number of DataRegionGroups for each 
Database.
    */
-  private int defaultDataRegionGroupNumPerDatabase = 2;
+  private int defaultDataRegionGroupNumPerDatabase = 3;
 
   /**
    * The maximum number of DataRegions expected to be managed by each 
DataNode. Set to 0 means that
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 0ad7ab155ec..03c1addab36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -67,7 +67,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewInt
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
@@ -76,6 +75,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
@@ -928,7 +928,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
   }
 
   @Override
-  public List<String> visitAuxSort(AuxSortNode node, GraphContext context) {
+  public List<String> visitSortBasedGroup(SortBasedGroupNode node, 
GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("AuxSort-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("EnableParalleled: %s", 
node.isEnableParalleled()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 04308a71323..085bac9e2ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -117,7 +117,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
@@ -125,6 +124,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode;
@@ -297,7 +297,7 @@ public enum PlanNodeType {
   MARK_DISTINCT_NODE((short) 1026),
   TABLE_FUNCTION_NODE((short) 1027),
   TABLE_FUNCTION_PROCESSOR_NODE((short) 1028),
-  TABLE_AUX_SORT_NODE((short) 1029),
+  TABLE_SORT_BASED_GROUP_NODE((short) 1029),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -672,7 +672,7 @@ public enum PlanNodeType {
       case 1028:
         return TableFunctionProcessorNode.deserialize(buffer);
       case 1029:
-        return AuxSortNode.deserialize(buffer);
+        return SortBasedGroupNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index da861abf15f..e3ccdc4d42a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -129,6 +129,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -748,8 +749,7 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitAuxSort(
-      org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode 
node, C context) {
+  public R visitSortBasedGroup(SortBasedGroupNode node, C context) {
     return visitSingleChildProcess(node, context);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index 26e57030fe1..de858008a45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GapFillStartAndEndTimeExtractVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -39,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
@@ -822,7 +822,7 @@ public class QueryPlanner {
     OrderingScheme orderingScheme = new OrderingScheme(orderBySymbols.build(), 
orderings);
     analysis.setSortNode(true);
     return subPlan.withNewRoot(
-        new AuxSortNode(
+        new SortBasedGroupNode(
             queryIdAllocator.genPlanNodeId(),
             subPlan.getRoot(),
             orderingScheme,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 501a5b9c1a1..278824a9466 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -43,7 +43,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -60,6 +59,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -190,8 +190,8 @@ public class TableDistributedPlanGenerator
       context.clearExpectedOrderingScheme();
     }
     boolean parallel =
-        (node.getChild() instanceof AuxSortNode)
-            && ((AuxSortNode) node.getChild()).isEnableParalleled();
+        (node.getChild() instanceof SortBasedGroupNode)
+            && ((SortBasedGroupNode) node.getChild()).isEnableParalleled();
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
     OrderingScheme childOrdering = 
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
     if (childOrdering != null) {
@@ -315,7 +315,7 @@ public class TableDistributedPlanGenerator
   }
 
   @Override
-  public List<PlanNode> visitAuxSort(AuxSortNode node, PlanContext context) {
+  public List<PlanNode> visitSortBasedGroup(SortBasedGroupNode node, 
PlanContext context) {
     boolean pushDown = context.pushDownAuxSort;
     try {
       context.setPushDownAuxSort(node.isEnableParalleled());
@@ -973,14 +973,15 @@ public class TableDistributedPlanGenerator
     if (node.getChildren().isEmpty()) {
       return Collections.singletonList(node);
     }
-    boolean parallel =
-        (node.getChild() instanceof AuxSortNode)
-            && ((AuxSortNode) node.getChild()).isEnableParalleled();
+    boolean canSplitPushDown =
+        node.isRowSemantic()
+            || (node.getChild() instanceof SortBasedGroupNode)
+                && ((SortBasedGroupNode) node.getChild()).isEnableParalleled();
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
     if (childrenNodes.size() == 1) {
       node.setChild(childrenNodes.get(0));
       return Collections.singletonList(node);
-    } else if (!parallel) {
+    } else if (!canSplitPushDown) {
       CollectNode collectNode =
           new CollectNode(queryId.genPlanNodeId(), 
node.getChildren().get(0).getOutputSymbols());
       childrenNodes.forEach(collectNode::addChild);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index d2596b45593..c12c9341986 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
@@ -39,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -205,7 +205,7 @@ public class TableModelTypeProviderExtractor {
     }
 
     @Override
-    public Void visitAuxSort(AuxSortNode node, Void context) {
+    public Void visitSortBasedGroup(SortBasedGroupNode node, Void context) {
       node.getChild().accept(this, context);
       return null;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
index b189d2d5e97..ff6d7514c68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
@@ -24,8 +24,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
@@ -112,6 +112,7 @@ public class ImplementTableFunctionSource implements 
Rule<TableFunctionNode> {
               Optional.empty(),
               ImmutableList.of(),
               Optional.empty(),
+              false,
               node.getArguments()));
     } else if (node.getChildren().size() == 1) {
       // Single source does not require pre-processing.
@@ -147,7 +148,7 @@ public class ImplementTableFunctionSource implements 
Rule<TableFunctionNode> {
                           }
                         });
                 child.set(
-                    new AuxSortNode(
+                    new SortBasedGroupNode(
                         context.getIdAllocator().genPlanNodeId(),
                         child.get(),
                         new OrderingScheme(sortSymbols, sortOrderings),
@@ -163,6 +164,7 @@ public class ImplementTableFunctionSource implements 
Rule<TableFunctionNode> {
               
Optional.ofNullable(sourceProperties.getPassThroughSpecification()),
               sourceProperties.getRequiredColumns(),
               sourceProperties.getDataOrganizationSpecification(),
+              sourceProperties.isRowSemantics(),
               node.getArguments()));
     } else {
       // TODO(UDF): we dont support multiple source now.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java
similarity index 74%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java
index 3c72a0ee8ba..77bc98e454c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java
@@ -34,26 +34,33 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 /**
- * Auxiliary node for sorting. Not generated by user queries, it is used by 
special process such as
- * table function.
+ * SortBasedGroupNode is an auxiliary node that is used to group data. 
Currently, it is implemented
+ * based on SortNode. It is only generated for special nodes that 
require a grouped source,
+ * such as FillNode and TableFunctionNode.
+ *
+ * <p>SortBasedGroupNode's ordering scheme consists of two parts: PartitionKey 
and SortKey. It
+ * guarantees to return data grouped by PartitionKey and sorted by SortKey. 
For example, PARTITION
+ * BY device_id ORDER BY time will return data grouped by device_id, and in 
each group, data will be
+ * sorted by time.
  */
-public class AuxSortNode extends SortNode {
-
-  private boolean enableParalleled = false;
+public class SortBasedGroupNode extends SortNode {
 
   /**
-   * AuxSort#orderingScheme may include two parts: partition key and sort key. 
For example,
-   * PARTITION BY device_id, color ORDER BY time will construct an AuxSortNode 
with
-   * OrderingScheme[device_id, color, time] and partitionColumnCount = 2.
+   * orderingScheme may include two parts: PartitionKey and SortKey. This field 
holds the number of
+   * PartitionKey columns.
    */
   private int partitionKeyCount;
 
-  public AuxSortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, int 
partitionKeyCount) {
+  /** Whether this SortBasedGroupNode can be pushed down for parallel execution. */
+  private boolean enableParalleled = false;
+
+  public SortBasedGroupNode(
+      PlanNodeId id, PlanNode child, OrderingScheme scheme, int 
partitionKeyCount) {
     super(id, child, scheme, false, false);
     this.partitionKeyCount = partitionKeyCount;
   }
 
-  public AuxSortNode(
+  public SortBasedGroupNode(
       PlanNodeId id,
       PlanNode child,
       OrderingScheme scheme,
@@ -68,7 +75,7 @@ public class AuxSortNode extends SortNode {
 
   @Override
   public PlanNode replaceChildren(List<PlanNode> newChildren) {
-    return new AuxSortNode(
+    return new SortBasedGroupNode(
         id,
         Iterables.getOnlyElement(newChildren),
         orderingScheme,
@@ -92,12 +99,12 @@ public class AuxSortNode extends SortNode {
 
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitAuxSort(this, context);
+    return visitor.visitSortBasedGroup(this, context);
   }
 
   @Override
   public PlanNode clone() {
-    return new AuxSortNode(
+    return new SortBasedGroupNode(
         id,
         null,
         orderingScheme,
@@ -109,7 +116,7 @@ public class AuxSortNode extends SortNode {
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(byteBuffer);
+    PlanNodeType.TABLE_SORT_BASED_GROUP_NODE.serialize(byteBuffer);
     orderingScheme.serialize(byteBuffer);
     ReadWriteIOUtils.write(enableParalleled, byteBuffer);
     ReadWriteIOUtils.write(partitionKeyCount, byteBuffer);
@@ -117,18 +124,18 @@ public class AuxSortNode extends SortNode {
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(stream);
+    PlanNodeType.TABLE_SORT_BASED_GROUP_NODE.serialize(stream);
     orderingScheme.serialize(stream);
     ReadWriteIOUtils.write(enableParalleled, stream);
     ReadWriteIOUtils.write(partitionKeyCount, stream);
   }
 
-  public static AuxSortNode deserialize(ByteBuffer byteBuffer) {
+  public static SortBasedGroupNode deserialize(ByteBuffer byteBuffer) {
     OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
     boolean enableParalleled = ReadWriteIOUtils.readBoolean(byteBuffer);
     int partitionColumnCount = ReadWriteIOUtils.readInt(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AuxSortNode(
+    return new SortBasedGroupNode(
         planNodeId, null, orderingScheme, false, false, enableParalleled, 
partitionColumnCount);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
index 6479678f260..b3b8ac1d11c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
@@ -65,6 +65,8 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
   // partitioning and ordering combined from sources
   private final Optional<DataOrganizationSpecification> 
dataOrganizationSpecification;
 
+  private final boolean rowSemantic;
+
   private final Map<String, Argument> arguments;
 
   public TableFunctionProcessorNode(
@@ -76,6 +78,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
       Optional<TableFunctionNode.PassThroughSpecification> 
passThroughSpecification,
       List<Symbol> requiredSymbols,
       Optional<DataOrganizationSpecification> dataOrganizationSpecification,
+      boolean rowSemantic,
       Map<String, Argument> arguments) {
     super(id, source.orElse(null));
     this.name = requireNonNull(name, "name is null");
@@ -85,6 +88,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
     this.requiredSymbols = ImmutableList.copyOf(requiredSymbols);
     this.dataOrganizationSpecification =
         requireNonNull(dataOrganizationSpecification, "specification is null");
+    this.rowSemantic = rowSemantic;
     this.arguments = ImmutableMap.copyOf(arguments);
   }
 
@@ -96,6 +100,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
       Optional<TableFunctionNode.PassThroughSpecification> 
passThroughSpecification,
       List<Symbol> requiredSymbols,
       Optional<DataOrganizationSpecification> dataOrganizationSpecification,
+      boolean rowSemantic,
       Map<String, Argument> arguments) {
     super(id);
     this.name = requireNonNull(name, "name is null");
@@ -105,6 +110,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
     this.requiredSymbols = ImmutableList.copyOf(requiredSymbols);
     this.dataOrganizationSpecification =
         requireNonNull(dataOrganizationSpecification, "specification is null");
+    this.rowSemantic = rowSemantic;
     this.arguments = ImmutableMap.copyOf(arguments);
   }
 
@@ -116,6 +122,10 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
     return properOutputs;
   }
 
+  public boolean isRowSemantic() {
+    return rowSemantic;
+  }
+
   public boolean isPruneWhenEmpty() {
     return pruneWhenEmpty;
   }
@@ -146,6 +156,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
         passThroughSpecification,
         requiredSymbols,
         dataOrganizationSpecification,
+        rowSemantic,
         arguments);
   }
 
@@ -195,6 +206,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
     if (dataOrganizationSpecification.isPresent()) {
       dataOrganizationSpecification.get().serialize(byteBuffer);
     }
+    ReadWriteIOUtils.write(rowSemantic, byteBuffer);
     ReadWriteIOUtils.write(arguments.size(), byteBuffer);
     arguments.forEach(
         (key, value) -> {
@@ -224,6 +236,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
     if (dataOrganizationSpecification.isPresent()) {
       dataOrganizationSpecification.get().serialize(stream);
     }
+    ReadWriteIOUtils.write(rowSemantic, stream);
     ReadWriteIOUtils.write(arguments.size(), stream);
     for (Map.Entry<String, Argument> entry : arguments.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
@@ -254,6 +267,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
         hasDataOrganizationSpecification
             ? 
Optional.of(DataOrganizationSpecification.deserialize(byteBuffer))
             : Optional.empty();
+    boolean rowSemantic = ReadWriteIOUtils.readBoolean(byteBuffer);
     size = ReadWriteIOUtils.readInt(byteBuffer);
     Map<String, Argument> arguments = new HashMap<>(size);
     while (size-- > 0) {
@@ -271,6 +285,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
         passThroughSpecification,
         requiredSymbols,
         dataOrganizationSpecification,
+        rowSemantic,
         arguments);
   }
 
@@ -287,6 +302,7 @@ public class TableFunctionProcessorNode extends 
SingleChildProcessNode {
         passThroughSpecification,
         requiredSymbols,
         dataOrganizationSpecification,
+        rowSemantic,
         arguments);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index ae3dff9d838..adff9b3d119 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -261,7 +261,7 @@ public class LogicalOptimizeFactory {
             plannerContext,
             ruleStats,
             ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort())),
-        new ParallelizeAuxSort());
+        new ParallelizeGrouping());
 
     this.planOptimizers = optimizerBuilder.build();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
similarity index 78%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
index 0acbfc504bd..bb5f04350da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
@@ -29,12 +29,12 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -48,20 +48,44 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.ENABLE;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.PENDING;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.UNABLE;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.ENABLE;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.PENDING;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.UNABLE;
 
-public class ParallelizeAuxSort implements PlanOptimizer {
+/**
+ * This rule determines whether a SortBasedGroupNode can be parallelized 
during logical planning.
+ *
+ * <p>Optimization phase: Logical plan planning.
+ *
+ * <p>The SortBasedGroupNode can be parallelized if the following conditions 
are met:
+ *
+ * <ul>
+ *   SortingKey is empty and the child node's result has been pre-grouped. In 
other words, the
+ *   PartitionKey matches the nearest descendant that guarantees the data is 
grouped by PartitionKey.
+ *   For example:
+ *   <li>SortBasedGroupNode[tag1,tag2] -> SortNode[sort=tag1]
+ *   <li>SortBasedGroupNode[tag1,tag2] -> TopKNode[sort=tag1,tag2]
+ *   <li>SortBasedGroupNode[tag1,tag2] -> AggregationNode[group=tag1]
+ *   <li>SortBasedGroupNode[tag1,tag2] -> TableFunctionNode[partition=tag1]
+ * </ul>
+ *
+ * <ul>
+ *   SortingKey is the time column and the nearest descendant that guarantees the 
data is grouped by
+ *   PartitionKey is a DeviceTableScanNode. For example:
+ *   <li>SortBasedGroupNode[device_id,time] -> ... -> DeviceTableScanNode
+ * </ul>
+ */
+public class ParallelizeGrouping implements PlanOptimizer {
   @Override
   public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
     if (!(context.getAnalysis().isQuery())) {
       return plan;
     }
-    System.out.println("before optimize ParallelizeAuxSort 
==========================");
+    // TODO: remove println
+    System.out.println("before optimize ParallelizeGrouping 
==========================");
     PlanGraphPrinter.print(plan);
     PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new 
Context(null, 0));
-    System.out.println("after optimize ParallelizeAuxSort 
==========================");
+    System.out.println("after optimize ParallelizeGrouping 
==========================");
     PlanGraphPrinter.print(res);
     return res;
     //        return plan.accept(new Rewriter(context.getAnalysis()), new 
Context());
@@ -98,31 +122,31 @@ public class ParallelizeAuxSort implements PlanOptimizer {
       }
       OrderingScheme prefix = context.orderKey;
       if (prefix.getOrderBy().size() != context.partitionKeyCount) {
-        context.canPushDown = UNABLE;
+        context.canParalleled = UNABLE;
         return;
       }
       if (prefix.getOrderBy().size() > childOrder.size()) {
-        context.canPushDown = UNABLE;
+        context.canParalleled = UNABLE;
         return;
       }
       for (int i = 0; i < prefix.getOrderBy().size(); i++) {
         Symbol lhs = prefix.getOrderBy().get(i);
         Symbol rhs = childOrder.get(i);
         if (!lhs.equals(rhs)) {
-          context.canPushDown = UNABLE;
+          context.canParalleled = UNABLE;
           return;
         }
       }
-      context.canPushDown = ENABLE;
+      context.canParalleled = ENABLE;
     }
 
     @Override
-    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+    public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context 
context) {
       checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
       Context newContext = new Context(node.getOrderingScheme(), 
node.getPartitionKeyCount());
-      AuxSortNode newNode = (AuxSortNode) node.clone();
+      SortBasedGroupNode newNode = (SortBasedGroupNode) node.clone();
       newNode.addChild(node.getChild().accept(this, newContext));
-      if (newContext.canPushDown.equals(ENABLE)) {
+      if (newContext.canParalleled.equals(ENABLE)) {
         newNode.setEnableParalleled(true);
       }
       return newNode;
@@ -148,19 +172,19 @@ public class ParallelizeAuxSort implements PlanOptimizer {
 
     @Override
     public PlanNode visitJoin(JoinNode node, Context context) {
-      context.canPushDown = UNABLE;
+      context.canParalleled = UNABLE;
       return visitPlan(node, context);
     }
 
     @Override
     public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context 
context) {
-      context.canPushDown = UNABLE;
+      context.canParalleled = UNABLE;
       return visitPlan(node, context);
     }
 
     @Override
     public PlanNode visitSemiJoin(SemiJoinNode node, Context context) {
-      context.canPushDown = UNABLE;
+      context.canParalleled = UNABLE;
       return visitPlan(node, context);
     }
 
@@ -169,13 +193,13 @@ public class ParallelizeAuxSort implements PlanOptimizer {
       if (!context.canSkip()) {
         if (node.getChildren().isEmpty()) {
           // leaf node
-          context.canPushDown = UNABLE;
+          context.canParalleled = UNABLE;
           return node;
         }
         Optional<DataOrganizationSpecification> dataOrganizationSpecification =
             node.getDataOrganizationSpecification();
         if (!dataOrganizationSpecification.isPresent()) {
-          context.canPushDown = UNABLE;
+          context.canParalleled = UNABLE;
         } else {
           checkPrefixMatch(context, 
dataOrganizationSpecification.get().getPartitionBy());
         }
@@ -189,7 +213,7 @@ public class ParallelizeAuxSort implements PlanOptimizer {
         OrderingScheme orderKey = context.orderKey;
         for (int i = 0; i < orderKey.getOrderBy().size(); i++) {
           if (!node.getAssignments().contains(orderKey.getOrderBy().get(i))) {
-            context.canPushDown = UNABLE;
+            context.canParalleled = UNABLE;
             break;
           }
         }
@@ -205,13 +229,13 @@ public class ParallelizeAuxSort implements PlanOptimizer {
             analysis.getTableColumnSchema(node.getQualifiedObjectName());
         // 1. It is possible for the last sort key to be a time column
         if (orderKey.getOrderBy().size() > context.partitionKeyCount + 1) {
-          context.canPushDown = UNABLE;
+          context.canParalleled = UNABLE;
           return node;
         } else if (orderKey.getOrderBy().size() == context.partitionKeyCount + 
1) {
           Symbol lastSymbol = 
orderKey.getOrderBy().get(context.partitionKeyCount);
           if (!tableColumnSchema.containsKey(lastSymbol)
               || tableColumnSchema.get(lastSymbol).getColumnCategory() != 
TIME) {
-            context.canPushDown = UNABLE;
+            context.canParalleled = UNABLE;
             return node;
           }
         }
@@ -224,7 +248,7 @@ public class ParallelizeAuxSort implements PlanOptimizer {
         for (int i = 0; i < context.partitionKeyCount; i++) {
           Symbol symbol = orderKey.getOrderBy().get(i);
           if (!tableColumnSchema.containsKey(symbol)) {
-            context.canPushDown = UNABLE;
+            context.canParalleled = UNABLE;
             return node;
           }
           switch (tableColumnSchema.get(symbol).getColumnCategory()) {
@@ -235,34 +259,36 @@ public class ParallelizeAuxSort implements PlanOptimizer {
               // If all tags in partition key, attributes must be the same in 
one partition.
               break;
             default:
-              context.canPushDown = UNABLE;
+              context.canParalleled = UNABLE;
               return node;
           }
         }
         if (!tagSymbols.isEmpty()) {
-          context.canPushDown = UNABLE;
+          context.canParalleled = UNABLE;
           return node;
         }
-        context.canPushDown = ENABLE;
+        context.canParalleled = ENABLE;
       }
       return node;
     }
 
     @Override
     public PlanNode visitAggregation(AggregationNode node, Context context) {
-      return super.visitAggregation(node, context);
+      checkPrefixMatch(context, node.getGroupingKeys());
+      return visitPlan(node, context);
     }
 
     @Override
     public PlanNode visitAggregationTableScan(AggregationTableScanNode node, 
Context context) {
-      return super.visitAggregationTableScan(node, context);
+      checkPrefixMatch(context, node.getGroupingKeys());
+      return node;
     }
   }
 
   private static class Context {
     private final OrderingScheme orderKey;
     private final int partitionKeyCount;
-    private CanPushDown canPushDown = PENDING;
+    private CanParalleled canParalleled = PENDING;
 
     private Context(OrderingScheme orderKey, int sortKeyOffset) {
       this.orderKey = orderKey;
@@ -270,11 +296,11 @@ public class ParallelizeAuxSort implements PlanOptimizer {
     }
 
     private boolean canSkip() {
-      return orderKey == null || canPushDown != PENDING;
+      return orderKey == null || canParalleled != PENDING;
     }
   }
 
-  protected enum CanPushDown {
+  protected enum CanParalleled {
     ENABLE,
     UNABLE,
     PENDING
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index f3cb9f99776..fa20100c9db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
@@ -37,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -221,7 +221,7 @@ public class PushLimitOffsetIntoTableScan implements 
PlanOptimizer {
     }
 
     @Override
-    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+    public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context 
context) {
       return visitSort(node, context);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
index 340d9ebf592..7e9d27253ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
@@ -22,18 +22,21 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -121,6 +124,22 @@ public class TransformAggregationToStreamable implements 
PlanOptimizer {
       return ImmutableList.of();
     }
 
+    @Override
+    public List<Symbol> visitTableFunctionProcessor(
+        TableFunctionProcessorNode node, GroupContext context) {
+      if (node.getChildren().isEmpty()) {
+        // leaf node
+        return ImmutableList.of();
+      }
+      Optional<DataOrganizationSpecification> dataOrganizationSpecification =
+          node.getDataOrganizationSpecification();
+      return dataOrganizationSpecification
+          .<List<Symbol>>map(
+              organizationSpecification ->
+                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
+          .orElseGet(ImmutableList::of);
+    }
+
     @Override
     public List<Symbol> visitDeviceTableScan(DeviceTableScanNode node, 
GroupContext context) {
       Set<Symbol> expectedGroupingKeys = context.groupingKeys;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index f66bd9f25f6..01fd477925b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -29,9 +29,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 
@@ -122,7 +122,7 @@ public class TransformSortToStreamSort implements 
PlanOptimizer {
     }
 
     @Override
-    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+    public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context 
context) {
       return visitSingleChildProcess(node, context);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 341add0ab31..e796bf77123 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -49,6 +48,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -425,7 +425,7 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
     }
 
     @Override
-    public PlanAndMappings visitAuxSort(AuxSortNode node, UnaliasContext 
context) {
+    public PlanAndMappings visitSortBasedGroup(SortBasedGroupNode node, 
UnaliasContext context) {
       PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
       Map<Symbol, Symbol> mapping = new 
HashMap<>(rewrittenSource.getMappings());
       SymbolMapper mapper = symbolMapper(mapping);
@@ -433,7 +433,7 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
       OrderingScheme newOrderingScheme = mapper.map(node.getOrderingScheme());
 
       return new PlanAndMappings(
-          new AuxSortNode(
+          new SortBasedGroupNode(
               node.getPlanNodeId(),
               rewrittenSource.getRoot(),
               newOrderingScheme,
@@ -872,6 +872,7 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
                 Optional.empty(),
                 ImmutableList.of(),
                 Optional.empty(),
+                false,
                 node.getArguments()),
             mapping);
       }
@@ -908,6 +909,7 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
               newPassThroughSpecification,
               newRequiredSymbols,
               newSpecification,
+              node.isRowSemantic(),
               node.getArguments());
 
       return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
index 2981ecae0dd..830ffa8733c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.udf.builtin.relational;
 
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction;
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.RepeatExample;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 
 import java.util.Arrays;
@@ -29,6 +30,7 @@ import java.util.stream.Collectors;
 
 public enum TableBuiltinTableFunction {
   HOP("hop"),
+  REPEAT("repeat"),
   ;
 
   private final String functionName;
@@ -59,6 +61,8 @@ public enum TableBuiltinTableFunction {
     switch (functionName.toLowerCase()) {
       case "hop":
         return new HOPTableFunction();
+      case "repeat":
+        return new RepeatExample();
       default:
         throw new UnsupportedOperationException("Unsupported table function: " 
+ functionName);
     }
diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
similarity index 95%
copy from 
example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
index 8b260c59acb..e140b1de42d 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.udf.table;
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
 
 import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
 import org.apache.iotdb.udf.api.exception.UDFException;
@@ -111,6 +111,11 @@ public class RepeatExample implements TableFunction {
           @Override
           public void finish(
               List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
             for (int i = 1; i < n; i++) {
               for (int j = 0; j < recordIndex; j++) {
                 columnBuilders.get(0).writeInt(i);

Reply via email to