This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/udtf-optimize by this push:
new d2f3471900e save
d2f3471900e is described below
commit d2f3471900e07073896fea27df900f891ea97d6a
Author: Chen YZ <[email protected]>
AuthorDate: Tue Feb 25 14:25:49 2025 +0800
save
---
.../execution/operator/process/CollectOperator.java | 2 +-
.../db/queryengine/plan/planner/plan/PlanFragment.java | 9 +++++----
.../planner/distribute/AddExchangeNodes.java | 18 ++++++++----------
.../planner/distribute/TableDistributedPlanner.java | 8 ++++++--
.../distribute/TableModelQueryFragmentPlanner.java | 10 +++++++---
.../planner/optimizations/LogicalOptimizeFactory.java | 3 +--
6 files changed, 28 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 83d29966f5d..5c287332053 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -69,7 +69,7 @@ public class CollectOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
- if(!inited) {
+ if (!inited) {
inited = true;
for (Operator child : children) {
child.isBlocked();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 0f2c0b85bbe..44f8f23fda8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -113,10 +113,10 @@ public class PlanFragment {
// So we can use the DataRegion of one SourceNode as the PlanFragment's
DataRegion.
public TRegionReplicaSet getTargetRegionForTreeModel() {
return getNodeRegion(planNodeTree, Collections.emptyMap());
-
}
- public TRegionReplicaSet getTargetRegionForTableModel(final Map<PlanNodeId,
NodeDistribution> nodeDistributionMap) {
+ public TRegionReplicaSet getTargetRegionForTableModel(
+ final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
return getNodeRegion(planNodeTree, nodeDistributionMap);
}
@@ -127,8 +127,9 @@ public class PlanFragment {
return getNodeLocation(planNodeTree);
}
- private TRegionReplicaSet getNodeRegion(PlanNode root, final Map<PlanNodeId,
NodeDistribution> nodeDistributionMap) {
- if(nodeDistributionMap.containsKey(root.getPlanNodeId())) {
+ private TRegionReplicaSet getNodeRegion(
+ PlanNode root, final Map<PlanNodeId, NodeDistribution>
nodeDistributionMap) {
+ if (nodeDistributionMap.containsKey(root.getPlanNodeId())) {
return nodeDistributionMap.get(root.getPlanNodeId()).getRegion();
} else if (root instanceof IPartitionRelatedNode) {
return ((IPartitionRelatedNode) root).getRegionReplicaSet();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index 1eae13a60ee..225137b328e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
@@ -27,7 +26,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
@@ -77,13 +75,14 @@ public class AddExchangeNodes
for (PlanNode child : node.getChildren()) {
PlanNode rewriteNode = child.accept(this, context);
- ExchangeNode exchangeNode = new
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
- exchangeNode.addChild(rewriteNode);
- exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols());
- newNode.addChild(exchangeNode);
- context.hasExchangeNode = true;
- context.nodeDistributionMap.put(
- exchangeNode.getPlanNodeId(), new
NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion));
+ ExchangeNode exchangeNode = new
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.addChild(rewriteNode);
+ exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols());
+ newNode.addChild(exchangeNode);
+ context.hasExchangeNode = true;
+ context.nodeDistributionMap.put(
+ exchangeNode.getPlanNodeId(),
+ new NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion));
}
context.nodeDistributionMap.put(
node.getPlanNodeId(), new NodeDistribution(SAME_WITH_SOME_CHILD,
context.mostUsedRegion));
@@ -91,7 +90,6 @@ public class AddExchangeNodes
return newNode;
}
-
@Override
public PlanNode visitTableScan(
TableScanNode node, TableDistributedPlanGenerator.PlanContext context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
index 24508a8f766..f1b903b3047 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
@@ -154,7 +154,9 @@ public class TableDistributedPlanner {
return new
AddExchangeNodes(mppQueryContext).addExchangeNodes(distributedPlan,
planContext);
}
- private DistributedQueryPlan generateDistributedPlan(PlanNode
outputNodeWithExchange, final Map<PlanNodeId, NodeDistribution>
nodeDistributionMap) {
+ private DistributedQueryPlan generateDistributedPlan(
+ PlanNode outputNodeWithExchange,
+ final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
// generate subPlan
SubPlan subPlan =
new SubPlanGenerator()
@@ -164,7 +166,9 @@ public class TableDistributedPlanner {
// generate fragment instances
List<FragmentInstance> fragmentInstances =
mppQueryContext.getQueryType() == QueryType.READ
- ? new TableModelQueryFragmentPlanner(subPlan, analysis,
mppQueryContext, nodeDistributionMap).plan()
+ ? new TableModelQueryFragmentPlanner(
+ subPlan, analysis, mppQueryContext, nodeDistributionMap)
+ .plan()
: new WriteFragmentParallelPlanner(
subPlan, analysis, mppQueryContext,
WritePlanNode::splitByPartition)
.parallelPlan();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index e1362bfb5cb..85fe530577e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -77,7 +77,11 @@ public class TableModelQueryFragmentPlanner {
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
- TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis,
MPPQueryContext queryContext, final Map<PlanNodeId, NodeDistribution>
nodeDistributionMap) {
+ TableModelQueryFragmentPlanner(
+ SubPlan subPlan,
+ Analysis analysis,
+ MPPQueryContext queryContext,
+ final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
this.subPlan = subPlan;
this.analysis = analysis;
this.queryContext = queryContext;
@@ -105,7 +109,8 @@ public class TableModelQueryFragmentPlanner {
root.getChildren().forEach(child -> recordPlanNodeRelation(child,
planFragmentId));
}
- private void produceFragmentInstance(PlanFragment fragment, final
Map<PlanNodeId, NodeDistribution> nodeDistributionMap) {
+ private void produceFragmentInstance(
+ PlanFragment fragment, final Map<PlanNodeId, NodeDistribution>
nodeDistributionMap) {
FragmentInstance fragmentInstance =
new FragmentInstance(
fragment,
@@ -120,7 +125,6 @@ public class TableModelQueryFragmentPlanner {
// of them.
TRegionReplicaSet regionReplicaSet =
fragment.getTargetRegionForTableModel(nodeDistributionMap);
-
// Set ExecutorType and target host for the instance,
// We need to store all the replica host in case of the scenario that the
instance need to be
// redirected
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index ab8df9acd2c..2a72c18966b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -260,8 +260,7 @@ public class LogicalOptimizeFactory {
plannerContext,
ruleStats,
ImmutableSet.of(new MergeLimitWithSort(), new
MergeLimitOverProjectWithSort())),
- new ParallelizeGrouping()
- );
+ new ParallelizeGrouping());
this.planOptimizers = optimizerBuilder.build();
}