This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eeb91ecd48f8cbce2a22a372496ec39db22cd752
Merge: d734b704d36 88f48151bd8
Author: Chen YZ <[email protected]>
AuthorDate: Mon Feb 24 01:15:54 2025 +0800

    save

 .../db/it/udf/IoTDBUserDefinedTableFunctionIT.java |  5 +-
 .../specification/ParameterSpecification.java      |  1 -
 .../specification/TableParameterSpecification.java | 18 +------
 .../process/function/TableFunctionOperator.java    |  3 ++
 .../config/executor/ClusterConfigTaskExecutor.java | 18 +++++++
 .../relational/analyzer/StatementAnalyzer.java     | 17 -------
 .../tablefunction/TableArgumentAnalysis.java       | 23 +--------
 .../plan/relational/planner/OrderingScheme.java    | 13 +----
 .../plan/relational/planner/QueryPlanner.java      |  1 -
 .../plan/relational/planner/RelationPlanner.java   |  1 -
 .../distribute/TableDistributedPlanGenerator.java  | 56 ++++++++++++----------
 .../rule/ImplementTableFunctionSource.java         | 43 +----------------
 .../rule/PruneTableFunctionProcessorColumns.java   |  1 -
 .../PruneTableFunctionProcessorSourceColumns.java  |  1 -
 .../relational/planner/node/TableFunctionNode.java | 11 -----
 .../planner/node/TableFunctionProcessorNode.java   | 18 -------
 .../optimizations/LogicalOptimizeFactory.java      |  1 -
 .../planner/optimizations/ParallelizeGrouping.java | 15 ------
 .../TransformAggregationToStreamable.java          |  5 +-
 .../optimizations/UnaliasSymbolReferences.java     |  3 --
 .../sql/ast/TableFunctionTableArgument.java        | 15 ++----
 .../plan/relational/sql/parser/AstBuilder.java     | 17 +------
 .../plan/relational/sql/util/SqlFormatter.java     |  6 ---
 .../process/tvf/TableFunctionOperatorTest.java     |  5 --
 .../builtin/relational/tvf/HOPTableFunction.java   |  1 -
 .../db/relational/grammar/sql/RelationalSql.g4     |  1 -
 26 files changed, 68 insertions(+), 231 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index 1e953bf8189,1e953bf8189..47e5b9041b0
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@@ -186,6 -186,6 +186,9 @@@ public class TableFunctionOperator impl
        // if there is no proper column, use pass through column's position 
count
        positionCount = passThroughIndexBuilder.getPositionCount();
      }
