This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eaee1d8dbf [IOTDB-4005] Add degree of parallelism to pipeline engine
eaee1d8dbf is described below

commit eaee1d8dbf832d96098f699d3143bf4a3d917dd7
Author: Xiangwei Wei <[email protected]>
AuthorDate: Tue Feb 14 20:43:07 2023 +0800

    [IOTDB-4005] Add degree of parallelism to pipeline engine
---
 .../resources/conf/iotdb-common.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 +
 .../iotdb/db/mpp/execution/driver/Driver.java      |   8 +
 .../db/mpp/execution/driver/DriverContext.java     |   9 +
 .../iotdb/db/mpp/execution/driver/IDriver.java     |   2 +
 .../mpp/execution/driver/SchemaDriverContext.java  |   5 +-
 .../fragment/FragmentInstanceManager.java          |  14 +-
 .../process/join/RowBasedTimeJoinOperator.java     |   6 +
 .../db/mpp/execution/schedule/DriverScheduler.java | 126 ++--
 .../db/mpp/execution/schedule/task/DriverTask.java |  21 +
 .../plan/planner/LocalExecutionPlanContext.java    |  20 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  17 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 207 +++++-
 .../db/mpp/plan/planner/PipelineDriverFactory.java |  23 +-
 .../plan/planner/distribution/SourceRewriter.java  |  16 +-
 .../db/mpp/plan/planner/plan/node/PlanNode.java    |  13 +
 .../planner/plan/node/process/AggregationNode.java |  24 +-
 .../planner/plan/node/process/DeviceMergeNode.java |   6 +
 .../plan/node/process/GroupByLevelNode.java        |   7 +
 .../planner/plan/node/process/GroupByTagNode.java  |   7 +
 .../plan/node/process/HorizontallyConcatNode.java  |  11 +
 .../planner/plan/node/process/MergeSortNode.java   |  19 +
 .../planner/plan/node/process/TimeJoinNode.java    |   8 +
 .../schema/CountGroupByLevelMergeOperatorTest.java |   4 +-
 .../operator/schema/SchemaCountOperatorTest.java   |   4 +-
 .../schema/SchemaQueryScanOperatorTest.java        |   4 +-
 .../execution/schedule/DriverSchedulerTest.java    |   4 +
 .../db/mpp/plan/plan/PipelineBuilderTest.java      | 761 +++++++++++++++++++++
 29 files changed, 1257 insertions(+), 112 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1ca8968dd3..9f9054aef7 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -419,6 +419,10 @@ cluster_name=defaultCluster
 # Datatype: int
 # query_thread_count=0
 
+# How many pipeline drivers will be created for one fragment instance. When <= 
0, use CPU core number / 2.
+# Datatype: int
+# degree_of_query_parallelism=0
+
 # The amount of data iterate each time in server (the number of data strips, 
