This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eaee1d8dbf [IOTDB-4005] Add degree of parallelism to pipeline engine
eaee1d8dbf is described below
commit eaee1d8dbf832d96098f699d3143bf4a3d917dd7
Author: Xiangwei Wei <[email protected]>
AuthorDate: Tue Feb 14 20:43:07 2023 +0800
[IOTDB-4005] Add degree of parallelism to pipeline engine
---
.../resources/conf/iotdb-common.properties | 4 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +
.../iotdb/db/mpp/execution/driver/Driver.java | 8 +
.../db/mpp/execution/driver/DriverContext.java | 9 +
.../iotdb/db/mpp/execution/driver/IDriver.java | 2 +
.../mpp/execution/driver/SchemaDriverContext.java | 5 +-
.../fragment/FragmentInstanceManager.java | 14 +-
.../process/join/RowBasedTimeJoinOperator.java | 6 +
.../db/mpp/execution/schedule/DriverScheduler.java | 126 ++--
.../db/mpp/execution/schedule/task/DriverTask.java | 21 +
.../plan/planner/LocalExecutionPlanContext.java | 20 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 17 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 207 +++++-
.../db/mpp/plan/planner/PipelineDriverFactory.java | 23 +-
.../plan/planner/distribution/SourceRewriter.java | 16 +-
.../db/mpp/plan/planner/plan/node/PlanNode.java | 13 +
.../planner/plan/node/process/AggregationNode.java | 24 +-
.../planner/plan/node/process/DeviceMergeNode.java | 6 +
.../plan/node/process/GroupByLevelNode.java | 7 +
.../planner/plan/node/process/GroupByTagNode.java | 7 +
.../plan/node/process/HorizontallyConcatNode.java | 11 +
.../planner/plan/node/process/MergeSortNode.java | 19 +
.../planner/plan/node/process/TimeJoinNode.java | 8 +
.../schema/CountGroupByLevelMergeOperatorTest.java | 4 +-
.../operator/schema/SchemaCountOperatorTest.java | 4 +-
.../schema/SchemaQueryScanOperatorTest.java | 4 +-
.../execution/schedule/DriverSchedulerTest.java | 4 +
.../db/mpp/plan/plan/PipelineBuilderTest.java | 761 +++++++++++++++++++++
29 files changed, 1257 insertions(+), 112 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1ca8968dd3..9f9054aef7 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -419,6 +419,10 @@ cluster_name=defaultCluster
# Datatype: int
# query_thread_count=0
+# How many pipeline drivers will be created for one fragment instance. When <=
0, use CPU core number / 2.
+# Datatype: int
+# degree_of_query_parallelism=0
+
# The amount of data iterate each time in server (the number of data strips,
that is, the number of different timestamps.)
# Datatype: int
# batch_size=100000
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 32f68e5002..746d26bb03 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,6 +318,8 @@ public class IoTDBConfig {
/** How many threads can concurrently execute query statement. When <= 0,
use CPU core number. */
private int queryThreadCount = Runtime.getRuntime().availableProcessors();
+ private int degreeOfParallelism = Runtime.getRuntime().availableProcessors()
/ 2;
+
/** How many queries can be concurrently executed. When <= 0, use 1000. */
private int maxAllowedConcurrentQueries = 1000;
@@ -1463,6 +1465,14 @@ public class IoTDBConfig {
this.queryThreadCount = queryThreadCount;
}
+ public void setDegreeOfParallelism(int degreeOfParallelism) {
+ this.degreeOfParallelism = degreeOfParallelism;
+ }
+
+ public int getDegreeOfParallelism() {
+ return degreeOfParallelism;
+ }
+
public int getMaxAllowedConcurrentQueries() {
return maxAllowedConcurrentQueries;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1348f6861a..8c5b060644 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -541,6 +541,15 @@ public class IoTDBDescriptor {
conf.setQueryThreadCount(Runtime.getRuntime().availableProcessors());
}
+ conf.setDegreeOfParallelism(
+ Integer.parseInt(
+ properties.getProperty(
+ "degree_of_query_parallelism",
Integer.toString(conf.getDegreeOfParallelism()))));
+
+ if (conf.getDegreeOfParallelism() <= 0) {
+ conf.setDegreeOfParallelism(Runtime.getRuntime().availableProcessors() /
2);
+ }
+
conf.setMaxAllowedConcurrentQueries(
Integer.parseInt(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 14554bb8fb..a1ab1c18db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -91,6 +91,10 @@ public abstract class Driver implements IDriver {
return result.orElseGet(() -> state.get() != State.ALIVE ||
driverContext.isDone());
}
+ public DriverContext getDriverContext() {
+ return driverContext;
+ }
+
/**
* do initialization
*
@@ -101,6 +105,10 @@ public abstract class Driver implements IDriver {
/** release resource this driver used */
protected abstract void releaseResource();
+ public int getDependencyDriverIndex() {
+ return driverContext.getDependencyDriverIndex();
+ }
+
@Override
public ListenableFuture<?> processFor(Duration duration) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 95a5861978..ea369d8fad 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -39,6 +39,7 @@ public class DriverContext {
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISinkHandle sinkHandle;
private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+ private int dependencyDriverIndex = -1;
private final AtomicBoolean finished = new AtomicBoolean();
@@ -69,6 +70,14 @@ public class DriverContext {
throw new UnsupportedOperationException();
}
+ public void setDependencyDriverIndex(int dependencyDriverIndex) {
+ this.dependencyDriverIndex = dependencyDriverIndex;
+ }
+
+ public int getDependencyDriverIndex() {
+ return dependencyDriverIndex;
+ }
+
public void setSinkHandle(ISinkHandle sinkHandle) {
this.sinkHandle = sinkHandle;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 8a55d098de..ff55d5456c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -71,4 +71,6 @@ public interface IDriver {
/** @return get SinkHandle of current IDriver */
ISinkHandle getSinkHandle();
+
+ int getDependencyDriverIndex();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
index 495a2779ee..b93e410127 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
@@ -27,9 +27,8 @@ public class SchemaDriverContext extends DriverContext {
private final ISchemaRegion schemaRegion;
public SchemaDriverContext(
- FragmentInstanceContext fragmentInstanceContext, ISchemaRegion
schemaRegion) {
- // TODO whether schema driver need to be split to pipeline, default 0 now
- super(fragmentInstanceContext, 0);
+ FragmentInstanceContext fragmentInstanceContext, ISchemaRegion
schemaRegion, int pipelineId) {
+ super(fragmentInstanceContext, pipelineId);
this.schemaRegion = schemaRegion;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index efa8db0bf1..794174b939 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
@@ -41,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -195,14 +193,20 @@ public class FragmentInstanceManager {
fragmentInstanceId, stateMachine,
instance.getSessionInfo()));
try {
- SchemaDriver driver =
+ List<PipelineDriverFactory> driverFactories =
planner.plan(instance.getFragment().getPlanNodeTree(),
context, schemaRegion);
+
+ List<IDriver> drivers = new ArrayList<>();
+ driverFactories.forEach(factory ->
drivers.add(factory.createDriver()));
+ // get the sinkHandle of last driver
+ ISinkHandle sinkHandle = drivers.get(drivers.size() -
1).getSinkHandle();
+
return createFragmentInstanceExecution(
scheduler,
instanceId,
context,
- Collections.singletonList(driver),
- driver.getSinkHandle(),
+ drivers,
+ sinkHandle,
stateMachine,
failedInstances,
instance.getTimeOut());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 50cccb9573..79f9905ba8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.join;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import
org.apache.iotdb.db.mpp.execution.operator.process.AbstractProcessOperator;
@@ -290,6 +291,11 @@ public class RowBasedTimeJoinOperator extends
AbstractProcessOperator {
timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
}
+ @TestOnly
+ public List<Operator> getChildren() {
+ return children;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock,
return true; else
* return false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 7188717d20..5179a57b3a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -39,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +55,6 @@ import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static
org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static
org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -62,6 +64,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
private static final Logger logger =
LoggerFactory.getLogger(DriverScheduler.class);
private static final QueryMetricsManager QUERY_METRICS =
QueryMetricsManager.getInstance();
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final double LEVEL_TIME_MULTIPLIER = 2;
@@ -78,13 +81,10 @@ public class DriverScheduler implements IDriverScheduler,
IService {
private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0);
private IMPPDataExchangeManager blockManager;
- private static final int QUERY_MAX_CAPACITY =
-
IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries();
- private static final int WORKER_THREAD_NUM =
- IoTDBDescriptor.getInstance().getConfig().getQueryThreadCount();
- private static final int TASK_MAX_CAPACITY = QUERY_MAX_CAPACITY *
WORKER_THREAD_NUM * 2;
- private static final long QUERY_TIMEOUT_MS =
- IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+ private static final int QUERY_MAX_CAPACITY =
config.getMaxAllowedConcurrentQueries();
+ private static final int WORKER_THREAD_NUM = config.getQueryThreadCount();
+ private static final int TASK_MAX_CAPACITY = QUERY_MAX_CAPACITY *
config.getDegreeOfParallelism();
+ private static final long QUERY_TIMEOUT_MS =
config.getQueryTimeoutThreshold();
private final ThreadGroup workerGroups;
private final List<AbstractDriverThread> threads;
@@ -175,43 +175,78 @@ public class DriverScheduler implements IDriverScheduler,
IService {
getNextDriverTaskHandleId(),
(MultilevelPriorityQueue) readyQueue,
OptionalInt.of(Integer.MAX_VALUE));
- List<DriverTask> tasks =
- drivers.stream()
- .map(
- v ->
- new DriverTask(
- v,
- timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
- DriverTaskStatus.READY,
- driverTaskHandle))
- .collect(Collectors.toList());
+ List<DriverTask> tasks = new ArrayList<>();
+ drivers.forEach(
+ driver ->
+ tasks.add(
+ new DriverTask(
+ driver,
+ timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+ DriverTaskStatus.READY,
+ driverTaskHandle)));
+
+ List<DriverTask> submittedTasks = new ArrayList<>();
+ for (DriverTask task : tasks) {
+ IDriver driver = task.getDriver();
+ if (driver.getDependencyDriverIndex() != -1) {
+ SettableFuture<?> blockedDependencyFuture =
+
tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+ blockedDependencyFuture.addListener(
+ () -> {
+ // Submit this task only if the query is still alive
+ queryMap.computeIfPresent(
+ queryId,
+ (k1, queryRelatedTasks) -> {
+ queryRelatedTasks.computeIfPresent(
+ task.getDriverTaskId().getFragmentInstanceId(),
+ (k2, instanceRelatedTasks) -> {
+ instanceRelatedTasks.add(task);
+ submitTaskToReadyQueue(task);
+ return instanceRelatedTasks;
+ });
+ return queryRelatedTasks;
+ });
+ },
+ MoreExecutors.directExecutor());
+ } else {
+ submittedTasks.add(task);
+ }
+ }
+
+ for (DriverTask task : submittedTasks) {
+ registerTaskToQueryMap(queryId, task);
+ }
+ for (DriverTask task : submittedTasks) {
+ submitTaskToReadyQueue(task);
+ }
+ }
+
+ public void registerTaskToQueryMap(QueryId queryId, DriverTask driverTask) {
// If query has not been registered by other fragment instances,
// add the first task as timeout checking task to timeoutQueue.
- for (DriverTask driverTask : tasks) {
- queryMap
- .computeIfAbsent(
- queryId,
- v -> {
- timeoutQueue.push(tasks.get(0));
- return new ConcurrentHashMap<>();
- })
- .computeIfAbsent(
- driverTask.getDriverTaskId().getFragmentInstanceId(),
- v -> Collections.synchronizedSet(new HashSet<>()))
- .add(driverTask);
- }
+ queryMap
+ .computeIfAbsent(
+ queryId,
+ v -> {
+ timeoutQueue.push(driverTask);
+ return new ConcurrentHashMap<>();
+ })
+ .computeIfAbsent(
+ driverTask.getDriverTaskId().getFragmentInstanceId(),
+ v -> Collections.synchronizedSet(new HashSet<>()))
+ .add(driverTask);
+ }
- for (DriverTask task : tasks) {
- task.lock();
- try {
- if (task.getStatus() != DriverTaskStatus.READY) {
- continue;
- }
- readyQueue.push(task);
- task.setLastEnterReadyQueueTime(System.nanoTime());
- } finally {
- task.unlock();
+ public void submitTaskToReadyQueue(DriverTask task) {
+ task.lock();
+ try {
+ if (task.getStatus() != DriverTaskStatus.READY) {
+ return;
}
+ readyQueue.push(task);
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ } finally {
+ task.unlock();
}
}
@@ -447,6 +482,13 @@ public class DriverScheduler implements IDriverScheduler,
IService {
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
+ } finally {
+ task.unlock();
+ }
+ // Dependency driver must be submitted before this task is cleared
+ task.submitDependencyDriver();
+ task.lock();
+ try {
clearDriverTask(task);
} finally {
task.unlock();
@@ -472,7 +514,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
task.unlock();
}
QueryId queryId = task.getDriverTaskId().getQueryId();
- Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
queryMap.get(queryId);
+ Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
queryMap.remove(queryId);
if (queryRelatedTasks != null) {
for (Set<DriverTask> fragmentRelatedTasks :
queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index e008b84210..c436b180f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTa
import
org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.util.Comparator;
@@ -58,6 +59,8 @@ public class DriverTask implements IDIndexedAccessible {
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
+ private SettableFuture<Void> blockedDependencyDriver = null;
+
/** Initialize a dummy instance for queryHolder */
public DriverTask() {
this(new StubFragmentInstance(), 0L, null, null);
@@ -137,6 +140,19 @@ public class DriverTask implements IDIndexedAccessible {
this.abortCause = abortCause;
}
+ public void submitDependencyDriver() {
+ if (blockedDependencyDriver != null) {
+ this.blockedDependencyDriver.set(null);
+ }
+ }
+
+ public SettableFuture<Void> getBlockedDependencyDriver() {
+ if (blockedDependencyDriver == null) {
+ blockedDependencyDriver = SettableFuture.create();
+ }
+ return blockedDependencyDriver;
+ }
+
public Priority getPriority() {
return priority.get();
}
@@ -251,5 +267,10 @@ public class DriverTask implements IDIndexedAccessible {
public ISinkHandle getSinkHandle() {
return null;
}
+
+ @Override
+ public int getDependencyDriverIndex() {
+ return -1;
+ }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 2d8e6d7051..0404be866a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -59,6 +59,8 @@ public class LocalExecutionPlanContext {
private final AtomicInteger nextOperatorId;
private final TypeProvider typeProvider;
private final Map<String, Set<String>> allSensorsMap;
+ private int degreeOfParallelism =
+ IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
// this is shared with all subContexts
private AtomicInteger nextPipelineId;
private List<PipelineDriverFactory> pipelineDriverFactories;
@@ -97,6 +99,7 @@ public class LocalExecutionPlanContext {
this.dataRegionTTL = parentContext.dataRegionTTL;
this.nextPipelineId = parentContext.nextPipelineId;
this.pipelineDriverFactories = parentContext.pipelineDriverFactories;
+ this.degreeOfParallelism = parentContext.degreeOfParallelism;
this.exchangeSumNum = parentContext.exchangeSumNum;
this.exchangeOperatorList = parentContext.exchangeOperatorList;
this.cachedDataTypes = parentContext.cachedDataTypes;
@@ -110,10 +113,13 @@ public class LocalExecutionPlanContext {
this.allSensorsMap = new ConcurrentHashMap<>();
this.typeProvider = null;
this.nextOperatorId = new AtomicInteger(0);
+ this.nextPipelineId = new AtomicInteger(0);
// there is no ttl in schema region, so we don't care this field
this.dataRegionTTL = Long.MAX_VALUE;
- this.driverContext = new SchemaDriverContext(instanceContext,
schemaRegion);
+ this.driverContext =
+ new SchemaDriverContext(instanceContext, schemaRegion,
getNextPipelineId());
+ this.pipelineDriverFactories = new ArrayList<>();
}
public void addPipelineDriverFactory(Operator operation, DriverContext
driverContext) {
@@ -138,10 +144,22 @@ public class LocalExecutionPlanContext {
return pipelineDriverFactories;
}
+ public int getPipelineNumber() {
+ return pipelineDriverFactories.size();
+ }
+
public DriverContext getDriverContext() {
return driverContext;
}
+ public int getDegreeOfParallelism() {
+ return degreeOfParallelism;
+ }
+
+ public void setDegreeOfParallelism(int degreeOfParallelism) {
+ this.degreeOfParallelism = degreeOfParallelism;
+ }
+
private int getNextPipelineId() {
return nextPipelineId.getAndIncrement();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 231ca1d680..91b3c5187f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -24,12 +24,9 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -81,7 +78,7 @@ public class LocalExecutionPlanner {
return context.getPipelineDriverFactories();
}
- public SchemaDriver plan(
+ public List<PipelineDriverFactory> plan(
PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion
schemaRegion)
throws MemoryNotEnoughException {
LocalExecutionPlanContext context =
@@ -92,18 +89,12 @@ public class LocalExecutionPlanner {
// check whether current free memory is enough to execute current query
checkMemory(root, instanceContext.getStateMachine());
+ context.addPipelineDriverFactory(root, context.getDriverContext());
+
// set maxBytes one SourceHandle can reserve after visiting the whole tree
context.setMaxBytesOneHandleCanReserve();
- ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
- context
- .getDriverContext()
- .getOperatorContexts()
- .forEach(
- operatorContext ->
-
operatorContext.setMaxRunTime(timeSliceAllocator.getMaxRunTime(operatorContext)));
-
- return new SchemaDriver(root, (SchemaDriverContext)
context.getDriverContext());
+ return context.getPipelineDriverFactories();
}
private void checkMemory(Operator root, FragmentInstanceStateMachine
stateMachine)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5cf08f8d98..5d99a29ad1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -554,7 +554,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
CountMergeOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- if (children.get(0) instanceof CountGroupByLevelScanOperator) {
+ if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) {
return new CountGroupByLevelMergeOperator(node.getPlanNodeId(),
operatorContext, children);
} else {
return new CountMergeOperator(node.getPlanNodeId(), operatorContext,
children);
@@ -2228,25 +2228,72 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPathPatternList(), node.getTemplateId()));
}
- private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
+ public List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
PlanNode node, LocalExecutionPlanContext context) {
// children after pipelining
- List<Operator> children = new ArrayList<>();
+ List<Operator> parentPipelineChildren = new ArrayList<>();
int finalExchangeNum = context.getExchangeSumNum();
- for (PlanNode childSource : node.getChildren()) {
- // Create pipelines for children
- LocalExecutionPlanContext subContext = context.createSubContext();
- Operator childOperation = childSource.accept(this, subContext);
- // If the child belongs to another fragment instance, we don't create
pipeline for it
- if (childOperation instanceof ExchangeOperator) {
- children.add(childOperation);
- finalExchangeNum += 1;
- } else {
+ if (context.getDegreeOfParallelism() == 1) {
+ // If dop = 1, we don't create extra pipeline
+ for (PlanNode localChild : node.getChildren()) {
+ Operator childOperation = localChild.accept(this, context);
+ parentPipelineChildren.add(childOperation);
+ }
+ } else {
+ // Keep it since we may change the structure of the original children nodes
+ List<PlanNode> afterwardsNodes = new ArrayList<>();
+ // 1. Calculate localChildren size
+ int localChildrenSize = 0;
+ for (PlanNode child : node.getChildren()) {
+ if (!(child instanceof ExchangeNode)) {
+ localChildrenSize++;
+ }
+ }
+ // 2. Divide the children into consecutive groups of childNumInEachPipeline[i] nodes, one group per
pipeline
+ int[] childNumInEachPipeline =
+ getChildNumInEachPipeline(
+ node.getChildren(), localChildrenSize,
context.getDegreeOfParallelism());
+ // If dop > size(local children) + 1, the extra dop can be allocated to the child nodes
+ // Extra dop = dop - size(local children), floored at 1 because dop = 1 (not 0) means serial
+ int childGroupNum = Math.min(context.getDegreeOfParallelism(),
localChildrenSize);
+ int dopForChild = Math.max(1, context.getDegreeOfParallelism() -
localChildrenSize);
+ int startIndex, endIndex = 0;
+ for (int i = 0; i < childGroupNum; i++) {
+ startIndex = endIndex;
+ endIndex += childNumInEachPipeline[i];
+ // Only if dop >= size(children) + 1, split all children to new
pipeline
+ // Otherwise, the first group will belong to the parent pipeline
+ if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize +
1) {
+ for (int j = startIndex; j < endIndex; j++) {
+ Operator childOperation = node.getChildren().get(j).accept(this,
context);
+ parentPipelineChildren.add(childOperation);
+ afterwardsNodes.add(node.getChildren().get(j));
+ }
+ continue;
+ }
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(dopForChild);
+ // Create partial parent operator for children
+ PlanNode partialParentNode = null;
+ Operator partialParentOperator = null;
+
+ int originPipeNum = context.getPipelineNumber();
+ if (endIndex - startIndex == 1) {
+ partialParentNode = node.getChildren().get(i);
+ partialParentOperator = node.getChildren().get(i).accept(this,
subContext);
+ } else {
+ // partialParentNode is equal to the parent node except for its children
+ partialParentNode = node.createSubNode(i, startIndex, endIndex);
+ partialParentOperator = partialParentNode.accept(this, subContext);
+ }
+ // update dop for child
+ dopForChild = Math.max(1, dopForChild -
(subContext.getPipelineNumber() - originPipeNum));
ISinkHandle localSinkHandle =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
- subContext.getDriverContext(),
childSource.getPlanNodeId().getId());
+ // Attention, there is no parent node, use first child node
instead
+ subContext.getDriverContext(),
node.getChildren().get(i).getPlanNodeId().getId());
subContext.setSinkHandle(localSinkHandle);
- subContext.addPipelineDriverFactory(childOperation,
subContext.getDriverContext());
+ subContext.addPipelineDriverFactory(partialParentOperator,
subContext.getDriverContext());
ExchangeOperator sourceOperator =
new ExchangeOperator(
@@ -2257,31 +2304,141 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
((LocalSinkHandle)
localSinkHandle).getSharedTsBlockQueue(),
context.getDriverContext()),
- childSource.getPlanNodeId());
+ partialParentNode.getPlanNodeId());
context
.getTimeSliceAllocator()
.recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
- children.add(sourceOperator);
+ parentPipelineChildren.add(sourceOperator);
+ afterwardsNodes.add(partialParentNode);
context.addExchangeOperator(sourceOperator);
finalExchangeNum += subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;
}
+ ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
}
context.setExchangeSumNum(finalExchangeNum);
- return children;
+ return parentPipelineChildren;
+ }
+
+ /**
+ * Currently, children are allocated to the pipelines as evenly as possible. For
example, 5 children with
+ * 3 dop are grouped as [1, 2, 2]. Once we can estimate the
workload of each
+ * operator, we may allocate based on workload rather than on child
number.
+ *
+ * <p>An ExchangeNode child does not count toward the local-child quota of
its group.
+ */
+ public int[] getChildNumInEachPipeline(
+ List<PlanNode> allChildren, int localChildrenSize, int dop) {
+ int maxPipelineNum = Math.min(localChildrenSize, dop);
+ int[] childNumInEachPipeline = new int[maxPipelineNum];
+ int avgChildNum = Math.max(1, localChildrenSize / dop);
+ // groups from splitIndex onwards each take one extra child (avgChildNum + 1)
+ int splitIndex =
+ localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum -
localChildrenSize % dop;
+ int pipelineIndex = 0, childIndex = 0;
+ while (pipelineIndex < maxPipelineNum) {
+ int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum +
1;
+ int originChildIndex = childIndex;
+ while (childNum >= 0 && childIndex < allChildren.size()) {
+ if (!(allChildren.get(childIndex) instanceof ExchangeNode)) {
+ childNum--;
+ // Try to ensure that the first child of a pipeline is not an ExchangeNode
+ if (childNum == -1) {
+ childIndex--;
+ }
+ }
+ childIndex++;
+ }
+ childNumInEachPipeline[pipelineIndex++] = childIndex - originChildIndex;
+ }
+ return childNumInEachPipeline;
}
- private List<Operator> dealWithConsumeChildrenOneByOneNode(
+ public List<Operator> dealWithConsumeChildrenOneByOneNode(
PlanNode node, LocalExecutionPlanContext context) {
+ List<Operator> parentPipelineChildren = new ArrayList<>();
int originExchangeNum = context.getExchangeSumNum();
int finalExchangeNum = context.getExchangeSumNum();
- List<Operator> children = new ArrayList<>();
- for (PlanNode childSource : node.getChildren()) {
- Operator childOperation = childSource.accept(this, context);
- finalExchangeNum = Math.max(finalExchangeNum,
context.getExchangeSumNum());
- context.setExchangeSumNum(originExchangeNum);
- children.add(childOperation);
+
+ // 1. divide every child to pipeline using the max dop
+ if (context.getDegreeOfParallelism() == 1) {
+ // If dop = 1, we don't create extra pipeline
+ for (PlanNode childSource : node.getChildren()) {
+ Operator childOperation = childSource.accept(this, context);
+ finalExchangeNum = Math.max(finalExchangeNum,
context.getExchangeSumNum());
+ context.setExchangeSumNum(originExchangeNum);
+ parentPipelineChildren.add(childOperation);
+ }
+ } else {
+ List<Integer> childPipelineNums = new ArrayList<>();
+ List<Integer> childExchangeNums = new ArrayList<>();
+ int sumOfChildPipelines = 0, sumOfChildExchangeNums = 0;
+ int dependencyChildNode = 0, dependencyPipeId = 0;
+ for (PlanNode childNode : node.getChildren()) {
+ if (childNode instanceof ExchangeNode) {
+ Operator childOperation = childNode.accept(this, context);
+ finalExchangeNum = Math.max(finalExchangeNum,
context.getExchangeSumNum());
+ context.setExchangeSumNum(originExchangeNum);
+ parentPipelineChildren.add(childOperation);
+ } else {
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ // Only context.getDegreeOfParallelism() - 1 can be allocated to
child
+ int dopForChild = context.getDegreeOfParallelism() - 1;
+ subContext.setDegreeOfParallelism(dopForChild);
+ int originPipeNum = context.getPipelineNumber();
+ Operator childOperation = childNode.accept(this, subContext);
+ ISinkHandle localSinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+ // Attention, there is no parent node, use the current child node
instead
+ context.getDriverContext(),
childNode.getPlanNodeId().getId());
+ subContext.setSinkHandle(localSinkHandle);
+ subContext.addPipelineDriverFactory(childOperation,
subContext.getDriverContext());
+
+ int curChildPipelineNum = subContext.getPipelineNumber() -
originPipeNum;
+ childPipelineNums.add(curChildPipelineNum);
+ sumOfChildPipelines += curChildPipelineNum;
+ // If sumOfChildPipelines > dopForChild, we have to wait until some
pipelines finish
+ if (sumOfChildPipelines > dopForChild) {
+ // Update dependencyPipeId: once that pipeline finishes, we can submit
curChildPipeline
+ while (sumOfChildPipelines > dopForChild) {
+ dependencyPipeId = context.getPipelineNumber() -
sumOfChildPipelines;
+ sumOfChildPipelines -=
childPipelineNums.get(dependencyChildNode);
+ sumOfChildExchangeNums -=
childExchangeNums.get(dependencyChildNode);
+ dependencyChildNode++;
+ }
+ }
+ // Add dependency for all pipelines under current node
+ if (dependencyChildNode != 0) {
+ for (int i = originPipeNum; i < subContext.getPipelineNumber();
i++) {
+
context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+ }
+ }
+
+ ExchangeOperator sourceOperator =
+ new ExchangeOperator(
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ null,
+ ExchangeOperator.class.getSimpleName()),
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+ ((LocalSinkHandle)
localSinkHandle).getSharedTsBlockQueue(),
+ context.getDriverContext()),
+ childNode.getPlanNodeId());
+ context
+ .getTimeSliceAllocator()
+ .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+ parentPipelineChildren.add(sourceOperator);
+ context.addExchangeOperator(sourceOperator);
+ int childExchangeNum = subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;
+ sumOfChildExchangeNums += childExchangeNum;
+ childExchangeNums.add(childExchangeNum);
+ finalExchangeNum =
+ Math.max(finalExchangeNum, context.getExchangeSumNum() +
sumOfChildExchangeNums);
+ }
+ }
}
context.setExchangeSumNum(finalExchangeNum);
- return children;
+ return parentPipelineChildren;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index ec5eb7a3e2..8cc9ec0e51 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.Driver;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import static java.util.Objects.requireNonNull;
@@ -31,6 +34,7 @@ public class PipelineDriverFactory {
private final DriverContext driverContext;
// TODO Use OperatorFactory to replace operator to generate multiple drivers
for on pipeline
private final Operator operation;
+ private int dependencyPipelineIndex = -1;
public PipelineDriverFactory(Operator operation, DriverContext
driverContext) {
this.operation = requireNonNull(operation, "rootOperator is null");
@@ -44,7 +48,16 @@ public class PipelineDriverFactory {
public Driver createDriver() {
requireNonNull(driverContext, "driverContext is null");
try {
- return new DataDriver(operation, driverContext);
+ Driver driver = null;
+ if (driverContext instanceof DataDriverContext) {
+ driver = new DataDriver(operation, driverContext);
+ } else {
+ driver = new SchemaDriver(operation, (SchemaDriverContext)
driverContext);
+ }
+ if (dependencyPipelineIndex != -1) {
+
driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
+ }
+ return driver;
} catch (Throwable failure) {
try {
operation.close();
@@ -56,4 +69,12 @@ public class PipelineDriverFactory {
throw failure;
}
}
+
+ public void setDependencyPipeline(int dependencyPipelineIndex) {
+ this.dependencyPipelineIndex = dependencyPipelineIndex;
+ }
+
+ public int getDependencyPipelineIndex() {
+ return dependencyPipelineIndex;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fd8d1a9293..22efafdb10 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -1085,7 +1085,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext
context) {
// Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
- List<SeriesAggregationSourceNode> rawSources =
findAggregationSourceNode(root);
+ List<SeriesAggregationSourceNode> rawSources =
AggregationNode.findAggregationSourceNode(root);
// Step 1: construct SeriesAggregationSourceNode for each data region of
one Path
List<SeriesAggregationSourceNode> sources = new ArrayList<>();
@@ -1117,7 +1117,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
boolean[] eachSeriesOneRegion,
Map<PartialPath, Integer> regionCountPerSeries) {
// Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
- List<SeriesAggregationSourceNode> rawSources =
findAggregationSourceNode(root);
+ List<SeriesAggregationSourceNode> rawSources =
AggregationNode.findAggregationSourceNode(root);
// Step 1: construct SeriesAggregationSourceNode for each data region of
one Path
for (SeriesAggregationSourceNode child : rawSources) {
@@ -1169,18 +1169,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return dataDistribution.size();
}
- private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode
node) {
- if (node == null) {
- return new ArrayList<>();
- }
- if (node instanceof SeriesAggregationSourceNode) {
- return Collections.singletonList((SeriesAggregationSourceNode) node);
- }
- List<SeriesAggregationSourceNode> ret = new ArrayList<>();
- node.getChildren().forEach(child ->
ret.addAll(findAggregationSourceNode(child)));
- return ret;
- }
-
public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index d9b640e0cd..0e85bd9461 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -75,6 +75,19 @@ public abstract class PlanNode implements IConsensusRequest {
@Override
public abstract PlanNode clone();
+ /**
+ * Creates a sub node that has exactly the same function as the origin node, except
that its children are
+ * only the part of the origin children list in the range [startIndex,
endIndex).
+ *
+ * @param subNodeId the sub node id
+ * @param startIndex the start index (inclusive) into the origin children
+ * @param endIndex the end index (exclusive) into the origin children
+ */
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ throw new UnsupportedOperationException(
+ String.format("Can't create subNode for %s",
this.getClass().toString()));
+ }
+
public PlanNode cloneWithChildren(List<PlanNode> children) {
if (!(children == null
|| allowedChildCount() == CHILD_COUNT_NO_LIMIT
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 6c0c7703e8..1f70daa76b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -36,6 +37,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,8 +52,7 @@ import java.util.stream.Collectors;
public class AggregationNode extends MultiChildProcessNode {
// The list of aggregate functions, each AggregateDescriptor will be output
as one or two column
- // of
- // result TsBlock
+ // of result TsBlock
protected List<AggregationDescriptor> aggregationDescriptorList;
// The parameter of `group by time`.
@@ -167,6 +168,13 @@ public class AggregationNode extends MultiChildProcessNode
{
getScanOrder());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new HorizontallyConcatNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)));
+ }
+
@Override
public List<String> getOutputColumnNames() {
List<String> outputColumnNames = new ArrayList<>();
@@ -182,6 +190,18 @@ public class AggregationNode extends MultiChildProcessNode
{
return outputColumnNames;
}
+ public static List<SeriesAggregationSourceNode>
findAggregationSourceNode(PlanNode node) {
+ if (node == null) {
+ return new ArrayList<>();
+ }
+ if (node instanceof SeriesAggregationSourceNode) {
+ return Collections.singletonList((SeriesAggregationSourceNode) node);
+ }
+ List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+ node.getChildren().forEach(child ->
ret.addAll(findAggregationSourceNode(child)));
+ return ret;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAggregation(this, context);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d3ec660e7f..faa21947d1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -63,6 +63,12 @@ public class DeviceMergeNode extends MultiChildProcessNode {
return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(),
getDevices());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ throw new UnsupportedOperationException(
+ "DeviceMergeNode should have only one local child in single data
region.");
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index da7b8dc9c0..f1833a7cee 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -93,6 +93,13 @@ public class GroupByLevelNode extends MultiChildProcessNode {
getPlanNodeId(), getGroupByLevelDescriptors(),
this.groupByTimeParameter, this.scanOrder);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new HorizontallyConcatNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)));
+ }
+
public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
return groupByLevelDescriptors;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index c2e558bdc7..2089696218 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -99,6 +99,13 @@ public class GroupByTagNode extends MultiChildProcessNode {
this.outputColumnNames);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new HorizontallyConcatNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)));
+ }
+
@Override
public List<String> getOutputColumnNames() {
List<String> ret = new ArrayList<>(tagKeys);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
index a541ef1bee..f6c785f8fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
@@ -42,11 +42,22 @@ public class HorizontallyConcatNode extends
MultiChildProcessNode {
super(id, new ArrayList<>());
}
+ public HorizontallyConcatNode(PlanNodeId id, List<PlanNode> children) {
+ super(id, children);
+ }
+
@Override
public PlanNode clone() {
return new HorizontallyConcatNode(getPlanNodeId());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new HorizontallyConcatNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 564a5dbfa4..c5d6c394ee 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -45,6 +45,16 @@ public class MergeSortNode extends MultiChildProcessNode {
this.outputColumns = outputColumns;
}
+ public MergeSortNode(
+ PlanNodeId id,
+ List<PlanNode> children,
+ OrderByParameter mergeOrderParameter,
+ List<String> outputColumns) {
+ super(id, children);
+ this.mergeOrderParameter = mergeOrderParameter;
+ this.outputColumns = outputColumns;
+ }
+
public OrderByParameter getMergeOrderParameter() {
return mergeOrderParameter;
}
@@ -54,6 +64,15 @@ public class MergeSortNode extends MultiChildProcessNode {
return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(),
outputColumns);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new MergeSortNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)),
+ getMergeOrderParameter(),
+ outputColumns);
+ }
+
@Override
public List<String> getOutputColumnNames() {
return outputColumns;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2598cd4e28..ed1b8e4fb0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -62,6 +62,14 @@ public class TimeJoinNode extends MultiChildProcessNode {
return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new TimeJoinNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ getMergeOrder(),
+ new ArrayList<>(children.subList(startIndex, endIndex)));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
index e503c460de..9cc4311cbe 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperatorTest.java
@@ -77,7 +77,7 @@ public class CountGroupByLevelMergeOperatorTest {
1, planNodeId,
CountGroupByLevelScanOperator.class.getSimpleName());
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo>
timeSeriesCountOperator1 =
new CountGroupByLevelScanOperator<>(
planNodeId,
@@ -152,7 +152,7 @@ public class CountGroupByLevelMergeOperatorTest {
1, planNodeId,
CountGroupByLevelScanOperator.class.getSimpleName());
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo>
timeSeriesCountOperator =
new CountGroupByLevelScanOperator<>(
planNodeId,
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
index 2e9fa0e2db..ec10473b9b 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
@@ -73,7 +73,7 @@ public class SchemaCountOperatorTest {
driverContext.addOperatorContext(
1, planNodeId, SchemaCountOperator.class.getSimpleName());
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<ISchemaInfo> schemaSource =
Mockito.mock(ISchemaSource.class);
List<ISchemaInfo> schemaInfoList = new ArrayList<>(10);
@@ -133,7 +133,7 @@ public class SchemaCountOperatorTest {
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo>
timeSeriesCountOperator =
new CountGroupByLevelScanOperator<>(
planNodeId,
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
index a9b501324d..434d509c75 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -88,7 +88,7 @@ public class SchemaQueryScanOperatorTest {
.thenReturn(META_SCAN_OPERATOR_TEST_SG + ".device0");
Mockito.when(deviceSchemaInfo.isAligned()).thenReturn(false);
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<IDeviceSchemaInfo> deviceSchemaSource =
SchemaSourceFactory.getDeviceSchemaSource(partialPath, false, 10, 0,
true);
SchemaOperatorTestUtil.mockGetSchemaReader(
@@ -193,7 +193,7 @@ public class SchemaQueryScanOperatorTest {
Mockito.when(schemaRegion.getStorageGroupFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
operatorContext.setDriverContext(
- new SchemaDriverContext(fragmentInstanceContext, schemaRegion));
+ new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<ITimeSeriesSchemaInfo> timeSeriesSchemaSource =
SchemaSourceFactory.getTimeSeriesSchemaSource(
partialPath, false, 10, 0, null, null, false,
Collections.emptyMap());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 7e3612e499..ff8d0fdebf 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -63,10 +63,12 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
+ Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId,
"inst-1");
DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
+ Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -91,6 +93,7 @@ public class DriverSchedulerTest {
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId,
"inst-2");
DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
+ Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
manager.submitDrivers(queryId, Collections.singletonList(mockDriver3),
QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
@@ -111,6 +114,7 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
+ Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4),
QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
new file mode 100644
index 0000000000..c0ee46142b
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+public class PipelineBuilderTest {
+
+ OperatorTreeGenerator operatorTreeGenerator = new OperatorTreeGenerator();
+
+ /**
+ * The operator structure is [TimeJoin1 -
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+ *
+ * <p>The next six tests exercise this TimeJoinOperator with different
degrees of parallelism (dop).
+ *
+ * <p>The first test will test dop = 1. Expected result is that no child
pipelines will be
+ * divided.
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder1() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(1);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ assertEquals(0, context.getPipelineNumber());
+ assertEquals(4, childrenOperator.size());
+ assertEquals(4, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(SeriesScanOperator.class,
childrenOperator.get(i).getClass());
+ assertEquals(SeriesScanNode.class,
timeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the number of exchange operators
+ assertEquals(0, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 2. Expected result is two pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [SeriesScan0, SeriesScan1, ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2,
SeriesScan3].
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder2() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(2);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ // The number of pipeline is 1, since parent pipeline hasn't joined
+ assertEquals(1, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(3, childrenOperator.size());
+ assertEquals(3, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 2; i++) {
+ assertEquals(SeriesScanOperator.class,
childrenOperator.get(i).getClass());
+ assertEquals(SeriesScanNode.class,
timeJoinNode.getChildren().get(i).getClass());
+ }
+ assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+ // Validate the changes of node structure
+ assertEquals("root.sg.d0.s1",
timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals("root.sg.d1.s1",
timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+ assertEquals(TimeJoinNode.class,
timeJoinNode.getChildren().get(2).getClass());
+
+ // Validate the second pipeline
+ TimeJoinNode subTimeJoinNode = (TimeJoinNode)
timeJoinNode.getChildren().get(2);
+ assertEquals(2, subTimeJoinNode.getChildren().size());
+ assertEquals(
+ "root.sg.d2.s1",
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals(
+ "root.sg.d3.s1",
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+
+ // Validate the number of exchange operators
+ assertEquals(1, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 3. Expected result is three pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator,
ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The third is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3].
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder3() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(3);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ // The number of pipeline is 2, since parent pipeline hasn't joined
+ assertEquals(2, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(3, childrenOperator.size());
+ assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+ // Validate the changes of node structure
+ assertEquals(3, timeJoinNode.getChildren().size());
+ assertEquals("root.sg.d0.s1",
timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals("root.sg.d1.s1",
timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+ assertEquals(TimeJoinNode.class,
timeJoinNode.getChildren().get(2).getClass());
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ TimeJoinNode subTimeJoinNode = (TimeJoinNode)
timeJoinNode.getChildren().get(2);
+ assertEquals(2, subTimeJoinNode.getChildren().size());
+ assertEquals(
+ "root.sg.d2.s1",
subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+ assertEquals(
+ "root.sg.d3.s1",
subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals(exchangeOperator2.getSourceId(),
subTimeJoinNode.getPlanNodeId());
+
+ // Validate the number of exchange operators
+ assertEquals(2, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 4. Expected result is four pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder4() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(4);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ // The number of pipeline is 3, since parent pipeline hasn't joined
+ assertEquals(3, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+ assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass());
+
+ // Validate the changes of node structure
+ assertEquals(4, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(SeriesScanNode.class,
timeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("SeriesScanNode2", exchangeOperator2.getSourceId().getId());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("SeriesScanNode3", exchangeOperator3.getSourceId().getId());
+
+ // Validate the number of exchange operators
+ assertEquals(3, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 5. Expected result is five pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder5() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(5);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the changes of node structure
+ assertEquals(4, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(SeriesScanNode.class,
timeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+
+ // Validate the number of exchange operators
+ assertEquals(4, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 6. Expected result is still five pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder6() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(6);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the changes of node structure
+ assertEquals(4, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(SeriesScanNode.class,
timeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+
+ // Validate the number of exchange operators
+ assertEquals(4, context.getExchangeSumNum());
+ }
+
+ /**
+ * The operator structure is [DeviceView -
[SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+ *
+ * <p>The next six tests exercise this DeviceViewOperator with different
degrees of parallelism (dop).
+ *
+ * <p>The first test will test dop = 1. Expected result is that no child
pipelines will be
+ * divided.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder1() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(1);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ assertEquals(0, context.getPipelineNumber());
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(AlignedSeriesScanOperator.class,
childrenOperator.get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the number of exchange operators
+ assertEquals(0, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 2. Expected result is five pipelines with
dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1, which depends on the
second pipeline.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the
third pipeline.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the
fourth pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder2() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(2);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0",
exchangeOperator1.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1",
exchangeOperator2.getSourceId().getId());
+ assertEquals(0,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2",
exchangeOperator3.getSourceId().getId());
+ assertEquals(1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3",
exchangeOperator4.getSourceId().getId());
+ assertEquals(2,
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+ // Validate the number of exchange operators
+ assertEquals(1, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 3. Expected result is five pipelines with
dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the
second pipeline.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the
third pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder3() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(3);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0",
exchangeOperator1.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1",
exchangeOperator2.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2",
exchangeOperator3.getSourceId().getId());
+ assertEquals(0,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3",
exchangeOperator4.getSourceId().getId());
+ assertEquals(1,
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+ // Validate the number of exchange operators
+ assertEquals(2, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 4. Expected result is five pipelines with
dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the
second pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder4() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(4);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0",
exchangeOperator1.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1",
exchangeOperator2.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2",
exchangeOperator3.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3",
exchangeOperator4.getSourceId().getId());
+ assertEquals(0,
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+ // Validate the number of exchange operators
+ assertEquals(3, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test will test dop = 5. Expected result is five pipelines without
dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder5() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(5);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0",
exchangeOperator1.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1",
exchangeOperator2.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2",
exchangeOperator3.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3",
exchangeOperator4.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+ // Validate the number of exchange operators
+ assertEquals(4, context.getExchangeSumNum());
+ }
+
+ /**
+ * This test repeats dop = 5 (TODO confirm: dop = 6 was presumably intended). Expected result is
five pipelines without dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator,
ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The fourth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder6() throws
IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context =
createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(5);
+
+ List<Operator> childrenOperator =
+
operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode,
context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator)
childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0",
exchangeOperator1.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator)
childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1",
exchangeOperator2.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the fourth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator)
childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2",
exchangeOperator3.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator)
childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3",
exchangeOperator4.getSourceId().getId());
+ assertEquals(-1,
context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+
+ // Validate the number of exchange operators
+ assertEquals(4, context.getExchangeSumNum());
+ }
+
+ @Test
+ public void testGetChildNumInEachPipeline() {
+ List<PlanNode> allChildren = new ArrayList<>();
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
+
+ int[] childNumInEachPipeline =
+ operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+ assertEquals(2, childNumInEachPipeline.length);
+ assertEquals(2, childNumInEachPipeline[0]);
+ assertEquals(1, childNumInEachPipeline[1]);
+
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+ childNumInEachPipeline =
operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
+ assertEquals(3, childNumInEachPipeline.length);
+ assertEquals(2, childNumInEachPipeline[0]);
+ assertEquals(2, childNumInEachPipeline[1]);
+ assertEquals(5, childNumInEachPipeline[2]);
+ }
+
+ private LocalExecutionPlanContext
createLocalExecutionPlanContext(TypeProvider typeProvider) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ fragmentInstanceContext.setDataRegion(dataRegion);
+
+ return new LocalExecutionPlanContext(typeProvider,
fragmentInstanceContext);
+ }
+
+ /**
+ * Initializes a TimeJoinNode with {@code childNum} SeriesScanNodes as
children.
+ *
+ * @param childNum the number of children
+ * @return a TimeJoinNode with {@code childNum} SeriesScanNodes as children
+ */
+ private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int
childNum)
+ throws IllegalPathException {
+ TimeJoinNode timeJoinNode = new TimeJoinNode(new
PlanNodeId("TimeJoinNode"), Ordering.ASC);
+ for (int i = 0; i < childNum; i++) {
+ SeriesScanNode seriesScanNode =
+ new SeriesScanNode(
+ new PlanNodeId(String.format("SeriesScanNode%d", i)),
+ new MeasurementPath(String.format("root.sg.d%d.s1", i),
TSDataType.INT32));
+ typeProvider.setType(seriesScanNode.getSeriesPath().toString(),
TSDataType.INT32);
+ timeJoinNode.addChild(seriesScanNode);
+ }
+ return timeJoinNode;
+ }
+
+ /**
+ * Initializes a DeviceViewNode with {@code childNum}
AlignedSeriesScanNodes as children.
+ *
+ * @param childNum the number of children
+ * @return a DeviceViewNode with {@code childNum} AlignedSeriesScanNodes as children
+ */
+ private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int
childNum)
+ throws IllegalPathException {
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
+ for (int i = 0; i < childNum; i++) {
+ AlignedSeriesScanNode alignedSeriesScanNode =
+ new AlignedSeriesScanNode(
+ new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)),
+ new AlignedPath(String.format("root.sg.d%d", i), "s1"));
+ deviceViewNode.addChild(alignedSeriesScanNode);
+ }
+ return deviceViewNode;
+ }
+}