This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/last_query_distribution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15ee42cc3f9793affcdd75ef3d54e622c2abf6d2 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu May 26 15:38:16 2022 +0800 complete last query distribution plan --- .../planner/distribution/ExchangeNodeAdder.java | 19 ++++ .../plan/planner/distribution/SourceRewriter.java | 93 ++++++++++------ .../plan/node/process/LastQueryMergeNode.java | 2 +- .../plan/node/source/AlignedLastQueryScanNode.java | 14 ++- .../plan/node/source/LastQueryScanNode.java | 14 ++- ...Test.java => DistributionPlannerBasicTest.java} | 2 +- .../mpp/plan/plan/distribution/LastQueryTest.java | 118 +++++++++++++++++++++ 7 files changed, 228 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java index daef502c7b..6c502c464f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java @@ -33,10 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode; @@ -140,6 +143,17 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processNoChildSourceNode(node, context); } + @Override + public PlanNode visitLastQueryScan(LastQueryScanNode node, NodeGroupContext context) { + return processNoChildSourceNode(node, context); + } + + @Override + public PlanNode visitAlignedLastQueryScan( + AlignedLastQueryScanNode node, NodeGroupContext context) { + return processNoChildSourceNode(node, context); + } + public PlanNode visitSeriesAggregationScan( SeriesAggregationScanNode node, NodeGroupContext context) { return processNoChildSourceNode(node, context); @@ -183,6 +197,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return node; } + @Override + public PlanNode visitLastQueryMerge(LastQueryMergeNode node, NodeGroupContext context) { + return processMultiChildNode(node, context); + } + @Override public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) { return processMultiChildNode(node, context); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index d72a1d1b60..65d4d6552f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -36,10 +36,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; @@ -214,21 +217,64 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue @Override public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) { + TimeJoinNode timeJoinNode = + new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + return processRawSeriesScan(node, context, timeJoinNode); + } + + @Override + public PlanNode visitAlignedSeriesScan( + AlignedSeriesScanNode node, DistributionPlanContext context) { + TimeJoinNode timeJoinNode = + new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + return processRawSeriesScan(node, context, timeJoinNode); + } + + @Override + public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) { + LastQueryMergeNode mergeNode = + new LastQueryMergeNode( + context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter()); + return processRawSeriesScan(node, context, mergeNode); + } + + @Override + public PlanNode visitAlignedLastQueryScan( + AlignedLastQueryScanNode node, DistributionPlanContext context) { + LastQueryMergeNode mergeNode = + new LastQueryMergeNode( + context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter()); + return processRawSeriesScan(node, context, mergeNode); + } + + private PlanNode processRawSeriesScan( + SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) { + List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context); + if (sourceNodes.size() == 1) { + return sourceNodes.get(0); + } + sourceNodes.forEach(parent::addChild); + return parent; + } + + private List<SeriesSourceNode> splitSeriesSourceNodeByPartition( + SeriesSourceNode node, DistributionPlanContext context) { + List<SeriesSourceNode> ret = new ArrayList<>(); List<TRegionReplicaSet> dataDistribution = - analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter()); + analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter()); if (dataDistribution.size() == 1) { node.setRegionReplicaSet(dataDistribution.get(0)); - return node; + ret.add(node); + return ret; } - TimeJoinNode timeJoinNode = - new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + for (TRegionReplicaSet dataRegion : dataDistribution) { - SeriesScanNode split = (SeriesScanNode) node.clone(); + SeriesSourceNode split = (SeriesSourceNode) node.clone(); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); - timeJoinNode.addChild(split); + ret.add(split); } - return timeJoinNode; + return ret; } @Override @@ -286,26 +332,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return aggregationNode; } - @Override - public PlanNode visitAlignedSeriesScan( - AlignedSeriesScanNode node, DistributionPlanContext context) { - List<TRegionReplicaSet> dataDistribution = - analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter()); - if (dataDistribution.size() == 1) { - node.setRegionReplicaSet(dataDistribution.get(0)); - return node; - } - TimeJoinNode timeJoinNode = - new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); - for (TRegionReplicaSet dataRegion : dataDistribution) { - AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone(); - split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); - split.setRegionReplicaSet(dataRegion); - timeJoinNode.addChild(split); - } - return timeJoinNode; - } - @Override public PlanNode visitSchemaFetchMerge( SchemaFetchMergeNode node, DistributionPlanContext context) { @@ -335,6 +361,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return root; } + @Override + public PlanNode visitLastQueryMerge(LastQueryMergeNode node, DistributionPlanContext context) { + return processRawMultiChildNode(node, context); + } + @Override public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) { // Although some logic is similar between Aggregation and RawDataQuery, @@ -343,8 +374,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte if (isAggregationQuery(node)) { return planAggregationWithTimeJoin(node, context); } + return processRawMultiChildNode(node, context); + } - TimeJoinNode root = (TimeJoinNode) node.clone(); + private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) { + MultiChildNode root = (MultiChildNode) node.clone(); // Step 1: Get all source nodes. For the node which is not source, add it as the child of // current TimeJoinNode List<SourceNode> sources = new ArrayList<>(); @@ -385,7 +419,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } else { // We clone a TimeJoinNode from root to make the params to be consistent. // But we need to assign a new ID to it - TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone(); + MultiChildNode parentOfGroup = (MultiChildNode) root.clone(); parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); seriesScanNodes.forEach(parentOfGroup::addChild); root.addChild(parentOfGroup); @@ -402,7 +436,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte root.addChild(visit(child, context)); } } - return root; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java index 144ea7dc76..d697835fdc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java @@ -35,7 +35,7 @@ import java.util.Objects; import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class LastQueryMergeNode extends ProcessNode { +public class LastQueryMergeNode extends MultiChildNode { // make sure child in list has been ordered by their sensor name private List<PlanNode> children; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java index 66c8a54f34..514d2485ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java @@ -19,12 +19,14 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.source; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PathDeserializeUtil; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import com.google.common.collect.ImmutableList; @@ -34,7 +36,7 @@ import java.util.Objects; import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class AlignedLastQueryScanNode extends SourceNode { +public class AlignedLastQueryScanNode extends SeriesSourceNode { // The path of the target series which will be scanned. private final AlignedPath seriesPath; @@ -136,4 +138,14 @@ public class AlignedLastQueryScanNode extends SourceNode { public AlignedPath getSeriesPath() { return seriesPath; } + + @Override + public PartialPath getPartitionPath() { + return seriesPath; + } + + @Override + public Filter getPartitionTimeFilter() { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java index be97c5187d..e1db6e1d6a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java @@ -19,12 +19,14 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.source; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PathDeserializeUtil; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import com.google.common.collect.ImmutableList; @@ -32,7 +34,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; -public class LastQueryScanNode extends SourceNode { +public class LastQueryScanNode extends SeriesSourceNode { public static final List<String> LAST_QUERY_HEADER_COLUMNS = ImmutableList.of("timeseries", "value", "dataType"); @@ -138,4 +140,14 @@ public class LastQueryScanNode extends SourceNode { PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new LastQueryScanNode(planNodeId, partialPath); } + + @Override + public PartialPath getPartitionPath() { + return seriesPath; + } + + @Override + public Filter getPartitionTimeFilter() { + return null; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java similarity index 99% rename from server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java rename to server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java index f6133ac41c..ea762fb2c4 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java @@ -52,7 +52,7 @@ import java.util.Arrays; import static org.junit.Assert.assertEquals; -public class BasicTest { +public class DistributionPlannerBasicTest { @Test public void testSingleSeriesScan() throws IllegalPathException { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java new file mode 100644 index 0000000000..2a3765dded --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.plan.distribution; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.plan.expression.Expression; +import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder; +import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner; +import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class LastQueryTest { + + @Test + public void testLastQuery1Series1Region() throws IllegalPathException { + String d2s1Path = "root.sg.d22.s1"; + MPPQueryContext context = + new MPPQueryContext( + "", new QueryId("test_last_1_series_1_region"), null, new TEndPoint(), new TEndPoint()); + DistributionPlanner planner = + new DistributionPlanner( + Util.constructAnalysis(), + constructLastQuery(Collections.singletonList(d2s1Path), context)); + + DistributedQueryPlan distributedQueryPlan = planner.planFragments(); + Assert.assertEquals(1, distributedQueryPlan.getInstances().size()); + } + + @Test + public void testLastQuery1Series2Region() throws IllegalPathException { + String d1s1Path = "root.sg.d1.s1"; + MPPQueryContext context = + new MPPQueryContext( + "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint()); + DistributionPlanner planner = + new DistributionPlanner( + Util.constructAnalysis(), + constructLastQuery(Collections.singletonList(d1s1Path), context)); + + DistributedQueryPlan distributedQueryPlan = planner.planFragments(); + Assert.assertEquals(2, distributedQueryPlan.getInstances().size()); + } + + @Test + public void testLastQuery2Series3Region() throws IllegalPathException { + String d1s1Path = "root.sg.d1.s1"; + String d2s1Path = "root.sg.d22.s1"; + MPPQueryContext context = + new MPPQueryContext( + "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint()); + DistributionPlanner planner = + new DistributionPlanner( + Util.constructAnalysis(), + constructLastQuery(Arrays.asList(d1s1Path, d2s1Path), context)); + + DistributedQueryPlan distributedQueryPlan = planner.planFragments(); + Assert.assertEquals(3, distributedQueryPlan.getInstances().size()); + } + + @Test + public void testLastQuery2Series2Region() throws IllegalPathException { + String d3s1Path = "root.sg.d333.s1"; + String d4s1Path = "root.sg.d4444.s1"; + MPPQueryContext context = + new MPPQueryContext( + "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint()); + DistributionPlanner planner = + new DistributionPlanner( + Util.constructAnalysis(), + constructLastQuery(Arrays.asList(d3s1Path, d4s1Path), context)); + + DistributedQueryPlan distributedQueryPlan = planner.planFragments(); + Assert.assertEquals(2, distributedQueryPlan.getInstances().size()); + } + + private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext context) + throws IllegalPathException { + LogicalPlanBuilder builder = new LogicalPlanBuilder(context); + Set<Expression> expressions = new HashSet<>(); + for (String path : paths) { + expressions.add(new TimeSeriesOperand(new MeasurementPath(path))); + } + PlanNode root = builder.planLast(expressions, null).getRoot(); + return new LogicalQueryPlan(context, root); + } +}
