This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch dopMemory
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8a3c652a2b0aa74fc94c015d6559e8fb81f8083b
Author: lancelly <1435078...@qq.com>
AuthorDate: Tue Aug 22 09:00:27 2023 +0800

    fix
---
 .../plan/planner/LocalExecutionPlanner.java        |  57 ++++++--
 .../plan/planner/OperatorTreeGenerator.java        |   8 +-
 .../plan/planner/PipelineDriverFactory.java        |   4 +
 .../iotdb/db/queryengine/plan/plan/FEPlanUtil.java | 115 ++++++++++++++++
 .../plan/plan/LocalExecutionPlannerTest.java       | 147 +++++++++++++++++++++
 .../queryengine/plan/plan/PipelineBuilderTest.java |  72 +---------
 6 files changed, 319 insertions(+), 84 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index b875ee70905..f7c3f220a39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -38,14 +38,19 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Used to plan a fragment instance. Currently, we simply change it from 
PlanNode to executable
- * Operator tree, but in the future, we may split one fragment instance into 
multiple pipeline to
- * run a fragment instance parallel and take full advantage of multi-cores
+ * Used to plan a fragment instance. One fragment instance could be split into 
multiple pipelines so
+ * that a fragment instance could be run in parallel, and thus we can take 
full advantages of
+ * multi-cores.
  */
 public class LocalExecutionPlanner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalExecutionPlanner.class);
 
+  private static final long QUERY_THREAD_COUNT =
+      IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
+
+  private static final long ESTIMATED_FI_NUM = 8;
+
   /** allocated memory for operator execution */
   private long freeMemoryForOperators =
       
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
@@ -71,10 +76,11 @@ public class LocalExecutionPlanner {
     // TODO Replace operator with operatorFactory to build multiple driver for 
one pipeline
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
-    // check whether current free memory is enough to execute current query
-    long estimatedMemorySize = checkMemory(root, 
instanceContext.getStateMachine());
+    context.addPipelineDriverFactory(
+        root, context.getDriverContext(), root.calculateMaxPeekMemory());
 
-    context.addPipelineDriverFactory(root, context.getDriverContext(), 
estimatedMemorySize);
+    // check whether current free memory is enough to execute current query
+    checkMemory(context.getPipelineDriverFactories(), 
instanceContext.getStateMachine());
 
     instanceContext.setSourcePaths(collectSourcePaths(context));
 
@@ -92,10 +98,11 @@ public class LocalExecutionPlanner {
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
-    // check whether current free memory is enough to execute current query
-    checkMemory(root, instanceContext.getStateMachine());
+    context.addPipelineDriverFactory(
+        root, context.getDriverContext(), root.calculateMaxPeekMemory());
 
-    context.addPipelineDriverFactory(root, context.getDriverContext(), 0);
+    // check whether current free memory is enough to execute current query
+    checkMemory(context.getPipelineDriverFactories(), 
instanceContext.getStateMachine());
 
     // set maxBytes one SourceHandle can reserve after visiting the whole tree
     context.setMaxBytesOneHandleCanReserve();
@@ -103,16 +110,18 @@ public class LocalExecutionPlanner {
     return context.getPipelineDriverFactories();
   }
 
-  private long checkMemory(Operator root, FragmentInstanceStateMachine 
stateMachine)
+  private void checkMemory(
+      List<PipelineDriverFactory> pipelineDriverFactories,
+      FragmentInstanceStateMachine stateMachine)
       throws MemoryNotEnoughException {
 
     // if it is disabled, just return
     if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableQueryMemoryEstimation()
         && !IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
-      return 0;
+      return;
     }
 
-    long estimatedMemorySize = root.calculateMaxPeekMemory();
+    long estimatedMemorySize = 
calculateEstimatedMemorySize(pipelineDriverFactories);
 
     synchronized (this) {
       if (estimatedMemorySize > freeMemoryForOperators) {
@@ -148,7 +157,29 @@ public class LocalExecutionPlanner {
             }
           }
         });
-    return estimatedMemorySize;
+  }
+
+  /**
+   * Calculate the estimated memory size this FI would use. Given that there 
are
+   * QUERY_THREAD_COUNT(X) threads, each thread could deal with one 
DriverTask. Now we have
+   * pipelineDriverFactories.size()(Y) DriverTasks(We use 
pipelineDriverFactories.size() instead of
+   * degreeOfParallelism to estimate). Suppose that there are M 
FragmentInstances at the same time.
+   * Then one FragmentInstance could have N Drivers running.
+   *
+   * <p>N = (X / ( M * Y)) * Y = X / M
+   *
+   * <p>The estimated memory size this FI would use is:
+   *
+   * <p>N * avgMemoryUsedPerDriver = N * totalSizeOfDriver / driverNum
+   */
+  private long calculateEstimatedMemorySize(
+      final List<PipelineDriverFactory> pipelineDriverFactories) {
+    long totalSizeOfDrivers =
+        pipelineDriverFactories.stream()
+            .map(PipelineDriverFactory::getEstimatedMemorySize)
+            .reduce(0L, Long::sum);
+    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+        * (totalSizeOfDrivers / pipelineDriverFactories.size());
   }
 
   private List<PartialPath> collectSourcePaths(LocalExecutionPlanContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 48d2417f8a5..28cca77786d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2596,7 +2596,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             // Attention, there is no parent node, use first child node instead
             subContext.getDriverContext(), childNode.getPlanNodeId().getId());
     subContext.setISink(localSinkChannel);