that is, the number of different timestamps.)
 # Datatype: int
 # batch_size=100000
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 32f68e5002..746d26bb03 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,6 +318,8 @@ public class IoTDBConfig {
   /** How many threads can concurrently execute query statement. When <= 0, 
use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
+  private int degreeOfParallelism = Runtime.getRuntime().availableProcessors() 
/ 2;
+
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;
 
@@ -1463,6 +1465,14 @@ public class IoTDBConfig {
     this.queryThreadCount = queryThreadCount;
   }
 
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
+
   public int getMaxAllowedConcurrentQueries() {
     return maxAllowedConcurrentQueries;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1348f6861a..8c5b060644 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -541,6 +541,15 @@ public class IoTDBDescriptor {
       conf.setQueryThreadCount(Runtime.getRuntime().availableProcessors());
     }
 
+    conf.setDegreeOfParallelism(
+        Integer.parseInt(
+            properties.getProperty(
+                "degree_of_query_parallelism", 
Integer.toString(conf.getDegreeOfParallelism()))));
+
+    if (conf.getDegreeOfParallelism() <= 0) {
+      conf.setDegreeOfParallelism(Runtime.getRuntime().availableProcessors() / 
2);
+    }
+
     conf.setMaxAllowedConcurrentQueries(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 14554bb8fb..a1ab1c18db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -91,6 +91,10 @@ public abstract class Driver implements IDriver {
     return result.orElseGet(() -> state.get() != State.ALIVE || 
driverContext.isDone());
   }
 
+  public DriverContext getDriverContext() {
+    return driverContext;
+  }
+
   /**
    * do initialization
    *
@@ -101,6 +105,10 @@ public abstract class Driver implements IDriver {
   /** release resource this driver used */
   protected abstract void releaseResource();
 
+  public int getDependencyDriverIndex() {
+    return driverContext.getDependencyDriverIndex();
+  }
+
   @Override
   public ListenableFuture<?> processFor(Duration duration) {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 95a5861978..ea369d8fad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -39,6 +39,7 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISinkHandle sinkHandle;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+  private int dependencyDriverIndex = -1;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
@@ -69,6 +70,14 @@ public class DriverContext {
     throw new UnsupportedOperationException();
   }
 
+  public void setDependencyDriverIndex(int dependencyDriverIndex) {
+    this.dependencyDriverIndex = dependencyDriverIndex;
+  }
+
+  public int getDependencyDriverIndex() {
+    return dependencyDriverIndex;
+  }
+
   public void setSinkHandle(ISinkHandle sinkHandle) {
     this.sinkHandle = sinkHandle;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 8a55d098de..ff55d5456c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -71,4 +71,6 @@ public interface IDriver {
 
   /** @return get SinkHandle of current IDriver */
   ISinkHandle getSinkHandle();
+
+  int getDependencyDriverIndex();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
index 495a2779ee..b93e410127 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
@@ -27,9 +27,8 @@ public class SchemaDriverContext extends DriverContext {
   private final ISchemaRegion schemaRegion;
 
   public SchemaDriverContext(
-      FragmentInstanceContext fragmentInstanceContext, ISchemaRegion 
schemaRegion) {
-    // TODO whether schema driver need to be split to pipeline, default 0 now
-    super(fragmentInstanceContext, 0);
+      FragmentInstanceContext fragmentInstanceContext, ISchemaRegion 
schemaRegion, int pipelineId) {
+    super(fragmentInstanceContext, pipelineId);
     this.schemaRegion = schemaRegion;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index efa8db0bf1..794174b939 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
@@ -41,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -195,14 +193,20 @@ public class FragmentInstanceManager {
                               fragmentInstanceId, stateMachine, 
instance.getSessionInfo()));
 
               try {
-                SchemaDriver driver =
+                List<PipelineDriverFactory> driverFactories =
                     planner.plan(instance.getFragment().getPlanNodeTree(), 
context, schemaRegion);
+
+                List<IDriver> drivers = new ArrayList<>();
+                driverFactories.forEach(factory -> 
drivers.add(factory.createDriver()));
+                // get the sinkHandle of last driver
+                ISinkHandle sinkHandle = drivers.get(drivers.size() - 
1).getSinkHandle();
+
                 return createFragmentInstanceExecution(
                     scheduler,
                     instanceId,
                     context,
-                    Collections.singletonList(driver),
-                    driver.getSinkHandle(),
+                    drivers,
+                    sinkHandle,
                     stateMachine,
                     failedInstances,
                     instance.getTimeOut());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 50cccb9573..79f9905ba8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.AbstractProcessOperator;
@@ -290,6 +291,11 @@ public class RowBasedTimeJoinOperator extends 
AbstractProcessOperator {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
 
+  @TestOnly
+  public List<Operator> getChildren() {
+    return children;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, 
return true; else
    * return false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 7188717d20..5179a57b3a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -39,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +55,6 @@ import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static 
org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -62,6 +64,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
 
   private static final Logger logger = 
LoggerFactory.getLogger(DriverScheduler.class);
   private static final QueryMetricsManager QUERY_METRICS = 
QueryMetricsManager.getInstance();
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
   private static final double LEVEL_TIME_MULTIPLIER = 2;
 
@@ -78,13 +81,10 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
   private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0);
   private IMPPDataExchangeManager blockManager;
 
-  private static final int QUERY_MAX_CAPACITY =
-      
IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries();
-  private static final int WORKER_THREAD_NUM =
-      IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
-  private static final int TASK_MAX_CAPACITY = QUERY_MAX_CAPACITY * 
WORKER_THREAD_NUM * 2;
-  private static final long QUERY_TIMEOUT_MS =
-      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+  private static final int QUERY_MAX_CAPACITY = 
config.getMaxAllowedConcurrentQueries();
+  private static final int WORKER_THREAD_NUM = config.getQueryThreadCount();
+  private static final int TASK_MAX_CAPACITY = QUERY_MAX_CAPACITY * 
config.getDegreeOfParallelism();
+  private static final long QUERY_TIMEOUT_MS = 
config.getQueryTimeoutThreshold();
   private final ThreadGroup workerGroups;
   private final List<AbstractDriverThread> threads;
 
@@ -175,43 +175,78 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
             getNextDriverTaskHandleId(),
             (MultilevelPriorityQueue) readyQueue,
             OptionalInt.of(Integer.MAX_VALUE));
-    List<DriverTask> tasks =
-        drivers.stream()
-            .map(
-                v ->
-                    new DriverTask(
-                        v,
-                        timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
-                        DriverTaskStatus.READY,
-                        driverTaskHandle))
-            .collect(Collectors.toList());
+    List<DriverTask> tasks = new ArrayList<>();
+    drivers.forEach(
+        driver ->
+            tasks.add(
+                new DriverTask(
+                    driver,
+                    timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+                    DriverTaskStatus.READY,
+                    driverTaskHandle)));
+
+    List<DriverTask> submittedTasks = new ArrayList<>();
+    for (DriverTask task : tasks) {
+      IDriver driver = task.getDriver();
+      if (driver.getDependencyDriverIndex() != -1) {
+        SettableFuture<?> blockedDependencyFuture =
+            
tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+        blockedDependencyFuture.addListener(
+            () -> {
+              // Only if query is alive, we can submit this task
+              queryMap.computeIfPresent(
+                  queryId,
+                  (k1, queryRelatedTasks) -> {
+                    queryRelatedTasks.computeIfPresent(
+                        task.getDriverTaskId().getFragmentInstanceId(),
+                        (k2, instanceRelatedTasks) -> {
+                          instanceRelatedTasks.add(task);
+                          submitTaskToReadyQueue(task);
+                          return instanceRelatedTasks;
+                        });
+                    return queryRelatedTasks;
+                  });
+            },
+            MoreExecutors.directExecutor());
+      } else {
+        submittedTasks.add(task);
+      }
+    }
+
+    for (DriverTask task : submittedTasks) {
+      registerTaskToQueryMap(queryId, task);
+    }
+    for (DriverTask task : submittedTasks) {
+      submitTaskToReadyQueue(task);
+    }
+  }
+
+  public void registerTaskToQueryMap(QueryId queryId, DriverTask driverTask) {
     // If query has not been registered by other fragment instances,
     // add the first task as timeout checking task to timeoutQueue.
-    for (DriverTask driverTask : tasks) {
-      queryMap
-          .computeIfAbsent(
-              queryId,
-              v -> {
-                timeoutQueue.push(tasks.get(0));
-                return new ConcurrentHashMap<>();
-              })
-          .computeIfAbsent(
-              driverTask.getDriverTaskId().getFragmentInstanceId(),
-              v -> Collections.synchronizedSet(new HashSet<>()))
-          .add(driverTask);
-    }
+    queryMap
+        .computeIfAbsent(
+            queryId,
+            v -> {
+              timeoutQueue.push(driverTask);
+              return new ConcurrentHashMap<>();
+            })
+        .computeIfAbsent(
+            driverTask.getDriverTaskId().getFragmentInstanceId(),
+            v -> Collections.synchronizedSet(new HashSet<>()))
+        .add(driverTask);
+  }
 
-    for (DriverTask task : tasks) {
-      task.lock();
-      try {
-        if (task.getStatus() != DriverTaskStatus.READY) {
-          continue;
-        }
-        readyQueue.push(task);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-      } finally {
-        task.unlock();
+  public void submitTaskToReadyQueue(DriverTask task) {
+    task.lock();
+    try {
+      if (task.getStatus() != DriverTaskStatus.READY) {
+        return;
       }
+      readyQueue.push(task);
+      task.setLastEnterReadyQueueTime(System.nanoTime());
+    } finally {
+      task.unlock();
     }
   }
 
@@ -447,6 +482,13 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
+      } finally {
+        task.unlock();
+      }
+      // Dependency driver must be submitted before this task is cleared
+      task.submitDependencyDriver();
+      task.lock();
+      try {
         clearDriverTask(task);
       } finally {
         task.unlock();
@@ -472,7 +514,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
           task.unlock();
         }
         QueryId queryId = task.getDriverTaskId().getQueryId();
-        Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = 
queryMap.get(queryId);
+        Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = 
queryMap.remove(queryId);
         if (queryRelatedTasks != null) {
           for (Set<DriverTask> fragmentRelatedTasks : 
queryRelatedTasks.values()) {
             if (fragmentRelatedTasks != null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index e008b84210..c436b180f8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTa
 import 
org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
 
 import java.util.Comparator;
@@ -58,6 +59,8 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -137,6 +140,19 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
+  public void submitDependencyDriver() {
+    if (blockedDependencyDriver != null) {
+      this.blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
+  }
+
   public Priority getPriority() {
     return priority.get();
   }
@@ -251,5 +267,10 @@ public class DriverTask implements IDIndexedAccessible {
     public ISinkHandle getSinkHandle() {
       return null;
     }
+
+    @Override
+    public int getDependencyDriverIndex() {
+      return -1;
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 2d8e6d7051..0404be866a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -59,6 +59,8 @@ public class LocalExecutionPlanContext {
   private final AtomicInteger nextOperatorId;
   private final TypeProvider typeProvider;
   private final Map<String, Set<String>> allSensorsMap;
+  private int degreeOfParallelism =
+      IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
   // this is shared with all subContexts
   private AtomicInteger nextPipelineId;
   private List<PipelineDriverFactory> pipelineDriverFactories;
@@ -97,6 +99,7 @@ public class LocalExecutionPlanContext {
     this.dataRegionTTL = parentContext.dataRegionTTL;
     this.nextPipelineId = parentContext.nextPipelineId;
     this.pipelineDriverFactories = parentContext.pipelineDriverFactories;
+    this.degreeOfParallelism = parentContext.degreeOfParallelism;
     this.exchangeSumNum = parentContext.exchangeSumNum;
     this.exchangeOperatorList = parentContext.exchangeOperatorList;
     this.cachedDataTypes = parentContext.cachedDataTypes;
@@ -110,10 +113,13 @@ public class LocalExecutionPlanContext {
     this.allSensorsMap = new ConcurrentHashMap<>();
     this.typeProvider = null;
     this.nextOperatorId = new AtomicInteger(0);
+    this.nextPipelineId = new AtomicInteger(0);
 
     // there is no ttl in schema region, so we don't care this field
     this.dataRegionTTL = Long.MAX_VALUE;
-    this.driverContext = new SchemaDriverContext(instanceContext, 
schemaRegion);
+    this.driverContext =
+        new SchemaDriverContext(instanceContext, schemaRegion, 
getNextPipelineId());
+    this.pipelineDriverFactories = new ArrayList<>();
   }
 
   public void addPipelineDriverFactory(Operator operation, DriverContext 
driverContext) {
@@ -138,10 +144,22 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public int getPipelineNumber() {
+    return pipelineDriverFactories.size();
+  }
+
   public DriverContext getDriverContext() {
     return driverContext;
   }
 
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
+
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
   private int getNextPipelineId() {
     return nextPipelineId.getAndIncrement();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 231ca1d680..91b3c5187f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -24,12 +24,9 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -81,7 +78,7 @@ public class LocalExecutionPlanner {
     return context.getPipelineDriverFactories();
   }
 
-  public SchemaDriver plan(
+  public List<PipelineDriverFactory> plan(
       PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion 
schemaRegion)
       throws MemoryNotEnoughException {
     LocalExecutionPlanContext context =
@@ -92,18 +89,12 @@ public class LocalExecutionPlanner {
     // check whether current free memory is enough to execute current query
     checkMemory(root, instanceContext.getStateMachine());
 
+    context.addPipelineDriverFactory(root, context.getDriverContext());
+
     // set maxBytes one SourceHandle can reserve after visiting the whole tree
     context.setMaxBytesOneHandleCanReserve();
 
-    ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
-    context
-        .getDriverContext()
-        .getOperatorContexts()
-        .forEach(
-            operatorContext ->
-                
operatorContext.setMaxRunTime(timeSliceAllocator.getMaxRunTime(operatorContext)));
-
-    return new SchemaDriver(root, (SchemaDriverContext) 
context.getDriverContext());
+    return context.getPipelineDriverFactories();
   }
 
   private void checkMemory(Operator root, FragmentInstanceStateMachine 
stateMachine)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5cf08f8d98..5d99a29ad1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -554,7 +554,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 CountMergeOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    if (children.get(0) instanceof CountGroupByLevelScanOperator) {
+    if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) {
       return new CountGroupByLevelMergeOperator(node.getPlanNodeId(), 
operatorContext, children);
     } else {
       return new CountMergeOperator(node.getPlanNodeId(), operatorContext, 
children);
@@ -2228,25 +2228,72 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             node.getPathPatternList(), node.getTemplateId()));
   }
 
-  private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
+  public List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
       PlanNode node, LocalExecutionPlanContext context) {
     // children after pipelining
-    List<Operator> children = new ArrayList<>();
+    List<Operator> parentPipelineChildren = new ArrayList<>();
     int finalExchangeNum = context.getExchangeSumNum();
-    for (PlanNode childSource : node.getChildren()) {
-      // Create pipelines for children
-      LocalExecutionPlanContext subContext = context.createSubContext();
-      Operator childOperation = childSource.accept(this, subContext);
-      // If the child belongs to another fragment instance, we don't create 
pipeline for it
-      if (childOperation instanceof ExchangeOperator) {
-        children.add(childOperation);
-        finalExchangeNum += 1;
-      } else {
+    if (context.getDegreeOfParallelism() == 1) {
+      // If dop = 1, we don't create extra pipeline
+      for (PlanNode localChild : node.getChildren()) {
+        Operator childOperation = localChild.accept(this, context);
+        parentPipelineChildren.add(childOperation);
+      }
+    } else {
+      // Keep it since we may change the structure of the original children nodes
+      List<PlanNode> afterwardsNodes = new ArrayList<>();
+      // 1. Calculate localChildren size
+      int localChildrenSize = 0;
+      for (PlanNode child : node.getChildren()) {
+        if (!(child instanceof ExchangeNode)) {
+          localChildrenSize++;
+        }
+      }
+      // 2. divide every childNumInEachPipeline localChildren to different 
pipeline
+      int[] childNumInEachPipeline =
+          getChildNumInEachPipeline(
+              node.getChildren(), localChildrenSize, 
context.getDegreeOfParallelism());
+      // If dop > size(children) + 1, we can allocate extra dop to child node
+      // Extra dop = dop - size(children), since dop = 1 means serial but not 0
+      int childGroupNum = Math.min(context.getDegreeOfParallelism(), 
localChildrenSize);
+      int dopForChild = Math.max(1, context.getDegreeOfParallelism() - 
localChildrenSize);
+      int startIndex, endIndex = 0;
+      for (int i = 0; i < childGroupNum; i++) {
+        startIndex = endIndex;
+        endIndex += childNumInEachPipeline[i];
+        // Only if dop >= size(children) + 1, split all children to new 
pipeline
+        // Otherwise, the first group will belong to the parent pipeline
+        if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 
1) {
+          for (int j = startIndex; j < endIndex; j++) {
+            Operator childOperation = node.getChildren().get(j).accept(this, 
context);
+            parentPipelineChildren.add(childOperation);
+            afterwardsNodes.add(node.getChildren().get(j));
+          }
+          continue;
+        }
+        LocalExecutionPlanContext subContext = context.createSubContext();
+        subContext.setDegreeOfParallelism(dopForChild);
+        // Create partial parent operator for children
+        PlanNode partialParentNode = null;
+        Operator partialParentOperator = null;
+
+        int originPipeNum = context.getPipelineNumber();
+        if (endIndex - startIndex == 1) {
+          partialParentNode = node.getChildren().get(i);
+          partialParentOperator = node.getChildren().get(i).accept(this, 
subContext);
+        } else {
+          // PartialParentNode is equal to the parent node except for its children
+          partialParentNode = node.createSubNode(i, startIndex, endIndex);
+          partialParentOperator = partialParentNode.accept(this, subContext);
+        }
+        // update dop for child
+        dopForChild = Math.max(1, dopForChild - 
(subContext.getPipelineNumber() - originPipeNum));
         ISinkHandle localSinkHandle =
             MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                subContext.getDriverContext(), 
childSource.getPlanNodeId().getId());
+                // Attention, there is no parent node, use first child node 
instead
+                subContext.getDriverContext(), 
node.getChildren().get(i).getPlanNodeId().getId());
         subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext());
+        subContext.addPipelineDriverFactory(partialParentOperator, 
subContext.getDriverContext());
 
         ExchangeOperator sourceOperator =
             new ExchangeOperator(
@@ -2257,31 +2304,141 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
                     ((LocalSinkHandle) 
localSinkHandle).getSharedTsBlockQueue(),
                     context.getDriverContext()),
-                childSource.getPlanNodeId());
+                partialParentNode.getPlanNodeId());
         context
             .getTimeSliceAllocator()
             .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        children.add(sourceOperator);
+        parentPipelineChildren.add(sourceOperator);
+        afterwardsNodes.add(partialParentNode);
         context.addExchangeOperator(sourceOperator);
         finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
       }
+      ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
     }
     context.setExchangeSumNum(finalExchangeNum);
-    return children;
+    return parentPipelineChildren;
+  }
+
+  /**
+   * Currently, we allocate children to each pipeline as evenly as possible. For example, 5 children
+   * with a dop of 3 yields the child groups [1, 2, 2]. Once we can estimate the workload of each
+   * operator, we may be able to allocate based on workload rather than child count.
+   *
+   * <p>If a child is an ExchangeNode, it does not count towards the child number of its group.
+   */
+  public int[] getChildNumInEachPipeline(
+      List<PlanNode> allChildren, int localChildrenSize, int dop) {
+    int maxPipelineNum = Math.min(localChildrenSize, dop);
+    int[] childNumInEachPipeline = new int[maxPipelineNum];
+    int avgChildNum = Math.max(1, localChildrenSize / dop);
+    // allocate remaining child to group from splitIndex
+    int splitIndex =
+        localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - 
localChildrenSize % dop;
+    int pipelineIndex = 0, childIndex = 0;
+    while (pipelineIndex < maxPipelineNum) {
+      int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 
1;
+      int originChildIndex = childIndex;
+      while (childNum >= 0 && childIndex < allChildren.size()) {
+        if (!(allChildren.get(childIndex) instanceof ExchangeNode)) {
+          childNum--;
+          // Try to keep the first node of a pipeline from being an ExchangeNode
+          if (childNum == -1) {
+            childIndex--;
+          }
+        }
+        childIndex++;
+      }
+      childNumInEachPipeline[pipelineIndex++] = childIndex - originChildIndex;
+    }
+    return childNumInEachPipeline;
   }
 
-  private List<Operator> dealWithConsumeChildrenOneByOneNode(
+  public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
+    List<Operator> parentPipelineChildren = new ArrayList<>();
     int originExchangeNum = context.getExchangeSumNum();
     int finalExchangeNum = context.getExchangeSumNum();
-    List<Operator> children = new ArrayList<>();
-    for (PlanNode childSource : node.getChildren()) {
-      Operator childOperation = childSource.accept(this, context);
-      finalExchangeNum = Math.max(finalExchangeNum, 
context.getExchangeSumNum());
-      context.setExchangeSumNum(originExchangeNum);
-      children.add(childOperation);
+
+    // 1. divide every child to pipeline using the max dop
+    if (context.getDegreeOfParallelism() == 1) {
+      // If dop = 1, we don't create extra pipeline
+      for (PlanNode childSource : node.getChildren()) {
+        Operator childOperation = childSource.accept(this, context);
+        finalExchangeNum = Math.max(finalExchangeNum, 
context.getExchangeSumNum());
+        context.setExchangeSumNum(originExchangeNum);
+        parentPipelineChildren.add(childOperation);
+      }
+    } else {
+      List<Integer> childPipelineNums = new ArrayList<>();
+      List<Integer> childExchangeNums = new ArrayList<>();
+      int sumOfChildPipelines = 0, sumOfChildExchangeNums = 0;
+      int dependencyChildNode = 0, dependencyPipeId = 0;
+      for (PlanNode childNode : node.getChildren()) {
+        if (childNode instanceof ExchangeNode) {
+          Operator childOperation = childNode.accept(this, context);
+          finalExchangeNum = Math.max(finalExchangeNum, 
context.getExchangeSumNum());
+          context.setExchangeSumNum(originExchangeNum);
+          parentPipelineChildren.add(childOperation);
+        } else {
+          LocalExecutionPlanContext subContext = context.createSubContext();
+          // Only context.getDegreeOfParallelism() - 1 can be allocated to 
child
+          int dopForChild = context.getDegreeOfParallelism() - 1;
+          subContext.setDegreeOfParallelism(dopForChild);
+          int originPipeNum = context.getPipelineNumber();
+          Operator childOperation = childNode.accept(this, subContext);
+          ISinkHandle localSinkHandle =
+              MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+                  // Attention: there is no parent node, so use the first child node instead
+                  context.getDriverContext(), 
childNode.getPlanNodeId().getId());
+          subContext.setSinkHandle(localSinkHandle);
+          subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext());
+
+          int curChildPipelineNum = subContext.getPipelineNumber() - 
originPipeNum;
+          childPipelineNums.add(curChildPipelineNum);
+          sumOfChildPipelines += curChildPipelineNum;
+          // If sumOfChildPipelines > dopForChild, we have to wait until some 
pipelines finish
+          if (sumOfChildPipelines > dopForChild) {
+            // Update dependencyPipeId; once that pipeline finishes, curChildPipeline can be submitted
+            while (sumOfChildPipelines > dopForChild) {
+              dependencyPipeId = context.getPipelineNumber() - 
sumOfChildPipelines;
+              sumOfChildPipelines -= 
childPipelineNums.get(dependencyChildNode);
+              sumOfChildExchangeNums -= 
childExchangeNums.get(dependencyChildNode);
+              dependencyChildNode++;
+            }
+          }
+          // Add dependency for all pipelines under current node
+          if (dependencyChildNode != 0) {
+            for (int i = originPipeNum; i < subContext.getPipelineNumber(); 
i++) {
+              
context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+            }
+          }
+
+          ExchangeOperator sourceOperator =
+              new ExchangeOperator(
+                  context
+                      .getDriverContext()
+                      .addOperatorContext(
+                          context.getNextOperatorId(),
+                          null,
+                          ExchangeOperator.class.getSimpleName()),
+                  MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+                      ((LocalSinkHandle) 
localSinkHandle).getSharedTsBlockQueue(),
+                      context.getDriverContext()),
+                  childNode.getPlanNodeId());
+          context
+              .getTimeSliceAllocator()
+              .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+          parentPipelineChildren.add(sourceOperator);
+          context.addExchangeOperator(sourceOperator);
+          int childExchangeNum = subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
+          sumOfChildExchangeNums += childExchangeNum;
+          childExchangeNums.add(childExchangeNum);
+          finalExchangeNum =
+              Math.max(finalExchangeNum, context.getExchangeSumNum() + 
sumOfChildExchangeNums);
+        }
+      }
     }
     context.setExchangeSumNum(finalExchangeNum);
-    return children;
+    return parentPipelineChildren;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index ec5eb7a3e2..8cc9ec0e51 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -20,8 +20,11 @@
 package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.Driver;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 
 import static java.util.Objects.requireNonNull;
@@ -31,6 +34,7 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers 
for on pipeline
   private final Operator operation;
+  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext 
driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -44,7 +48,16 @@ public class PipelineDriverFactory {
   public Driver createDriver() {
     requireNonNull(driverContext, "driverContext is null");
     try {
-      return new DataDriver(operation, driverContext);
+      Driver driver = null;
+      if (driverContext instanceof DataDriverContext) {
+        driver = new DataDriver(operation, driverContext);
+      } else {
+        driver = new SchemaDriver(operation, (SchemaDriverContext) 
driverContext);
+      }
+      if (dependencyPipelineIndex != -1) {
+        
driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
+      }
+      return driver;
     } catch (Throwable failure) {
       try {
         operation.close();
@@ -56,4 +69,12 @@ public class PipelineDriverFactory {
       throw failure;
     }
   }
+
+  public void setDependencyPipeline(int dependencyPipelineIndex) {
+    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  }
+
+  public int getDependencyPipelineIndex() {
+    return dependencyPipelineIndex;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fd8d1a9293..22efafdb10 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -1085,7 +1085,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
       splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext 
context) {
     // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
-    List<SeriesAggregationSourceNode> rawSources = 
findAggregationSourceNode(root);
+    List<SeriesAggregationSourceNode> rawSources = 
AggregationNode.findAggregationSourceNode(root);
 
     // Step 1: construct SeriesAggregationSourceNode for each data region of 
one Path
     List<SeriesAggregationSourceNode> sources = new ArrayList<>();
@@ -1117,7 +1117,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
           boolean[] eachSeriesOneRegion,
           Map<PartialPath, Integer> regionCountPerSeries) {
     // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
-    List<SeriesAggregationSourceNode> rawSources = 
findAggregationSourceNode(root);
+    List<SeriesAggregationSourceNode> rawSources = 
AggregationNode.findAggregationSourceNode(root);
 
     // Step 1: construct SeriesAggregationSourceNode for each data region of 
one Path
     for (SeriesAggregationSourceNode child : rawSources) {
@@ -1169,18 +1169,6 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return dataDistribution.size();
   }
 
-  private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode 
node) {
-    if (node == null) {
-      return new ArrayList<>();
-    }
-    if (node instanceof SeriesAggregationSourceNode) {
-      return Collections.singletonList((SeriesAggregationSourceNode) node);
-    }
-    List<SeriesAggregationSourceNode> ret = new ArrayList<>();
-    node.getChildren().forEach(child -> 
ret.addAll(findAggregationSourceNode(child)));
-    return ret;
-  }
-
   public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index d9b640e0cd..0e85bd9461 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -75,6 +75,19 @@ public abstract class PlanNode implements IConsensusRequest {
   @Override
   public abstract PlanNode clone();
 
+  /**
+   * Create a sub node that has exactly the same function as the origin node, but whose children are
+   * only the sub-range [startIndex, endIndex) of the origin node's children list.
+   *
+   * @param subNodeId the sub node id
+   * @param startIndex the start index (inclusive) of the origin children
+   * @param endIndex the end index (exclusive) of the origin children
+   */
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    throw new UnsupportedOperationException(
+        String.format("Can't create subNode for %s", 
this.getClass().toString()));
+  }
+
   public PlanNode cloneWithChildren(List<PlanNode> children) {
     if (!(children == null
         || allowedChildCount() == CHILD_COUNT_NO_LIMIT
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 6c0c7703e8..1f70daa76b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -36,6 +37,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,8 +52,7 @@ import java.util.stream.Collectors;
 public class AggregationNode extends MultiChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output 
as one or two column
-  // of
-  // result TsBlock
+  // of result TsBlock
   protected List<AggregationDescriptor> aggregationDescriptorList;
 
   // The parameter of `group by time`.
@@ -167,6 +168,13 @@ public class AggregationNode extends MultiChildProcessNode 
{
         getScanOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new HorizontallyConcatNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> outputColumnNames = new ArrayList<>();
@@ -182,6 +190,18 @@ public class AggregationNode extends MultiChildProcessNode 
{
     return outputColumnNames;
   }
 
+  public static List<SeriesAggregationSourceNode> 
findAggregationSourceNode(PlanNode node) {
+    if (node == null) {
+      return new ArrayList<>();
+    }
+    if (node instanceof SeriesAggregationSourceNode) {
+      return Collections.singletonList((SeriesAggregationSourceNode) node);
+    }
+    List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+    node.getChildren().forEach(child -> 
ret.addAll(findAggregationSourceNode(child)));
+    return ret;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitAggregation(this, context);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d3ec660e7f..faa21947d1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -63,6 +63,12 @@ public class DeviceMergeNode extends MultiChildProcessNode {
     return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), 
getDevices());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    throw new UnsupportedOperationException(
+        "DeviceMergeNode should have only one local child in single data 
region.");
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index da7b8dc9c0..f1833a7cee 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -93,6 +93,13 @@ public class GroupByLevelNode extends MultiChildProcessNode {
         getPlanNodeId(), getGroupByLevelDescriptors(), 
this.groupByTimeParameter, this.scanOrder);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new HorizontallyConcatNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
     return groupByLevelDescriptors;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index c2e558bdc7..2089696218 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -99,6 +99,13 @@ public class GroupByTagNode extends MultiChildProcessNode {
         this.outputColumnNames);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new HorizontallyConcatNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> ret = new ArrayList<>(tagKeys);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
index a541ef1bee..f6c785f8fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
@@ -42,11 +42,22 @@ public class HorizontallyConcatNode extends 
MultiChildProcessNode {
     super(id, new ArrayList<>());
   }
 
+  public HorizontallyConcatNode(PlanNodeId id, List<PlanNode> children) {
+    super(id, children);
+  }
+
   @Override
   public PlanNode clone() {
     return new HorizontallyConcatNode(getPlanNodeId());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new HorizontallyConcatNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 564a5dbfa4..c5d6c394ee 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -45,6 +45,16 @@ public class MergeSortNode extends MultiChildProcessNode {
     this.outputColumns = outputColumns;
   }
 
+  public MergeSortNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      OrderByParameter mergeOrderParameter,
+      List<String> outputColumns) {
+    super(id, children);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumns = outputColumns;
+  }
+
   public OrderByParameter getMergeOrderParameter() {
     return mergeOrderParameter;
   }
@@ -54,6 +64,15 @@ public class MergeSortNode extends MultiChildProcessNode {
     return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), 
outputColumns);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new MergeSortNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)),
+        getMergeOrderParameter(),
+        outputColumns);
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumns;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2598cd4e28..ed1b8e4fb0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -62,6 +62,14 @@ public class TimeJoinNode extends MultiChildProcessNode {
     return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new TimeJoinNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        getMergeOrder(),
+        new ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
index e503c460de..9cc4311cbe 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
@@ -77,7 +77,7 @@ public class CountGroupByLevelMergeOperatorTest {
               1, planNodeId, 
CountGroupByLevelScanOperator.class.getSimpleName());
       ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo> 
timeSeriesCountOperator1 =
           new CountGroupByLevelScanOperator<>(
               planNodeId,
@@ -152,7 +152,7 @@ public class CountGroupByLevelMergeOperatorTest {
               1, planNodeId, 
CountGroupByLevelScanOperator.class.getSimpleName());
       ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo> 
timeSeriesCountOperator =
           new CountGroupByLevelScanOperator<>(
               planNodeId,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
index 2e9fa0e2db..ec10473b9b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
@@ -73,7 +73,7 @@ public class SchemaCountOperatorTest {
           driverContext.addOperatorContext(
               1, planNodeId, SchemaCountOperator.class.getSimpleName());
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       ISchemaSource<ISchemaInfo> schemaSource = 
Mockito.mock(ISchemaSource.class);
 
       List<ISchemaInfo> schemaInfoList = new ArrayList<>(10);
@@ -133,7 +133,7 @@ public class SchemaCountOperatorTest {
       ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
 
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo> 
timeSeriesCountOperator =
           new CountGroupByLevelScanOperator<>(
               planNodeId,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
index a9b501324d..434d509c75 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -88,7 +88,7 @@ public class SchemaQueryScanOperatorTest {
           .thenReturn(META_SCAN_OPERATOR_TEST_SG + ".device0");
       Mockito.when(deviceSchemaInfo.isAligned()).thenReturn(false);
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       ISchemaSource<IDeviceSchemaInfo> deviceSchemaSource =
           SchemaSourceFactory.getDeviceSchemaSource(partialPath, false, 10, 0, 
true);
       SchemaOperatorTestUtil.mockGetSchemaReader(
@@ -193,7 +193,7 @@ public class SchemaQueryScanOperatorTest {
       
Mockito.when(schemaRegion.getStorageGroupFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
 
       operatorContext.setDriverContext(
-          new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+          new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
       ISchemaSource<ITimeSeriesSchemaInfo> timeSeriesSchemaSource =
           SchemaSourceFactory.getTimeSeriesSchemaSource(
               partialPath, false, 10, 0, null, null, false, 
Collections.emptyMap());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 7e3612e499..ff8d0fdebf 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -63,10 +63,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
+    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, 
"inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
+    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -91,6 +93,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, 
"inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
+    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), 
QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -111,6 +114,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
+    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), 
QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
new file mode 100644
index 0000000000..c0ee46142b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+public class PipelineBuilderTest {
+
+  OperatorTreeGenerator operatorTreeGenerator = new OperatorTreeGenerator();
+
+  /**
+   * The operator structure is [TimeJoin1 - 
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+   *
+   * <p>In the next six tests, this TimeJoinOperator is tested with different values of dop.
+   *
+   * <p>The first test will test dop = 1. Expected result is that no child 
pipelines will be
+   * divided.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder1() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    assertEquals(0, context.getPipelineNumber());
+    assertEquals(4, childrenOperator.size());
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanOperator.class, 
childrenOperator.get(i).getClass());
+      assertEquals(SeriesScanNode.class, 
timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the number of exchange operators
+    assertEquals(0, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 2. Expected result is two pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, 
SeriesScan3].
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder2() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(2);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    // The number of pipeline is 1, since parent pipeline hasn't joined
+    assertEquals(1, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(3, childrenOperator.size());
+    assertEquals(3, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 2; i++) {
+      assertEquals(SeriesScanOperator.class, 
childrenOperator.get(i).getClass());
+      assertEquals(SeriesScanNode.class, 
timeJoinNode.getChildren().get(i).getClass());
+    }
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+    // Validate the changes of node structure
+    assertEquals("root.sg.d0.s1", 
timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("root.sg.d1.s1", 
timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    assertEquals(TimeJoinNode.class, 
timeJoinNode.getChildren().get(2).getClass());
+
+    // Validate the second pipeline
+    TimeJoinNode subTimeJoinNode = (TimeJoinNode) 
timeJoinNode.getChildren().get(2);
+    assertEquals(2, subTimeJoinNode.getChildren().size());
+    assertEquals(
+        "root.sg.d2.s1", 
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals(
+        "root.sg.d3.s1", 
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+
+    // Validate the number of exchange operators
+    assertEquals(1, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 3. Expected result is three pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, 
ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The third is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3].
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder3() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    // The number of pipeline is 2, since parent pipeline hasn't joined
+    assertEquals(2, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(3, childrenOperator.size());
+    assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+    // Validate the changes of node structure
+    assertEquals(3, timeJoinNode.getChildren().size());
+    assertEquals("root.sg.d0.s1", 
timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("root.sg.d1.s1", 
timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    assertEquals(TimeJoinNode.class, 
timeJoinNode.getChildren().get(2).getClass());
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    TimeJoinNode subTimeJoinNode = (TimeJoinNode) 
timeJoinNode.getChildren().get(2);
+    assertEquals(2, subTimeJoinNode.getChildren().size());
+    assertEquals(
+        "root.sg.d2.s1", 
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals(
+        "root.sg.d3.s1", 
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals(exchangeOperator2.getSourceId(), 
subTimeJoinNode.getPlanNodeId());
+
+    // Validate the number of exchange operators
+    assertEquals(2, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 4. Expected result is four pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder4() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(4);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    // The number of pipeline is 3, since parent pipeline hasn't joined
+    assertEquals(3, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass());
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, 
timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator2.getSourceId().getId());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator3.getSourceId().getId());
+
+    // Validate the number of exchange operators
+    assertEquals(3, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 5. Expected result is five pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder5() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(5);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, 
timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+
+    // Validate the number of exchange operators
+    assertEquals(4, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 6. Expected result is still five pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder6() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(6);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, 
timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+
+    // Validate the number of exchange operators
+    assertEquals(4, context.getExchangeSumNum());
+  }
+
+  /**
+   * The operator structure is [DeviceView - 
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+   *
+   * <p>In the next six tests, this DeviceViewOperator is tested with different values of dop.
+   *
+   * <p>The first test will test dop = 1. Expected result is that no child 
pipelines will be
+   * divided.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder1() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    assertEquals(0, context.getPipelineNumber());
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(AlignedSeriesScanOperator.class, 
childrenOperator.get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the number of exchange operators
+    assertEquals(0, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 2. Expected result is five pipelines with dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1, which depends on the second pipeline.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the third pipeline.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the fourth pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder2() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(2);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", 
exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", 
exchangeOperator2.getSourceId().getId());
+    assertEquals(0, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", 
exchangeOperator3.getSourceId().getId());
+    assertEquals(1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", 
exchangeOperator4.getSourceId().getId());
+    assertEquals(2, 
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+    // Validate the number of exchange operators
+    assertEquals(1, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 3. Expected result is five pipelines with dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the second pipeline.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the third pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder3() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", 
exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", 
exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", 
exchangeOperator3.getSourceId().getId());
+    assertEquals(0, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", 
exchangeOperator4.getSourceId().getId());
+    assertEquals(1, 
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+    // Validate the number of exchange operators
+    assertEquals(2, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 4. Expected result is five pipelines with dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the second pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder4() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(4);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", 
exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", 
exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", 
exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", 
exchangeOperator4.getSourceId().getId());
+    assertEquals(0, 
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+    // Validate the number of exchange operators
+    assertEquals(3, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test will test dop = 5. Expected result is five pipelines without 
dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder5() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(5);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", 
exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", 
exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", 
exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", 
exchangeOperator4.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+    // Validate the number of exchange operators
+    assertEquals(4, context.getExchangeSumNum());
+  }
+
+  /**
+   * This test is named for dop = 6, but the body still sets dop = 5 — NOTE(review): this
+   * duplicates the previous test; presumably it should set dop = 6 (the expected result is
+   * identical for any dop >= 5). Expected result is still five pipelines without dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, 
ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder6() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(5);
+
+    List<Operator> childrenOperator =
+        
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, 
context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) 
childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", 
exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) 
childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", 
exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) 
childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", 
exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) 
childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", 
exchangeOperator4.getSourceId().getId());
+    assertEquals(-1, 
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+    // Validate the number of exchange operators
+    assertEquals(4, context.getExchangeSumNum());
+  }
+
+  @Test
+  public void testGetChildNumInEachPipeline() {
+    List<PlanNode> allChildren = new ArrayList<>();
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
+
+    int[] childNumInEachPipeline =
+        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+    assertEquals(2, childNumInEachPipeline.length);
+    assertEquals(2, childNumInEachPipeline[0]);
+    assertEquals(1, childNumInEachPipeline[1]);
+
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+    childNumInEachPipeline = 
operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
+    assertEquals(3, childNumInEachPipeline.length);
+    assertEquals(2, childNumInEachPipeline[0]);
+    assertEquals(2, childNumInEachPipeline[1]);
+    assertEquals(5, childNumInEachPipeline[2]);
+  }
+
+  private LocalExecutionPlanContext 
createLocalExecutionPlanContext(TypeProvider typeProvider) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    fragmentInstanceContext.setDataRegion(dataRegion);
+
+    return new LocalExecutionPlanContext(typeProvider, 
fragmentInstanceContext);
+  }
+
+  /**
+   * This method will init a timeJoinNode with @childNum seriesScanNode as 
children.
+   *
+   * @param childNum the number of children
+   * @return a timeJoinNode with @childNum seriesScanNode as children
+   */
+  private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int 
childNum)
+      throws IllegalPathException {
+    TimeJoinNode timeJoinNode = new TimeJoinNode(new 
PlanNodeId("TimeJoinNode"), Ordering.ASC);
+    for (int i = 0; i < childNum; i++) {
+      SeriesScanNode seriesScanNode =
+          new SeriesScanNode(
+              new PlanNodeId(String.format("SeriesScanNode%d", i)),
+              new MeasurementPath(String.format("root.sg.d%d.s1", i), 
TSDataType.INT32));
+      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), 
TSDataType.INT32);
+      timeJoinNode.addChild(seriesScanNode);
+    }
+    return timeJoinNode;
+  }
+
+  /**
+   * This method will init a DeviceViewNode with @childNum 
alignedSeriesScanNode as children.
+   *
+   * @param childNum the number of children
+   * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children
+   */
+  private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int 
childNum)
+      throws IllegalPathException {
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
+    for (int i = 0; i < childNum; i++) {
+      AlignedSeriesScanNode alignedSeriesScanNode =
+          new AlignedSeriesScanNode(
+              new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)),
+              new AlignedPath(String.format("root.sg.d%d", i), "s1"));
+      deviceViewNode.addChild(alignedSeriesScanNode);
+    }
+    return deviceViewNode;
+  }
+}

Reply via email to