++    if (positionCount == 0) {
++      return null;
++    }
      blockBuilder.declarePositions(positionCount);
      if (needPassThrough) {
        // handle pass through column only if needed
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
index 97ad4a78d8e,508408571bc..c86c8bee957
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
@@@ -39,6 -39,6 +39,7 @@@ import static com.google.common.collect
  import static java.util.Objects.requireNonNull;
  
  public class OrderingScheme {
++
    private final List<Symbol> orderBy;
    private final Map<Symbol, SortOrder> orderings;
  
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 278824a9466,3ac405e9255..8ddc35912bd
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@@ -314,36 -309,6 +314,36 @@@ public class TableDistributedPlanGenera
      return Collections.singletonList(newTopKNode);
    }
  
 +  @Override
 +  public List<PlanNode> visitSortBasedGroup(SortBasedGroupNode node, 
PlanContext context) {
-     boolean pushDown = context.pushDownAuxSort;
++    boolean pushDown = context.isPushDownGrouping();
 +    try {
-       context.setPushDownAuxSort(node.isEnableParalleled());
++      context.setPushDownGrouping(node.isEnableParalleled());
 +      if (node.isEnableParalleled()) {
 +        List<PlanNode> result = new ArrayList<>();
 +        context.setExpectedOrderingScheme(node.getOrderingScheme());
 +        List<PlanNode> childrenNodes = node.getChild().accept(this, context);
 +        for (PlanNode child : childrenNodes) {
 +          if (canSortEliminated(
 +              node.getOrderingScheme(), 
nodeOrderingMap.get(child.getPlanNodeId()))) {
 +            result.add(child);
 +          } else {
 +            SortNode subSortNode =
 +                new SortNode(
 +                    queryId.genPlanNodeId(), child, node.getOrderingScheme(), 
false, false);
 +            result.add(subSortNode);
 +            nodeOrderingMap.put(subSortNode.getPlanNodeId(), 
subSortNode.getOrderingScheme());
 +          }
 +        }
 +        return result;
 +      } else {
 +        return visitSort(node, context);
 +      }
 +    } finally {
-       context.pushDownAuxSort = pushDown;
++      context.setPushDownGrouping(pushDown);
 +    }
 +  }
 +
    @Override
    public List<PlanNode> visitSort(SortNode node, PlanContext context) {
      context.setExpectedOrderingScheme(node.getOrderingScheme());
@@@ -515,82 -482,8 +515,82 @@@
  
    public List<PlanNode> visitDeviceTableScan(
        final DeviceTableScanNode node, final PlanContext context) {
-     if (context.isPushDownAuxSort()) {
++    if (context.isPushDownGrouping()) {
 +      return constructDeviceTableScanByTags(node, context);
 +    } else {
 +      return constructDeviceTableScanByRegionReplicaSet(node, context);
 +    }
 +  }
 +
 +  private List<PlanNode> constructDeviceTableScanByTags(
 +      final DeviceTableScanNode node, final PlanContext context) {
 +    List<PlanNode> result = new ArrayList<>();
 +    List<DeviceEntry> crossRegionDevices = new ArrayList<>();
 +
      final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new 
HashMap<>();
 +    final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>();
 +    for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
 +      final List<TRegionReplicaSet> regionReplicaSets =
 +          analysis.getDataRegionReplicaSetWithTimeFilter(
 +              node.getQualifiedObjectName().getDatabaseName(),
 +              deviceEntry.getDeviceID(),
 +              node.getTimeFilter());
 +      regionReplicaSets.forEach(
 +          regionReplicaSet ->
 +              regionDeviceCount.put(
 +                  regionReplicaSet, 
regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1));
 +      if (regionReplicaSets.size() != 1) {
 +        crossRegionDevices.add(deviceEntry);
 +        continue;
 +      }
 +      final DeviceTableScanNode deviceTableScanNode =
 +          tableScanNodeMap.computeIfAbsent(
 +              regionReplicaSets.get(0),
 +              k -> {
 +                final DeviceTableScanNode scanNode =
 +                    new DeviceTableScanNode(
 +                        queryId.genPlanNodeId(),
 +                        node.getQualifiedObjectName(),
 +                        node.getOutputSymbols(),
 +                        node.getAssignments(),
 +                        new ArrayList<>(),
 +                        node.getIdAndAttributeIndexMap(),
 +                        node.getScanOrder(),
 +                        node.getTimePredicate().orElse(null),
 +                        node.getPushDownPredicate(),
 +                        node.getPushDownLimit(),
 +                        node.getPushDownOffset(),
 +                        node.isPushLimitToEachDevice(),
 +                        node.containsNonAlignedDevice());
 +                scanNode.setRegionReplicaSet(regionReplicaSets.get(0));
 +                return scanNode;
 +              });
 +      deviceTableScanNode.appendDeviceEntry(deviceEntry);
 +    }
 +    result.addAll(tableScanNodeMap.values());
 +    if (context.hasSortProperty) {
 +      processSortProperty(node, result, context);
 +    }
 +    context.mostUsedRegion =
 +        regionDeviceCount.entrySet().stream()
 +            .max(Comparator.comparingInt(Map.Entry::getValue))
 +            .map(Map.Entry::getKey)
 +            .orElse(null);
 +    if (!crossRegionDevices.isEmpty()) {
 +      node.setDeviceEntries(crossRegionDevices);
 +      result.add(
 +          new CollectNode(
 +              queryId.genPlanNodeId(),
 +              constructDeviceTableScanByRegionReplicaSet(node, context),
 +              node.getOutputSymbols()));
 +    }
 +    return result;
 +  }
 +
 +  private List<PlanNode> constructDeviceTableScanByRegionReplicaSet(
 +      final DeviceTableScanNode node, final PlanContext context) {
  
 +    final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new 
HashMap<>();
      for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
        final List<TRegionReplicaSet> regionReplicaSets =
            analysis.getDataRegionReplicaSetWithTimeFilter(
@@@ -973,23 -855,16 +973,29 @@@
      if (node.getChildren().isEmpty()) {
        return Collections.singletonList(node);
      }
-     boolean canSplitPushDown =
-         node.isRowSemantic()
-             || (node.getChild() instanceof SortBasedGroupNode)
-                 && ((SortBasedGroupNode) 
node.getChild()).isEnableParalleled();
--    List<PlanNode> childrenNodes = node.getChild().accept(this, context);
--    if (childrenNodes.size() == 1) {
--      node.setChild(childrenNodes.get(0));
-       return Collections.singletonList(node);
-     } else if (!canSplitPushDown) {
 -    } else {
--      CollectNode collectNode =
--          new CollectNode(queryId.genPlanNodeId(), 
node.getChildren().get(0).getOutputSymbols());
--      childrenNodes.forEach(collectNode::addChild);
--      node.setChild(collectNode);
-       return Collections.singletonList(node);
-     } else {
-       return splitForEachChild(node, childrenNodes);
++    boolean pushDown = context.isPushDownGrouping();
++    try {
++      context.setPushDownGrouping(node.isRowSemantic());
++      boolean canSplitPushDown =
++          node.isRowSemantic()
++              || (node.getChild() instanceof SortBasedGroupNode)
++                  && ((SortBasedGroupNode) 
node.getChild()).isEnableParalleled();
++      List<PlanNode> childrenNodes = node.getChild().accept(this, context);
++      if (childrenNodes.size() == 1) {
++        node.setChild(childrenNodes.get(0));
++        return Collections.singletonList(node);
++      } else if (!canSplitPushDown) {
++        CollectNode collectNode =
++            new CollectNode(queryId.genPlanNodeId(), 
node.getChildren().get(0).getOutputSymbols());
++        childrenNodes.forEach(collectNode::addChild);
++        node.setChild(collectNode);
++        return Collections.singletonList(node);
++      } else {
++        return splitForEachChild(node, childrenNodes);
++      }
++    } finally {
++      context.setPushDownGrouping(pushDown);
      }
 -    return Collections.singletonList(node);
    }
  
    private void buildRegionNodeMap(
@@@ -1388,7 -1263,6 +1394,7 @@@
      final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
      boolean hasExchangeNode = false;
      boolean hasSortProperty = false;
-     boolean pushDownAuxSort = false;
++    boolean pushDownGrouping = false;
      OrderingScheme expectedOrderingScheme;
      TRegionReplicaSet mostUsedRegion;
  
@@@ -1409,13 -1283,5 +1415,13 @@@
        this.expectedOrderingScheme = expectedOrderingScheme;
        hasSortProperty = true;
      }
 +
-     public void setPushDownAuxSort(boolean pushDownAuxSort) {
-       this.pushDownAuxSort = pushDownAuxSort;
++    public void setPushDownGrouping(boolean pushDownGrouping) {
++      this.pushDownGrouping = pushDownGrouping;
 +    }
 +
-     public boolean isPushDownAuxSort() {
-       return pushDownAuxSort;
++    public boolean isPushDownGrouping() {
++      return pushDownGrouping;
 +    }
    }
  }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
index ff6d7514c68,d2d4be0dfba..2ca4b286267
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
@@@ -48,45 -45,45 +48,6 @@@ import static org.apache.iotdb.db.query
   *
   * <p>It rewrites TableFunctionNode with potentially many sources into a 
TableFunctionProcessorNode.
   * The new node has one source being a combination of the original sources.
-- *
-- * <p>The original sources are combined with joins. The join conditions 
depend on the prune when
-- * empty property, and on the co-partitioning of sources.
-- *
-- * <p>The resulting source should be partitioned and ordered according to 
combined schemas from the
-- * component sources.
-- *
-- * <p>Example transformation for two sources, both with set semantics and 
KEEP WHEN EMPTY property:
-- *
-- * <pre>{@code
-- * - TableFunction foo
-- *      - source T1(a1, b1) PARTITION BY a1 ORDER BY b1
-- *      - source T2(a2, b2) PARTITION BY a2
-- * }</pre>
-- *
-- * Is transformed into:
-- *
-- * <pre>{@code
-- * - TableFunctionProcessor foo
-- *      PARTITION BY (a1, a2), ORDER BY combined_row_number
-- *      - Project
-- *          marker_1 <= IF(table1_row_number = combined_row_number, 
table1_row_number, CAST(null AS bigint))
-- *          marker_2 <= IF(table2_row_number = combined_row_number, 
table2_row_number, CAST(null AS bigint))
-- *          - Project
-- *              combined_row_number <= IF(COALESCE(table1_row_number, BIGINT 
'-1') > COALESCE(table2_row_number, BIGINT '-1'), table1_row_number, 
table2_row_number)
-- *              combined_partition_size <= IF(COALESCE(table1_partition_size, 
BIGINT '-1') > COALESCE(table2_partition_size, BIGINT '-1'), 
table1_partition_size, table2_partition_size)
-- *              - FULL Join
-- *                  [table1_row_number = table2_row_number OR
-- *                   table1_row_number > table2_partition_size AND 
table2_row_number = BIGINT '1' OR
-- *                   table2_row_number > table1_partition_size AND 
table1_row_number = BIGINT '1']
-- *                  - Window [PARTITION BY a1 ORDER BY b1]
-- *                      table1_row_number <= row_number()
-- *                      table1_partition_size <= count()
-- *                          - source T1(a1, b1)
-- *                  - Window [PARTITION BY a2]
-- *                      table2_row_number <= row_number()
-- *                      table2_partition_size <= count()
-- *                          - source T2(a2, b2)
-- * }</pre>
   */
  public class ImplementTableFunctionSource implements Rule<TableFunctionNode> {
  
@@@ -163,11 -154,9 +123,10 @@@
                
Optional.ofNullable(sourceProperties.getPassThroughSpecification()),
                sourceProperties.getRequiredColumns(),
                sourceProperties.getDataOrganizationSpecification(),
 +              sourceProperties.isRowSemantics(),
                node.getArguments()));
      } else {
-       // TODO(UDF): we dont support multiple source now.
+       // we don't support multiple sources yet.
        throw new IllegalArgumentException("table function does not support 
multiple source now.");
      }
    }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
index b3b8ac1d11c,782a3d64e0e..757ec9236b3
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
@@@ -122,14 -108,6 +114,10 @@@ public class TableFunctionProcessorNod
      return properOutputs;
    }
  
 +  public boolean isRowSemantic() {
 +    return rowSemantic;
 +  }
 +
-   public boolean isPruneWhenEmpty() {
-     return pruneWhenEmpty;
-   }
- 
    public Optional<TableFunctionNode.PassThroughSpecification> 
getPassThroughSpecification() {
      return passThroughSpecification;
    }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
index 48207824795,00000000000..5777e32fe93
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
@@@ -1,308 -1,0 +1,293 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 +
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
- import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 +
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
 +import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.ENABLE;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.PENDING;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.UNABLE;
 +
 +/**
 + * This rule is used to determine whether the SortBasedGroupNode can be 
parallelized during Logical
 + *
 + * <p>Optimization phase: Logical plan planning.
 + *
 + * <p>The SortBasedGroupNode can be parallelized if the following conditions 
are met:
 + *
 + * <ul>
 + *   SortingKey is empty and the result child node has been pre-grouped. In 
other words, the
 + *   PartitionKey matches the last descendant that guarantees the data is 
grouped by PartitionKey.
 + *   For example:
 + *   <li>SortBasedGroupNode[tag1,tag2] -> SortNode[sort=tag1]
 + *   <li>SortBasedGroupNode[tag1,tag2] -> TopKNode[sort=tag1,tag2]
 + *   <li>SortBasedGroupNode[tag1,tag2] -> AggregationNode[group=tag1]
 + *   <li>SortBasedGroupNode[tag1,tag2] -> TableFunctionNode[partition=tag1]
 + * </ul>
 + *
 + * <ul>
 + *   SortingKey is the time column and the last descendant that guarantees the 
data is grouped by
 + *   PartitionKey is TableDeviceScanNode. For example:
 + *   <li>SortBasedGroupNode[device_id,time] -> ... -> TableDeviceScanNode
 + * </ul>
 + */
 +public class ParallelizeGrouping implements PlanOptimizer {
 +  @Override
 +  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
 +    if (!(context.getAnalysis().isQuery())) {
 +      return plan;
 +    }
 +    // TODO: remove println
 +    System.out.println("before optimize ParallelizeGrouping 
==========================");
 +    PlanGraphPrinter.print(plan);
 +    PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new 
Context(null, 0));
 +    System.out.println("after optimize ParallelizeGrouping 
==========================");
 +    PlanGraphPrinter.print(res);
 +    return res;
 +    //        return plan.accept(new Rewriter(context.getAnalysis()), new 
Context());
 +  }
 +
 +  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
 +    private final Analysis analysis;
 +
 +    public Rewriter(Analysis analysis) {
 +      this.analysis = analysis;
 +    }
 +
 +    @Override
 +    public PlanNode visitPlan(PlanNode node, Context context) {
 +      PlanNode newNode = node.clone();
 +      for (PlanNode child : node.getChildren()) {
 +        newNode.addChild(child.accept(this, context));
 +      }
 +      return newNode;
 +    }
 +
 +    /**
 +     * We need to make sure:
 +     *
 +     * <ul>
 +     *   <li>(1) All keys in context#orderKey are used for partition.
 +     *   <li>(2) childOrderSchema can match the prefix of context#orderKey, 
so that partition-based
 +     *       operation can be pushed down.
 +     * </ul>
 +     */
 +    private void checkPrefixMatch(Context context, List<Symbol> childOrder) {
 +      if (context.canSkip()) {
 +        return;
 +      }
 +      OrderingScheme prefix = context.sortKey;
 +      if (prefix.getOrderBy().size() != context.partitionKeyCount) {
 +        context.canParalleled = UNABLE;
 +        return;
 +      }
 +      if (prefix.getOrderBy().size() > childOrder.size()) {
 +        context.canParalleled = UNABLE;
 +        return;
 +      }
 +      for (int i = 0; i < prefix.getOrderBy().size(); i++) {
 +        Symbol lhs = prefix.getOrderBy().get(i);
 +        Symbol rhs = childOrder.get(i);
 +        if (!lhs.equals(rhs)) {
 +          context.canParalleled = UNABLE;
 +          return;
 +        }
 +      }
 +      context.canParalleled = ENABLE;
 +    }
 +
 +    @Override
 +    public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context 
context) {
 +      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
 +      Context newContext = new Context(node.getOrderingScheme(), 
node.getPartitionKeyCount());
 +      SortBasedGroupNode newNode = (SortBasedGroupNode) node.clone();
 +      newNode.addChild(node.getChild().accept(this, newContext));
 +      if (newContext.canParalleled.equals(ENABLE)) {
 +        newNode.setEnableParalleled(true);
 +      }
 +      return newNode;
 +    }
 +
 +    @Override
 +    public PlanNode visitSort(SortNode node, Context context) {
 +      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
 +      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitTopK(TopKNode node, Context context) {
 +      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitJoin(JoinNode node, Context context) {
 +      context.canParalleled = UNABLE;
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context 
context) {
 +      context.canParalleled = UNABLE;
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitSemiJoin(SemiJoinNode node, Context context) {
 +      context.canParalleled = UNABLE;
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitTableFunctionProcessor(TableFunctionProcessorNode 
node, Context context) {
 +      if (!context.canSkip()) {
 +        if (node.getChildren().isEmpty()) {
 +          // leaf node
 +          context.canParalleled = UNABLE;
 +          return node;
 +        }
 +        Optional<DataOrganizationSpecification> dataOrganizationSpecification 
=
 +            node.getDataOrganizationSpecification();
 +        if (!dataOrganizationSpecification.isPresent()) {
 +          context.canParalleled = UNABLE;
 +        } else {
 +          checkPrefixMatch(context, 
dataOrganizationSpecification.get().getPartitionBy());
 +        }
 +      }
 +      return visitPlan(node, context);
 +    }
 +
-     @Override
-     public PlanNode visitProject(ProjectNode node, Context context) {
-       if (!context.canSkip()) {
-         OrderingScheme sortKey = context.sortKey;
-         for (int i = 0; i < sortKey.getOrderBy().size(); i++) {
-           if (!node.getAssignments().contains(sortKey.getOrderBy().get(i))) {
-             context.canParalleled = UNABLE;
-             break;
-           }
-         }
-       }
-       return visitPlan(node, context);
-     }
- 
 +    @Override
 +    public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context 
context) {
 +      if (!context.canSkip()) {
 +        OrderingScheme sortKey = context.sortKey;
 +        Map<Symbol, ColumnSchema> tableColumnSchema =
 +            analysis.getTableColumnSchema(node.getQualifiedObjectName());
 +        // 1. It is possible for the last sort key to be a time column
 +        if (sortKey.getOrderBy().size() > context.partitionKeyCount + 1) {
 +          context.canParalleled = UNABLE;
 +          return node;
 +        } else if (sortKey.getOrderBy().size() == context.partitionKeyCount + 
1) {
 +          Symbol lastSymbol = 
sortKey.getOrderBy().get(context.partitionKeyCount);
 +          if (!tableColumnSchema.containsKey(lastSymbol)
 +              || tableColumnSchema.get(lastSymbol).getColumnCategory() != 
TIME) {
 +            context.canParalleled = UNABLE;
 +            return node;
 +          }
 +        }
 +        // 2. check there are no field in sortKey and all tags in sortKey
 +        Set<Symbol> tagSymbols =
 +            tableColumnSchema.entrySet().stream()
 +                .filter(entry -> entry.getValue().getColumnCategory() == TAG)
 +                .map(Map.Entry::getKey)
 +                .collect(Collectors.toSet());
 +        for (int i = 0; i < context.partitionKeyCount; i++) {
 +          Symbol symbol = sortKey.getOrderBy().get(i);
 +          if (!tableColumnSchema.containsKey(symbol)) {
 +            context.canParalleled = UNABLE;
 +            return node;
 +          }
 +          switch (tableColumnSchema.get(symbol).getColumnCategory()) {
 +            case TAG:
 +              tagSymbols.remove(symbol);
 +              break;
 +            case ATTRIBUTE:
 +              // If all tags in partition key, attributes must be the same in 
one partition.
 +              break;
 +            default:
 +              context.canParalleled = UNABLE;
 +              return node;
 +          }
 +        }
 +        if (!tagSymbols.isEmpty()) {
 +          context.canParalleled = UNABLE;
 +          return node;
 +        }
 +        context.canParalleled = ENABLE;
 +      }
 +      return node;
 +    }
 +
 +    @Override
 +    public PlanNode visitAggregation(AggregationNode node, Context context) {
 +      checkPrefixMatch(context, node.getGroupingKeys());
 +      return visitPlan(node, context);
 +    }
 +
 +    @Override
 +    public PlanNode visitAggregationTableScan(AggregationTableScanNode node, 
Context context) {
 +      checkPrefixMatch(context, node.getGroupingKeys());
 +      return node;
 +    }
 +  }
 +
 +  private static class Context {
 +    private final OrderingScheme sortKey;
 +    private final int partitionKeyCount;
 +    private CanParalleled canParalleled = PENDING;
 +
 +    private Context(OrderingScheme sortKey, int partitionKeyCount) {
 +      this.sortKey = sortKey;
 +      this.partitionKeyCount = partitionKeyCount;
 +    }
 +
 +    private boolean canSkip() {
 +      return sortKey == null || canParalleled != PENDING;
 +    }
 +  }
 +
 +  protected enum CanParalleled {
 +    ENABLE,
 +    UNABLE,
 +    PENDING
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
index 7e9d27253ba,340d9ebf592..0c2a1cd4a77
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
@@@ -124,22 -121,6 +124,25 @@@ public class TransformAggregationToStre
        return ImmutableList.of();
      }
  
 +    @Override
 +    public List<Symbol> visitTableFunctionProcessor(
 +        TableFunctionProcessorNode node, GroupContext context) {
 +      if (node.getChildren().isEmpty()) {
-         // leaf node
 +        return ImmutableList.of();
++      } else if (node.isRowSemantic()) {
++        return visitPlan(node, context);
 +      }
++
++      //      return ImmutableList.of();
 +      Optional<DataOrganizationSpecification> dataOrganizationSpecification =
 +          node.getDataOrganizationSpecification();
 +      return dataOrganizationSpecification
 +          .<List<Symbol>>map(
 +              organizationSpecification ->
 +                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
 +          .orElseGet(ImmutableList::of);
 +    }
 +
      @Override
      public List<Symbol> visitDeviceTableScan(DeviceTableScanNode node, 
GroupContext context) {
        Set<Symbol> expectedGroupingKeys = context.groupingKeys;

Reply via email to