This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/udtf-optimize by this push:
     new d2f3471900e save
d2f3471900e is described below

commit d2f3471900e07073896fea27df900f891ea97d6a
Author: Chen YZ <[email protected]>
AuthorDate: Tue Feb 25 14:25:49 2025 +0800

    save
---
 .../execution/operator/process/CollectOperator.java    |  2 +-
 .../db/queryengine/plan/planner/plan/PlanFragment.java |  9 +++++----
 .../planner/distribute/AddExchangeNodes.java           | 18 ++++++++----------
 .../planner/distribute/TableDistributedPlanner.java    |  8 ++++++--
 .../distribute/TableModelQueryFragmentPlanner.java     | 10 +++++++---
 .../planner/optimizations/LogicalOptimizeFactory.java  |  3 +--
 6 files changed, 28 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 83d29966f5d..5c287332053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -69,7 +69,7 @@ public class CollectOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if(!inited) {
+    if (!inited) {
       inited = true;
       for (Operator child : children) {
         child.isBlocked();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 0f2c0b85bbe..44f8f23fda8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -113,10 +113,10 @@ public class PlanFragment {
   // So we can use the DataRegion of one SourceNode as the PlanFragment's 
DataRegion.
   public TRegionReplicaSet getTargetRegionForTreeModel() {
     return getNodeRegion(planNodeTree, Collections.emptyMap());
-
   }
 
-  public TRegionReplicaSet getTargetRegionForTableModel(final Map<PlanNodeId, 
NodeDistribution> nodeDistributionMap) {
+  public TRegionReplicaSet getTargetRegionForTableModel(
+      final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
     return getNodeRegion(planNodeTree, nodeDistributionMap);
   }
 
@@ -127,8 +127,9 @@ public class PlanFragment {
     return getNodeLocation(planNodeTree);
   }
 
-  private TRegionReplicaSet getNodeRegion(PlanNode root, final Map<PlanNodeId, 
NodeDistribution> nodeDistributionMap) {
-    if(nodeDistributionMap.containsKey(root.getPlanNodeId())) {
+  private TRegionReplicaSet getNodeRegion(
+      PlanNode root, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
+    if (nodeDistributionMap.containsKey(root.getPlanNodeId())) {
       return nodeDistributionMap.get(root.getPlanNodeId()).getRegion();
     } else if (root instanceof IPartitionRelatedNode) {
       return ((IPartitionRelatedNode) root).getRegionReplicaSet();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index 1eae13a60ee..225137b328e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
@@ -27,7 +26,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -77,13 +75,14 @@ public class AddExchangeNodes
 
     for (PlanNode child : node.getChildren()) {
       PlanNode rewriteNode = child.accept(this, context);
-        ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
-        exchangeNode.addChild(rewriteNode);
-        exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols());
-        newNode.addChild(exchangeNode);
-        context.hasExchangeNode = true;
-        context.nodeDistributionMap.put(
-                exchangeNode.getPlanNodeId(), new 
NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion));
+      ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
+      exchangeNode.addChild(rewriteNode);
+      exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols());
+      newNode.addChild(exchangeNode);
+      context.hasExchangeNode = true;
+      context.nodeDistributionMap.put(
+          exchangeNode.getPlanNodeId(),
+          new NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion));
     }
     context.nodeDistributionMap.put(
         node.getPlanNodeId(), new NodeDistribution(SAME_WITH_SOME_CHILD, 
context.mostUsedRegion));
@@ -91,7 +90,6 @@ public class AddExchangeNodes
     return newNode;
   }
 
-
   @Override
   public PlanNode visitTableScan(
       TableScanNode node, TableDistributedPlanGenerator.PlanContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
index 24508a8f766..f1b903b3047 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
@@ -154,7 +154,9 @@ public class TableDistributedPlanner {
     return new 
AddExchangeNodes(mppQueryContext).addExchangeNodes(distributedPlan, 
planContext);
   }
 
-  private DistributedQueryPlan generateDistributedPlan(PlanNode 
outputNodeWithExchange, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
+  private DistributedQueryPlan generateDistributedPlan(
+      PlanNode outputNodeWithExchange,
+      final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
     // generate subPlan
     SubPlan subPlan =
         new SubPlanGenerator()
@@ -164,7 +166,9 @@ public class TableDistributedPlanner {
     // generate fragment instances
     List<FragmentInstance> fragmentInstances =
         mppQueryContext.getQueryType() == QueryType.READ
-            ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext, nodeDistributionMap).plan()
+            ? new TableModelQueryFragmentPlanner(
+                    subPlan, analysis, mppQueryContext, nodeDistributionMap)
+                .plan()
             : new WriteFragmentParallelPlanner(
                     subPlan, analysis, mppQueryContext, 
WritePlanNode::splitByPartition)
                 .parallelPlan();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index e1362bfb5cb..85fe530577e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -77,7 +77,11 @@ public class TableModelQueryFragmentPlanner {
 
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
 
-  TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis, 
MPPQueryContext queryContext, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
+  TableModelQueryFragmentPlanner(
+      SubPlan subPlan,
+      Analysis analysis,
+      MPPQueryContext queryContext,
+      final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
     this.subPlan = subPlan;
     this.analysis = analysis;
     this.queryContext = queryContext;
@@ -105,7 +109,8 @@ public class TableModelQueryFragmentPlanner {
     root.getChildren().forEach(child -> recordPlanNodeRelation(child, 
planFragmentId));
   }
 
-  private void produceFragmentInstance(PlanFragment fragment, final 
Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
+  private void produceFragmentInstance(
+      PlanFragment fragment, final Map<PlanNodeId, NodeDistribution> 
nodeDistributionMap) {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             fragment,
@@ -120,7 +125,6 @@ public class TableModelQueryFragmentPlanner {
     // of them.
     TRegionReplicaSet regionReplicaSet = 
fragment.getTargetRegionForTableModel(nodeDistributionMap);
 
-
     // Set ExecutorType and target host for the instance,
     // We need to store all the replica host in case of the scenario that the 
instance need to be
     // redirected
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index ab8df9acd2c..2a72c18966b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -260,8 +260,7 @@ public class LogicalOptimizeFactory {
             plannerContext,
             ruleStats,
             ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort())),
-        new ParallelizeGrouping()
-    );
+        new ParallelizeGrouping());
 
     this.planOptimizers = optimizerBuilder.build();
   }

Reply via email to