This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f1775e928dd428199ade056a4296036ba40d9891 Author: Chen YZ <[email protected]> AuthorDate: Mon Feb 24 17:34:32 2025 +0800 change collect node --- .../org/apache/iotdb/udf/table/RepeatExample.java | 5 ----- .../execution/operator/process/CollectOperator.java | 7 +++++++ .../distribution/SimpleFragmentParallelPlanner.java | 2 +- .../queryengine/plan/planner/plan/PlanFragment.java | 21 ++++++++++++++++----- .../planner/distribute/AddExchangeNodes.java | 13 ++++--------- .../planner/distribute/TableDistributedPlanner.java | 8 +++++--- .../distribute/TableModelQueryFragmentPlanner.java | 13 +++++++++---- .../optimizations/LogicalOptimizeFactory.java | 3 ++- .../udf/builtin/relational/tvf/RepeatExample.java | 5 ----- 9 files changed, 44 insertions(+), 33 deletions(-) diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java index 7d6bab4ef4d..8b260c59acb 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java @@ -111,11 +111,6 @@ public class RepeatExample implements TableFunction { @Override public void finish( List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } for (int i = 1; i < n; i++) { for (int j = 0; j < recordIndex; j++) { columnBuilders.get(0).writeInt(i); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java index 4b0ecf27c37..83d29966f5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java @@ -36,6 +36,7 @@ public class CollectOperator implements ProcessOperator { private final OperatorContext operatorContext; private final List<Operator> children; + private boolean inited = false; private int currentIndex; @@ -68,6 +69,12 @@ public class CollectOperator implements ProcessOperator { @Override public ListenableFuture<?> isBlocked() { + if(!inited) { + inited = true; + for (Operator child : children) { + child.isBlocked(); + } + } if (currentIndex >= children.size()) { return NOT_BLOCKED; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index e955f5f5a22..a7fd5ef9671 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -151,7 +151,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // Get the target region for origin PlanFragment, then its instance will be distributed one // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion(); + TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel(); // Set ExecutorType and target host for the instance // We need to store all the replica host in case of the scenario that the instance need to be diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java index c181dc2636a..0f2c0b85bbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java @@ -25,8 +25,10 @@ import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.SubPlanTypeExtractor; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -39,6 +41,8 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; @@ -107,8 +111,13 @@ public class PlanFragment { // In current version, one PlanFragment should contain at least one SourceNode, // and the DataRegions of all SourceNodes should be same in one PlanFragment. // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion. - public TRegionReplicaSet getTargetRegion() { - return getNodeRegion(planNodeTree); + public TRegionReplicaSet getTargetRegionForTreeModel() { + return getNodeRegion(planNodeTree, Collections.emptyMap()); + + } + + public TRegionReplicaSet getTargetRegionForTableModel(final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) { + return getNodeRegion(planNodeTree, nodeDistributionMap); } // If a Fragment is not related with DataPartition, @@ -118,12 +127,14 @@ public class PlanFragment { return getNodeLocation(planNodeTree); } - private TRegionReplicaSet getNodeRegion(PlanNode root) { - if (root instanceof IPartitionRelatedNode) { + private TRegionReplicaSet getNodeRegion(PlanNode root, final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) { + if(nodeDistributionMap.containsKey(root.getPlanNodeId())) { + return nodeDistributionMap.get(root.getPlanNodeId()).getRegion(); + } else if (root instanceof IPartitionRelatedNode) { return ((IPartitionRelatedNode) root).getRegionReplicaSet(); } for (PlanNode child : root.getChildren()) { - TRegionReplicaSet result = getNodeRegion(child); + TRegionReplicaSet result = getNodeRegion(child, nodeDistributionMap); if (result != null && result != DataPartition.NOT_ASSIGNED) { return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java index 7cfb71281bf..1eae13a60ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -76,27 +77,21 @@ public class AddExchangeNodes for (PlanNode child : node.getChildren()) { PlanNode rewriteNode = child.accept(this, context); - - TRegionReplicaSet region = - context.nodeDistributionMap.get(rewriteNode.getPlanNodeId()).getRegion(); - if (region.equals(DataPartition.NOT_ASSIGNED) || region.equals(context.mostUsedRegion)) { - // if region equals NOT_ASSIGNED, it can be executed on any node - newNode.addChild(rewriteNode); - } else { ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); exchangeNode.addChild(rewriteNode); exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols()); newNode.addChild(exchangeNode); context.hasExchangeNode = true; - } + context.nodeDistributionMap.put( + exchangeNode.getPlanNodeId(), new NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion)); } - context.nodeDistributionMap.put( node.getPlanNodeId(), new NodeDistribution(SAME_WITH_SOME_CHILD, context.mostUsedRegion)); return newNode; } + @Override public PlanNode visitTableScan( TableScanNode node, TableDistributedPlanGenerator.PlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index b537aaefdef..24508a8f766 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -24,12 +24,14 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.distribution.WriteFragmentParallelPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -109,7 +111,7 @@ public class TableDistributedPlanner { adjustUpStream(outputNodeWithExchange, planContext); - return generateDistributedPlan(outputNodeWithExchange); + return generateDistributedPlan(outputNodeWithExchange, planContext.nodeDistributionMap); } public PlanNode generateDistributedPlanWithOptimize( @@ -152,7 +154,7 @@ public class TableDistributedPlanner { return new AddExchangeNodes(mppQueryContext).addExchangeNodes(distributedPlan, planContext); } - private DistributedQueryPlan generateDistributedPlan(PlanNode outputNodeWithExchange) { + private DistributedQueryPlan generateDistributedPlan(PlanNode outputNodeWithExchange, final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) { // generate subPlan SubPlan subPlan = new SubPlanGenerator() @@ -162,7 +164,7 @@ public class TableDistributedPlanner { // generate fragment instances List<FragmentInstance> fragmentInstances = mppQueryContext.getQueryType() == QueryType.READ - ? new TableModelQueryFragmentPlanner(subPlan, analysis, mppQueryContext).plan() + ? new TableModelQueryFragmentPlanner(subPlan, analysis, mppQueryContext, nodeDistributionMap).plan() : new WriteFragmentParallelPlanner( subPlan, analysis, mppQueryContext, WritePlanNode::splitByPartition) .parallelPlan(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index e6344f33a2a..e1362bfb5cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; @@ -74,10 +75,13 @@ public class TableModelQueryFragmentPlanner { // Record FragmentInstances dispatched to same DataNode private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = new HashMap<>(); - TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) { + private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; + + TableModelQueryFragmentPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext, final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) { this.subPlan = subPlan; this.analysis = analysis; this.queryContext = queryContext; + this.nodeDistributionMap = nodeDistributionMap; } public List<FragmentInstance> plan() { @@ -89,7 +93,7 @@ public class TableModelQueryFragmentPlanner { private void prepare() { for (PlanFragment fragment : subPlan.getPlanFragmentList()) { recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId()); - produceFragmentInstance(fragment); + produceFragmentInstance(fragment, nodeDistributionMap); } fragmentInstanceList.forEach( @@ -101,7 +105,7 @@ public class TableModelQueryFragmentPlanner { root.getChildren().forEach(child -> recordPlanNodeRelation(child, planFragmentId)); } - private void produceFragmentInstance(PlanFragment fragment) { + private void produceFragmentInstance(PlanFragment fragment, final Map<PlanNodeId, NodeDistribution> nodeDistributionMap) { FragmentInstance fragmentInstance = new FragmentInstance( fragment, @@ -114,7 +118,8 @@ public class TableModelQueryFragmentPlanner { // Get the target region for origin PlanFragment, then its instance will be distributed one // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion(); + TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTableModel(nodeDistributionMap); + // Set ExecutorType and target host for the instance, // We need to store all the replica host in case of the scenario that the instance need to be diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index 2a72c18966b..ab8df9acd2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -260,7 +260,8 @@ public class LogicalOptimizeFactory { plannerContext, ruleStats, ImmutableSet.of(new MergeLimitWithSort(), new MergeLimitOverProjectWithSort())), - new ParallelizeGrouping()); + new ParallelizeGrouping() + ); this.planOptimizers = optimizerBuilder.build(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java index e140b1de42d..ff6466ba713 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java @@ -111,11 +111,6 @@ public class RepeatExample implements TableFunction { @Override public void finish( List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } for (int i = 1; i < n; i++) { for (int j = 0; j < recordIndex; j++) { columnBuilders.get(0).writeInt(i);