-    subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext(), 0);
+    subContext.addPipelineDriverFactory(
+        childOperation, subContext.getDriverContext(), 
childOperation.calculateMaxPeekMemory());
 
     ExchangeOperator sourceOperator =
         new ExchangeOperator(
@@ -2655,7 +2656,10 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   // Attention, there is no parent node, use first child node 
instead
                   context.getDriverContext(), 
childNode.getPlanNodeId().getId());
           subContext.setISink(localSinkChannel);
-          subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext(), 0);
+          subContext.addPipelineDriverFactory(
+              childOperation,
+              subContext.getDriverContext(),
+              childOperation.calculateMaxPeekMemory());
 
           // OneByOneChild may be divided into more than dop pipelines, but 
the number of running
           // actually is dop
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
index 1f5ef929fd2..38d19ef1e58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
@@ -45,6 +45,10 @@ public class PipelineDriverFactory {
     this.estimatedMemorySize = estimatedMemorySize;
   }
 
+  public long getEstimatedMemorySize() {
+    return estimatedMemorySize;
+  }
+
   public DriverContext getDriverContext() {
     return driverContext;
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java
new file mode 100644
index 00000000000..45da5db83e7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FEPlanUtil.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+
+public class FEPlanUtil {
+
+  private FEPlanUtil() {
+    // Util class does not need a Construct Method
+  }
+
+  protected static LocalExecutionPlanContext createLocalExecutionPlanContext(
+      final TypeProvider typeProvider) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    fragmentInstanceContext.setDataRegion(dataRegion);
+
+    return new LocalExecutionPlanContext(
+        typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
+  }
+
+  /**
+   * This method will init a timeJoinNode with @childNum seriesScanNode as 
children.
+   *
+   * @param childNum the number of children
+   * @return a timeJoinNode with @childNum seriesScanNode as children
+   */
+  protected static TimeJoinNode initTimeJoinNode(
+      final TypeProvider typeProvider, final int childNum) throws 
IllegalPathException {
+    TimeJoinNode timeJoinNode = new TimeJoinNode(new 
PlanNodeId("TimeJoinNode"), Ordering.ASC);
+    for (int i = 0; i < childNum; i++) {
+      SeriesScanNode seriesScanNode =
+          new SeriesScanNode(
+              new PlanNodeId(String.format("SeriesScanNode%d", i)),
+              new MeasurementPath(String.format("root.sg.d%d.s1", i), 
TSDataType.INT32));
+      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), 
TSDataType.INT32);
+      timeJoinNode.addChild(seriesScanNode);
+    }
+    return timeJoinNode;
+  }
+
+  /**
+   * This method will init a DeviceViewNode with @childNum 
alignedSeriesScanNode as children.
+   *
+   * @param childNum the number of children
+   * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children
+   */
+  protected static DeviceViewNode initDeviceViewNode(
+      final TypeProvider typeProvider, final int childNum) throws 
IllegalPathException {
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(
+            new PlanNodeId("DeviceViewNode"), null, Collections.EMPTY_LIST, 
Collections.EMPTY_MAP);
+    for (int i = 0; i < childNum; i++) {
+      AlignedSeriesScanNode alignedSeriesScanNode =
+          new AlignedSeriesScanNode(
+              new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)),
+              new AlignedPath(String.format("root.sg.d%d", i), "s1"));
+      deviceViewNode.addChild(alignedSeriesScanNode);
+    }
+    return deviceViewNode;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
new file mode 100644
index 00000000000..348c9b6babc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.createLocalExecutionPlanContext;
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initDeviceViewNode;
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initTimeJoinNode;
+
+public class LocalExecutionPlannerTest {
+
+  private static final long ESTIMATED_FI_NUM = 8;
+
+  private static final long QUERY_THREAD_COUNT = 8;
+
+  private static final long ALIGNED_MAX_SIZE =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  /**
+   * This test will test dop = 2. Expected result is five pipelines with 
dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1, which has dependency 
second pipeline.
+   *
+   * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency 
third pipeline.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency 
forth pipeline.
+   */
+  @Test
+  public void testCheckMemoryWithDeviceView() {
+    try {
+      TypeProvider typeProvider = new TypeProvider();
+      DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+      LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+      context.setDegreeOfParallelism(2);
+      Operator root = deviceViewNode.accept(new OperatorTreeGenerator(), 
context);
+
+      // all the four children pipelines are ScanOperator
+      context
+          .getPipelineDriverFactories()
+          .forEach(
+              pipelineDriverFactory ->
+                  Assert.assertEquals(
+                      ALIGNED_MAX_SIZE, 
pipelineDriverFactory.getEstimatedMemorySize()));
+
+      context.addPipelineDriverFactory(
+          root, context.getDriverContext(), root.calculateMaxPeekMemory());
+      Assert.assertEquals(
+          ALIGNED_MAX_SIZE, 
calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
+      Assert.assertEquals(
+          
calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
+          calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
+    } catch (Exception e) {
+      Assert.fail();
+    }
+  }
+
+  /**
+   * This test will test dop = 2. Expected result is two pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, 
SeriesScan3].
+   */
+  @Test
+  public void testCheckMemoryWithTimeJoin() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(2);
+
+    Operator root = timeJoinNode.accept(new OperatorTreeGenerator(), context);
+
+    // The second pipeline is a RowBasedTimeJoinOperator
+    Assert.assertEquals(
+        2 * ALIGNED_MAX_SIZE, 
context.getPipelineDriverFactories().get(0).getEstimatedMemorySize());
+
+    context.addPipelineDriverFactory(
+        root, context.getDriverContext(), root.calculateMaxPeekMemory());
+
+    // ALIGNED_MAX_SIZE * 5 / 2 is calculated by directly applying the 
algorithm on the Operator
+    // Tree.
+    Assert.assertEquals(
+        ALIGNED_MAX_SIZE * 5 / 2,
+        calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
+    Assert.assertEquals(
+        
calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
+        calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
+  }
+
+  private long calculateEstimatedMemorySizeFromOperation(
+      final List<PipelineDriverFactory> pipelineDriverFactories) {
+    long totalSizeOfDrivers =
+        pipelineDriverFactories.stream()
+            .map(
+                pipelineDriverFactory ->
+                    
pipelineDriverFactory.getOperation().calculateMaxPeekMemory())
+            .reduce(0L, Long::sum);
+    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+        * (totalSizeOfDrivers / pipelineDriverFactories.size());
+  }
+
+  private long calculateEstimatedMemorySize(
+      final List<PipelineDriverFactory> pipelineDriverFactories) {
+    long totalSizeOfDrivers =
+        pipelineDriverFactories.stream()
+            .map(PipelineDriverFactory::getEstimatedMemorySize)
+            .reduce(0L, Long::sum);
+    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+        * (totalSizeOfDrivers / pipelineDriverFactories.size());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
index 6ac9ade2501..2b7069333bd 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java
@@ -21,17 +21,11 @@ package org.apache.iotdb.db.queryengine.plan.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.common.QueryId;
-import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
@@ -46,24 +40,22 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.Aggregatio
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
-import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.createLocalExecutionPlanContext;
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initDeviceViewNode;
+import static 
org.apache.iotdb.db.queryengine.plan.plan.FEPlanUtil.initTimeJoinNode;
 import static org.junit.Assert.assertEquals;
 
 public class PipelineBuilderTest {
@@ -797,62 +789,4 @@ public class PipelineBuilderTest {
     assertEquals(2, childNumInEachPipeline[1]);
     assertEquals(5, childNumInEachPipeline[2]);
   }
-
-  private LocalExecutionPlanContext 
createLocalExecutionPlanContext(TypeProvider typeProvider) {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
-
-    QueryId queryId = new QueryId("stub_query");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
-    FragmentInstanceStateMachine stateMachine =
-        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
-    DataRegion dataRegion = Mockito.mock(DataRegion.class);
-    FragmentInstanceContext fragmentInstanceContext =
-        createFragmentInstanceContext(instanceId, stateMachine);
-    fragmentInstanceContext.setDataRegion(dataRegion);
-
-    return new LocalExecutionPlanContext(
-        typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
-  }
-
-  /**
-   * This method will init a timeJoinNode with @childNum seriesScanNode as 
children.
-   *
-   * @param childNum the number of children
-   * @return a timeJoinNode with @childNum seriesScanNode as children
-   */
-  private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int 
childNum)
-      throws IllegalPathException {
-    TimeJoinNode timeJoinNode = new TimeJoinNode(new 
PlanNodeId("TimeJoinNode"), Ordering.ASC);
-    for (int i = 0; i < childNum; i++) {
-      SeriesScanNode seriesScanNode =
-          new SeriesScanNode(
-              new PlanNodeId(String.format("SeriesScanNode%d", i)),
-              new MeasurementPath(String.format("root.sg.d%d.s1", i), 
TSDataType.INT32));
-      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), 
TSDataType.INT32);
-      timeJoinNode.addChild(seriesScanNode);
-    }
-    return timeJoinNode;
-  }
-
-  /**
-   * This method will init a DeviceViewNode with @childNum 
alignedSeriesScanNode as children.
-   *
-   * @param childNum the number of children
-   * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children
-   */
-  private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int 
childNum)
-      throws IllegalPathException {
-    DeviceViewNode deviceViewNode =
-        new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
-    for (int i = 0; i < childNum; i++) {
-      AlignedSeriesScanNode alignedSeriesScanNode =
-          new AlignedSeriesScanNode(
-              new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)),
-              new AlignedPath(String.format("root.sg.d%d", i), "s1"));
-      deviceViewNode.addChild(alignedSeriesScanNode);
-    }
-    return deviceViewNode;
-  }
 }

Reply via email to