This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f1775e928dd428199ade056a4296036ba40d9891
Author: Chen YZ <[email protected]>
AuthorDate: Mon Feb 24 17:34:32 2025 +0800

    change collect node
---
 .../org/apache/iotdb/udf/table/RepeatExample.java   |  5 -----
 .../execution/operator/process/CollectOperator.java |  7 +++++++
 .../distribution/SimpleFragmentParallelPlanner.java |  2 +-
 .../queryengine/plan/planner/plan/PlanFragment.java | 21 ++++++++++++++++-----
 .../planner/distribute/AddExchangeNodes.java        | 13 ++++---------
 .../planner/distribute/TableDistributedPlanner.java |  8 +++++---
 .../distribute/TableModelQueryFragmentPlanner.java  | 13 +++++++++----
 .../optimizations/LogicalOptimizeFactory.java       |  3 ++-
 .../udf/builtin/relational/tvf/RepeatExample.java   |  5 -----
 9 files changed, 44 insertions(+), 33 deletions(-)

diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java 
b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
index 7d6bab4ef4d..8b260c59acb 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
@@ -111,11 +111,6 @@ public class RepeatExample implements TableFunction {
           @Override
           public void finish(
               List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-            try {
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
             for (int i = 1; i < n; i++) {
               for (int j = 0; j < recordIndex; j++) {
                 columnBuilders.get(0).writeInt(i);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 4b0ecf27c37..83d29966f5d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -36,6 +36,7 @@ public class CollectOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Operator> children;
+  private boolean inited = false;
 
   private int currentIndex;
 
@@ -68,6 +69,12 @@ public class CollectOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    if(!inited) {
+      inited = true;
+      for (Operator child : children) {
+        child.isBlocked();
+      }
+    }
     if (currentIndex >= children.size()) {
       return NOT_BLOCKED;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index e955f5f5a22..a7fd5ef9671 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -151,7 +151,7 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
 
     // Get the target region for origin PlanFragment, then its instance will 
be distributed one
     // of them.
-    TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
+    TRegionReplicaSet regionReplicaSet = 
fragment.getTargetRegionForTreeModel();
 
     // Set ExecutorType and target host for the instance
     // We need to store all the replica host in case of the scenario that the 
instance need to be
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index c181dc2636a..0f2c0b85bbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.SubPlanTypeExtractor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -39,6 +41,8 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -107,8 +111,13 @@ public class PlanFragment {
   // In current version, one PlanFragment should contain at least one 
SourceNode,
   // and the DataRegions of all SourceNodes should be same in one PlanFragment.
   // So we can use the DataRegion of one SourceNode as the PlanFragment's 
DataRegion.
-  public TRegionReplicaSet getTargetRegion() {
-    return getNodeRegion(planNodeTree);
+  public TRegionReplicaSet getTargetRegionForTreeModel() {
+    return getNodeRegion(planNodeTree, Collections.emptyMap());
+
+  }
+
+  public TRegionReplicaSet getTargetRegionForTableModel(final Map<PlanNodeId, 
NodeDistribution> nodeDistributionMap) {
+    return getNodeRegion(planNodeTree, nodeDistributionMap);
   }
 
   // If a Fragment is not related with DataPartition,
@@ -118,12 +127,14 @@ public class PlanFragment {
     return getNodeLocation(planNodeTree);
   }
 
-  private TRegionReplicaSet getNodeRegion(PlanNode root) {
-    if (root instanceof IPartitionRelatedNode) {
+  private TRegionReplicaSet getNodeRegion(PlanNode root, final Map<PlanNodeId, 
NodeDistribution> nodeDistributionMap) {
+    if(nodeDistributionMap.containsKey(root.getPlanNodeId())) {
+      return nodeDistributionMap.get(root.getPlanNodeId()).getRegion();
+    } else if (root instanceof IPartitionRelatedNode) {
       return ((IPartitionRelatedNode) root).getRegionReplicaSet();
     }
     for (PlanNode child : root.getChildren()) {
-      TRegionReplicaSet result = getNodeRegion(child);
+      TRegionReplicaSet result = getNodeRegion(child, nodeDistributionMap);
       if (result != null && result != DataPartition.NOT_ASSIGNED) {
         return result;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index 7cfb71281bf..1eae13a60ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -76,27 +77,21 @@ public class AddExchangeNodes
 
     for (PlanNode child : node.getChildren()) {
       PlanNode rewriteNode = child.accept(this, context);
-
-      TRegionReplicaSet region =
-          
context.nodeDistributionMap.get(rewriteNode.getPlanNodeId()).getRegion();
-      if (region.equals(DataPartition.NOT_ASSIGNED) || 
region.equals(context.mostUsedRegion)) {
-        // if region equals NOT_ASSIGNED, it can be executed on any node
-        newNode.addChild(rewriteNode);
-      } else {
         ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
         exchangeNode.addChild(rewriteNode);
         exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols());
         newNode.addChild(exchangeNode);
         context.hasExchangeNode = true;
-      }
+        context.nodeDistributionMap.put(
+                exchangeNode.getPlanNodeId(), new 
NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion));
     }
-
     context.nodeDistributionMap.put(
         node.getPlanNodeId(), new NodeDistribution(SAME_WITH_SOME_CHILD, 
context.mostUsedRegion));
 
     return newNode;
   }
 
+
   @Override
   public PlanNode visitTableScan(
       TableScanNode node, TableDistributedPlanGenerator.PlanContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
index b537aaefdef..24508a8f766 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.WriteFragmentParallelPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -109,7 +111,7 @@ public class TableDistributedPlanner {
 
     adjustUpStream(outputNodeWithExchange, planContext);
 
-    return generateDistributedPlan(outputNodeWithExchange);
+    return generateDistributedPlan(outputNodeWithExchange, 
planContext.nodeDistributionMap);
   }
 
   public PlanNode generateDistributedPlanWithOptimize(
@@ -152,7 +154,7 @@ public class TableDistributedPlanner {
     return new 
AddExchangeNodes(mppQueryContext).addExchangeNodes(distributedPlan, 
planContext);
   }
 
-  private DistributedQueryPlan generateDistributedPlan(PlanNode 
outputNodeWithExchange) {
+  private DistributedQueryPlan generateDistributedPlan(PlanNode 
outputNodeWithExchange, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
     // generate subPlan
     SubPlan subPlan =
         new SubPlanGenerator()
@@ -162,7 +164,7 @@ public class TableDistributedPlanner {
     // generate fragment instances
     List<FragmentInstance> fragmentInstances =
         mppQueryContext.getQueryType() == QueryType.READ
-            ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext).plan()
+            ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext, nodeDistributionMap).plan()
             : new WriteFragmentParallelPlanner(
                     subPlan, analysis, mppQueryContext, 
WritePlanNode::splitByPartition)
                 .parallelPlan();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index e6344f33a2a..e1362bfb5cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -74,10 +75,13 @@ public class TableModelQueryFragmentPlanner {
   // Record FragmentInstances dispatched to same DataNode
   private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = 
new HashMap<>();
 
-  TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis, 
MPPQueryContext queryContext) {
+  private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
+
+  TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis, 
MPPQueryContext queryContext, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
     this.subPlan = subPlan;
     this.analysis = analysis;
     this.queryContext = queryContext;
+    this.nodeDistributionMap = nodeDistributionMap;
   }
 
   public List<FragmentInstance> plan() {
@@ -89,7 +93,7 @@ public class TableModelQueryFragmentPlanner {
   private void prepare() {
     for (PlanFragment fragment : subPlan.getPlanFragmentList()) {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
-      produceFragmentInstance(fragment);
+      produceFragmentInstance(fragment, nodeDistributionMap);
     }
 
     fragmentInstanceList.forEach(
@@ -101,7 +105,7 @@ public class TableModelQueryFragmentPlanner {
     root.getChildren().forEach(child -> recordPlanNodeRelation(child, 
planFragmentId));
   }
 
-  private void produceFragmentInstance(PlanFragment fragment) {
+  private void produceFragmentInstance(PlanFragment fragment, final 
Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             fragment,
@@ -114,7 +118,8 @@ public class TableModelQueryFragmentPlanner {
 
     // Get the target region for origin PlanFragment, then its instance will 
be distributed one
     // of them.
-    TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
+    TRegionReplicaSet regionReplicaSet = 
fragment.getTargetRegionForTableModel(nodeDistributionMap);
+
 
     // Set ExecutorType and target host for the instance,
     // We need to store all the replica host in case of the scenario that the 
instance need to be
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 2a72c18966b..ab8df9acd2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -260,7 +260,8 @@ public class LogicalOptimizeFactory {
             plannerContext,
             ruleStats,
             ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort())),
-        new ParallelizeGrouping());
+        new ParallelizeGrouping()
+    );
 
     this.planOptimizers = optimizerBuilder.build();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
index e140b1de42d..ff6466ba713 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java
@@ -111,11 +111,6 @@ public class RepeatExample implements TableFunction {
           @Override
           public void finish(
               List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-            try {
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
             for (int i = 1; i < n; i++) {
               for (int j = 0; j < recordIndex; j++) {
                 columnBuilders.get(0).writeInt(i);

Reply via email to