This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch dopMemory in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8a3c652a2b0aa74fc94c015d6559e8fb81f8083b Author: lancelly <1435078...@qq.com> AuthorDate: Tue Aug 22 09:00:27 2023 +0800 fix --- .../plan/planner/LocalExecutionPlanner.java | 57 ++++++-- .../plan/planner/OperatorTreeGenerator.java | 8 +- .../plan/planner/PipelineDriverFactory.java | 4 + .../iotdb/db/queryengine/plan/plan/FEPlanUtil.java | 115 ++++++++++++++++ .../plan/plan/LocalExecutionPlannerTest.java | 147 +++++++++++++++++++++ .../queryengine/plan/plan/PipelineBuilderTest.java | 72 +--------- 6 files changed, 319 insertions(+), 84 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index b875ee70905..f7c3f220a39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -38,14 +38,19 @@ import java.util.ArrayList; import java.util.List; /** - * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable - * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to - * run a fragment instance parallel and take full advantage of multi-cores + * Used to plan a fragment instance. One fragment instance could be split into multiple pipelines so + * that a fragment instance could be run in parallel, and thus we can take full advantages of + * multi-cores. */ public class LocalExecutionPlanner { private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class); + private static final long QUERY_THREAD_COUNT = + IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount(); + + private static final long ESTIMATED_FI_NUM = 8; + /** allocated memory for operator execution */ private long freeMemoryForOperators = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(); @@ -71,10 +76,11 @@ public class LocalExecutionPlanner { // TODO Replace operator with operatorFactory to build multiple driver for one pipeline Operator root = plan.accept(new OperatorTreeGenerator(), context); - // check whether current free memory is enough to execute current query - long estimatedMemorySize = checkMemory(root, instanceContext.getStateMachine()); + context.addPipelineDriverFactory( + root, context.getDriverContext(), root.calculateMaxPeekMemory()); - context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); + // check whether current free memory is enough to execute current query + checkMemory(context.getPipelineDriverFactories(), instanceContext.getStateMachine()); instanceContext.setSourcePaths(collectSourcePaths(context)); @@ -92,10 +98,11 @@ public class LocalExecutionPlanner { Operator root = plan.accept(new OperatorTreeGenerator(), context); - // check whether current free memory is enough to execute current query - checkMemory(root, instanceContext.getStateMachine()); + context.addPipelineDriverFactory( + root, context.getDriverContext(), root.calculateMaxPeekMemory()); - context.addPipelineDriverFactory(root, context.getDriverContext(), 0); + // check whether current free memory is enough to execute current query + checkMemory(context.getPipelineDriverFactories(), instanceContext.getStateMachine()); // set maxBytes one SourceHandle can reserve after visiting the whole tree context.setMaxBytesOneHandleCanReserve(); @@ -103,16 +110,18 @@ public class LocalExecutionPlanner { return context.getPipelineDriverFactories(); } - private long checkMemory(Operator root, FragmentInstanceStateMachine stateMachine) + private void checkMemory( + List<PipelineDriverFactory> pipelineDriverFactories, + FragmentInstanceStateMachine stateMachine) throws MemoryNotEnoughException { // if it is disabled, just return if (!IoTDBDescriptor.getInstance().getConfig().isEnableQueryMemoryEstimation() && !IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) { - return 0; + return; } - long estimatedMemorySize = root.calculateMaxPeekMemory(); + long estimatedMemorySize = calculateEstimatedMemorySize(pipelineDriverFactories); synchronized (this) { if (estimatedMemorySize > freeMemoryForOperators) { @@ -148,7 +157,29 @@ public class LocalExecutionPlanner { } } }); - return estimatedMemorySize; + } + + /** + * Calculate the estimated memory size this FI would use. Given that there are + * QUERY_THREAD_COUNT(X) threads, each thread could deal with one DriverTask. Now we have + * pipelineDriverFactories.size()(Y) DriverTasks(We use pipelineDriverFactories.size() instead of + * degreeOfParallelism to estimate). Suppose that there are M FragmentInstances at the same time. + * Then one FragmentInstance could have N Drivers running. + * + * <p>N = (X / ( M * Y)) * Y = X / M + * + * <p>The estimated memory size this FI would use is: + * + * <p>N * avgMemoryUsedPerDriver = N * totalSizeOfDriver / driverNum + */ + private long calculateEstimatedMemorySize( + final List<PipelineDriverFactory> pipelineDriverFactories) { + long totalSizeOfDrivers = + pipelineDriverFactories.stream() + .map(PipelineDriverFactory::getEstimatedMemorySize) + .reduce(0L, Long::sum); + return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1) + * (totalSizeOfDrivers / pipelineDriverFactories.size()); } private List<PartialPath> collectSourcePaths(LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 48d2417f8a5..28cca77786d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2596,7 +2596,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Attention, there is no parent node, use first child node instead subContext.getDriverContext(), childNode.getPlanNodeId().getId()); subContext.setISink(localSinkChannel); - subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext(), 0); + subContext.addPipelineDriverFactory( + childOperation, subContext.getDriverContext(), childOperation.calculateMaxPeekMemory()); ExchangeOperator sourceOperator = new ExchangeOperator( @@ -2655,7 +2656,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Attention, there is no parent node, use first child node instead context.getDriverContext(), childNode.getPlanNodeId().getId()); subContext.setISink(localSinkChannel); - subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext(), 0); + subContext.addPipelineDriverFactory( + childOperation, + subContext.getDriverContext(), + childOperation.calculateMaxPeekMemory()); // OneByOneChild may be divided into more than dop pipelines, but the number of running // actually is dop diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java index 1f5ef929fd2..38d19ef1e58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java @@ -45,6 +45,10 @@ public class PipelineDriverFactory { this.estimatedMemorySize = estimatedMemorySize; } + public long getEstimatedMemorySize() { + return estimatedMemorySize; + } + public DriverContext getDriverContext() { return driverContext; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java new file mode 100644 index 00000000000..45da5db83e7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.plan; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; + +public class FEPlanUtil { + + private FEPlanUtil() { + // Util class does not need a Construct Method + } + + protected static LocalExecutionPlanContext createLocalExecutionPlanContext( + final TypeProvider typeProvider) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + + return new LocalExecutionPlanContext( + typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); + } + + /** + * This method will init a timeJoinNode with @childNum seriesScanNode as children. + * + * @param childNum the number of children + * @return a timeJoinNode with @childNum seriesScanNode as children + */ + protected static TimeJoinNode initTimeJoinNode( + final TypeProvider typeProvider, final int childNum) throws IllegalPathException { + TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); + for (int i = 0; i < childNum; i++) { + SeriesScanNode seriesScanNode = + new SeriesScanNode( + new PlanNodeId(String.format("SeriesScanNode%d", i)), + new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); + typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); + timeJoinNode.addChild(seriesScanNode); + } + return timeJoinNode; + } + + /** + * This method will init a DeviceViewNode with @childNum alignedSeriesScanNode as children. + * + * @param childNum the number of children + * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children + */ + protected static DeviceViewNode initDeviceViewNode( + final TypeProvider typeProvider, final int childNum) throws IllegalPathException { + DeviceViewNode deviceViewNode = + new DeviceViewNode( + new PlanNodeId("DeviceViewNode"), null, Collections.EMPTY_LIST, Collections.EMPTY_MAP); + for (int i = 0; i < childNum; i++) { + AlignedSeriesScanNode alignedSeriesScanNode = + new AlignedSeriesScanNode( + new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)), + new AlignedPath(String.format("root.sg.d%d", i), "s1")); + deviceViewNode.addChild(alignedSeriesScanNode); + } + return deviceViewNode; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java new file mode 100644 index 00000000000..348c9b6babc --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.plan; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext; +import org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator; +import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.createLocalExecutionPlanContext; +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initDeviceViewNode; +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initTimeJoinNode; + +public class LocalExecutionPlannerTest { + + private static final long ESTIMATED_FI_NUM = 8; + + private static final long QUERY_THREAD_COUNT = 8; + + private static final long ALIGNED_MAX_SIZE = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + + /** + * This test will test dop = 2. Expected result is five pipelines with dependency: + * + * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator, + * ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - SeriesScan0. + * + * <p>The third is: ExchangeOperator - SeriesScan1, which has dependency second pipeline. + * + * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency third pipeline. + * + * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency forth pipeline. + */ + @Test + public void testCheckMemoryWithDeviceView() { + try { + TypeProvider typeProvider = new TypeProvider(); + DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(2); + Operator root = deviceViewNode.accept(new OperatorTreeGenerator(), context); + + // all the four children pipelines are ScanOperator + context + .getPipelineDriverFactories() + .forEach( + pipelineDriverFactory -> + Assert.assertEquals( + ALIGNED_MAX_SIZE, pipelineDriverFactory.getEstimatedMemorySize())); + + context.addPipelineDriverFactory( + root, context.getDriverContext(), root.calculateMaxPeekMemory()); + Assert.assertEquals( + ALIGNED_MAX_SIZE, calculateEstimatedMemorySize(context.getPipelineDriverFactories())); + Assert.assertEquals( + calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()), + calculateEstimatedMemorySize(context.getPipelineDriverFactories())); + } catch (Exception e) { + Assert.fail(); + } + } + + /** + * This test will test dop = 2. Expected result is two pipelines: + * + * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator]; + * + * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3]. + */ + @Test + public void testCheckMemoryWithTimeJoin() throws IllegalPathException { + TypeProvider typeProvider = new TypeProvider(); + TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); + context.setDegreeOfParallelism(2); + + Operator root = timeJoinNode.accept(new OperatorTreeGenerator(), context); + + // The second pipeline is a RowBasedTimeJoinOperator + Assert.assertEquals( + 2 * ALIGNED_MAX_SIZE, context.getPipelineDriverFactories().get(0).getEstimatedMemorySize()); + + context.addPipelineDriverFactory( + root, context.getDriverContext(), root.calculateMaxPeekMemory()); + + // ALIGNED_MAX_SIZE * 5 / 2 is calculated by directly applying the algorithm on the Operator + // Tree. + Assert.assertEquals( + ALIGNED_MAX_SIZE * 5 / 2, + calculateEstimatedMemorySize(context.getPipelineDriverFactories())); + Assert.assertEquals( + calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()), + calculateEstimatedMemorySize(context.getPipelineDriverFactories())); + } + + private long calculateEstimatedMemorySizeFromOperation( + final List<PipelineDriverFactory> pipelineDriverFactories) { + long totalSizeOfDrivers = + pipelineDriverFactories.stream() + .map( + pipelineDriverFactory -> + pipelineDriverFactory.getOperation().calculateMaxPeekMemory()) + .reduce(0L, Long::sum); + return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1) + * (totalSizeOfDrivers / pipelineDriverFactories.size()); + } + + private long calculateEstimatedMemorySize( + final List<PipelineDriverFactory> pipelineDriverFactories) { + long totalSizeOfDrivers = + pipelineDriverFactories.stream() + .map(PipelineDriverFactory::getEstimatedMemorySize) + .reduce(0L, Long::sum); + return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1) + * (totalSizeOfDrivers / pipelineDriverFactories.size()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java index 6ac9ade2501..2b7069333bd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java @@ -21,17 +21,11 @@ package org.apache.iotdb.db.queryengine.plan.plan; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; -import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; @@ -46,24 +40,22 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.Aggregatio import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Test; -import org.mockito.Mockito; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; -import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.createLocalExecutionPlanContext; +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initDeviceViewNode; +import static org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initTimeJoinNode; import static org.junit.Assert.assertEquals; public class PipelineBuilderTest { @@ -797,62 +789,4 @@ public class PipelineBuilderTest { assertEquals(2, childNumInEachPipeline[1]); assertEquals(5, childNumInEachPipeline[2]); } - - private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) { - ExecutorService instanceNotificationExecutor = - IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); - - QueryId queryId = new QueryId("stub_query"); - FragmentInstanceId instanceId = - new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); - FragmentInstanceStateMachine stateMachine = - new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - DataRegion dataRegion = Mockito.mock(DataRegion.class); - FragmentInstanceContext fragmentInstanceContext = - createFragmentInstanceContext(instanceId, stateMachine); - fragmentInstanceContext.setDataRegion(dataRegion); - - return new LocalExecutionPlanContext( - typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); - } - - /** - * This method will init a timeJoinNode with @childNum seriesScanNode as children. - * - * @param childNum the number of children - * @return a timeJoinNode with @childNum seriesScanNode as children - */ - private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int childNum) - throws IllegalPathException { - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); - for (int i = 0; i < childNum; i++) { - SeriesScanNode seriesScanNode = - new SeriesScanNode( - new PlanNodeId(String.format("SeriesScanNode%d", i)), - new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); - typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); - timeJoinNode.addChild(seriesScanNode); - } - return timeJoinNode; - } - - /** - * This method will init a DeviceViewNode with @childNum alignedSeriesScanNode as children. - * - * @param childNum the number of children - * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children - */ - private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int childNum) - throws IllegalPathException { - DeviceViewNode deviceViewNode = - new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null); - for (int i = 0; i < childNum; i++) { - AlignedSeriesScanNode alignedSeriesScanNode = - new AlignedSeriesScanNode( - new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)), - new AlignedPath(String.format("root.sg.d%d", i), "s1")); - deviceViewNode.addChild(alignedSeriesScanNode); - } - return deviceViewNode; - } }