This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c577012169 [IOTDB-4005] Optimize the pipeline build logic for 
consumeAllNode
c577012169 is described below

commit c577012169cbed7e5c3a2c7a3f2471e44f8a3841
Author: Xiangwei Wei <[email protected]>
AuthorDate: Wed Feb 15 19:05:11 2023 +0800

    [IOTDB-4005] Optimize the pipeline build logic for consumeAllNode
---
 .../iotdb/it/env/cluster/MppCommonConfig.java      |   6 +
 .../it/env/cluster/MppSharedCommonConfig.java      |   7 +
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../it/aggregation/IoTDBAggregationByLevel2IT.java |  40 +++++
 .../it/aggregation/IoTDBAggregationByLevelIT.java  |   4 +-
 .../db/it/alignbydevice/IoTDBAlignByDevice2IT.java |  40 +++++
 .../db/it/alignbydevice/IoTDBAlignByDeviceIT.java  |   2 +-
 .../IoTDBOrderByWithAlignByDevice2IT.java          |  40 +++++
 .../IoTDBOrderByWithAlignByDeviceIT.java           |   2 +-
 .../db/it/aligned/IoTDBAlignedSeriesQuery4IT.java  |  54 +++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 171 +++++++++++++--------
 .../plan/node/process/HorizontallyConcatNode.java  |   1 -
 .../db/mpp/plan/plan/PipelineBuilderTest.java      |   9 +-
 15 files changed, 310 insertions(+), 75 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 37c86f1746..389d0f383a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -317,6 +317,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+    setProperty("degree_of_query_parallelism", 
String.valueOf(degreeOfParallelism));
+    return this;
+  }
+
   @Override
   public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
     setProperty("data_region_ratis_snapshot_trigger_threshold", 
String.valueOf(threshold));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 9ff5b627d1..26e6a1d3ba 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -323,6 +323,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+    cnConfig.setDegreeOfParallelism(degreeOfParallelism);
+    dnConfig.setDegreeOfParallelism(degreeOfParallelism);
+    return this;
+  }
+
   @Override
   public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
     cnConfig.setDataRatisTriggerSnapshotThreshold(threshold);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 71d6e7642b..9312a0f52d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -232,6 +232,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setDegreeOfParallelism(int degreeOfParallelism) {
+    return this;
+  }
+
   @Override
   public CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 50ac53bb86..81a50b6ebe 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -105,6 +105,8 @@ public interface CommonConfig {
 
   CommonConfig setQueryThreadCount(int queryThreadCount);
 
+  CommonConfig setDegreeOfParallelism(int degreeOfParallelism);
+
   CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold);
 
   CommonConfig setSeriesSlotNum(int seriesSlotNum);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java
new file mode 100644
index 0000000000..83a5086f7e
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevel2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBAggregationByLevel2IT extends IoTDBAggregationByLevelIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
index 97136f6b7e..50aac6cfc1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationByLevelIT.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.fail;
 @Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBAggregationByLevelIT {
 
-  private static final String[] dataSet =
+  protected static final String[] dataSet =
       new String[] {
         "CREATE DATABASE root.sg1",
         "CREATE DATABASE root.sg2",
@@ -695,7 +695,7 @@ public class IoTDBAggregationByLevelIT {
     }
   }
 
-  private static void prepareData() {
+  protected static void prepareData() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java
new file mode 100644
index 0000000000..efca7a3f54
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBAlignByDevice2IT extends IoTDBAlignByDeviceIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
index 9d264203f6..e9fa955cf9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
@@ -113,7 +113,7 @@ public class IoTDBAlignByDeviceIT {
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
-  private static void insertData() {
+  protected static void insertData() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java
new file mode 100644
index 0000000000..519f8e76d9
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice2IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBOrderByWithAlignByDevice2IT extends 
IoTDBOrderByWithAlignByDeviceIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
index a6e766475b..fba9177148 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
@@ -89,7 +89,7 @@ public class IoTDBOrderByWithAlignByDeviceIT {
    *
    * 
<p>https://docs.google.com/spreadsheets/d/18XlOIi27ZIIdRnar2WNXVMxkZwjgwlPZmzJLVpZRpAA/edit#gid=0
    */
-  private static void insertData() {
+  protected static void insertData() {
     try (Connection iotDBConnection = EnvFactory.getEnv().getConnection();
         Statement statement = iotDBConnection.createStatement()) {
       // create TimeSeries
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java
new file mode 100644
index 0000000000..6c3104d0aa
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQuery4IT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.aligned;
+
+import org.apache.iotdb.db.it.utils.AlignedWriteUtil;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAlignedSeriesQuery4IT extends IoTDBAlignedSeriesQueryIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false)
+        .setMaxTsBlockLineNumber(3)
+        .setDegreeOfParallelism(4);
+    EnvFactory.getEnv().initClusterEnvironment();
+    AlignedWriteUtil.insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 77aadfda46..749c06efd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,7 +318,7 @@ public class IoTDBConfig {
   /** How many threads can concurrently execute query statement. When <= 0, 
use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
-  private int degreeOfParallelism = Runtime.getRuntime().availableProcessors() 
/ 2;
+  private int degreeOfParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 89880c5ca1..49706b4ebe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2258,77 +2258,86 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       // Keep it since we may change the structure of origin children nodes
       List<PlanNode> afterwardsNodes = new ArrayList<>();
       // 1. Calculate localChildren size
-      int localChildrenSize = 0;
-      for (PlanNode child : node.getChildren()) {
-        if (!(child instanceof ExchangeNode)) {
+      int localChildrenSize = 0, firstChildIndex = -1;
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
           localChildrenSize++;
+          firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
+          // deal with exchangeNode at head
+        } else if (firstChildIndex == -1) {
+          Operator childOperation = node.getChildren().get(i).accept(this, 
context);
+          finalExchangeNum += 1;
+          parentPipelineChildren.add(childOperation);
+          afterwardsNodes.add(node.getChildren().get(i));
         }
       }
-      // 2. divide every childNumInEachPipeline localChildren to different 
pipeline
-      int[] childNumInEachPipeline =
-          getChildNumInEachPipeline(
-              node.getChildren(), localChildrenSize, 
context.getDegreeOfParallelism());
-      // If dop > size(children) + 1, we can allocate extra dop to child node
-      // Extra dop = dop - size(children), since dop = 1 means serial but not 0
-      int childGroupNum = Math.min(context.getDegreeOfParallelism(), 
localChildrenSize);
+      if (firstChildIndex == -1) {
+        context.setExchangeSumNum(finalExchangeNum);
+        return parentPipelineChildren;
+      }
+      // If dop > localChildrenSize + 1, we can allocate extra dop to child 
node
+      // Extra dop = dop - localChildrenSize, since dop = 1 means serial but 
not 0
       int dopForChild = Math.max(1, context.getDegreeOfParallelism() - 
localChildrenSize);
-      int startIndex, endIndex = 0;
-      for (int i = 0; i < childGroupNum; i++) {
-        startIndex = endIndex;
-        endIndex += childNumInEachPipeline[i];
-        // Only if dop >= size(children) + 1, split all children to new 
pipeline
-        // Otherwise, the first group will belong to the parent pipeline
-        if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 
1) {
-          for (int j = startIndex; j < endIndex; j++) {
-            Operator childOperation = node.getChildren().get(j).accept(this, 
context);
+      // If dop > localChildrenSize, we create one new pipeline for each child
+      if (context.getDegreeOfParallelism() > localChildrenSize) {
+        for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
+          PlanNode childNode = node.getChildren().get(i);
+          if (childNode instanceof ExchangeNode) {
+            Operator childOperation = childNode.accept(this, context);
+            finalExchangeNum += 1;
             parentPipelineChildren.add(childOperation);
-            afterwardsNodes.add(node.getChildren().get(j));
+          } else {
+            LocalExecutionPlanContext subContext = context.createSubContext();
+            subContext.setDegreeOfParallelism(dopForChild);
+
+            int originPipeNum = context.getPipelineNumber();
+            Operator sourceOperator = createNewPipelineForChildNode(context, 
subContext, childNode);
+            parentPipelineChildren.add(sourceOperator);
+            dopForChild =
+                Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 
- originPipeNum));
+            finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
           }
-          continue;
         }
-        LocalExecutionPlanContext subContext = context.createSubContext();
-        subContext.setDegreeOfParallelism(dopForChild);
-        // Create partial parent operator for children
-        PlanNode partialParentNode = null;
-        Operator partialParentOperator = null;
-
-        int originPipeNum = context.getPipelineNumber();
-        if (endIndex - startIndex == 1) {
-          partialParentNode = node.getChildren().get(i);
-          partialParentOperator = node.getChildren().get(i).accept(this, 
subContext);
-        } else {
-          // PartialParentNode is equals to parentNode except children
-          partialParentNode = node.createSubNode(i, startIndex, endIndex);
-          partialParentOperator = partialParentNode.accept(this, subContext);
+      } else {
+        // If dop <= localChildrenSize, we have to divide every 
childNumInEachPipeline localChildren
+        // to different pipeline
+        int[] childNumInEachPipeline =
+            getChildNumInEachPipeline(
+                node.getChildren(), localChildrenSize, 
context.getDegreeOfParallelism());
+        int childGroupNum = Math.min(context.getDegreeOfParallelism(), 
localChildrenSize);
+        int startIndex, endIndex = firstChildIndex;
+        for (int i = 0; i < childGroupNum; i++) {
+          startIndex = endIndex;
+          endIndex += childNumInEachPipeline[i];
+          // Only if dop >= size(children) + 1, split all children to new 
pipeline
+          // Otherwise, the first group will belong to the parent pipeline
+          if (i == 0) {
+            for (int j = startIndex; j < endIndex; j++) {
+              Operator childOperation = node.getChildren().get(j).accept(this, 
context);
+              parentPipelineChildren.add(childOperation);
+              afterwardsNodes.add(node.getChildren().get(j));
+            }
+            continue;
+          }
+          LocalExecutionPlanContext subContext = context.createSubContext();
+          subContext.setDegreeOfParallelism(1);
+          // Create partial parent operator for children
+          PlanNode partialParentNode = null;
+          if (endIndex - startIndex == 1) {
+            partialParentNode = node.getChildren().get(startIndex);
+          } else {
+            // PartialParentNode is equals to parentNode except children
+            partialParentNode = node.createSubNode(i, startIndex, endIndex);
+          }
+
+          Operator sourceOperator =
+              createNewPipelineForChildNode(context, subContext, 
partialParentNode);
+          parentPipelineChildren.add(sourceOperator);
+          afterwardsNodes.add(partialParentNode);
+          finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
         }
-        // update dop for child
-        dopForChild = Math.max(1, dopForChild - 
(subContext.getPipelineNumber() - originPipeNum));
-        ISinkHandle localSinkHandle =
-            MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                // Attention, there is no parent node, use first child node 
instead
-                subContext.getDriverContext(), 
node.getChildren().get(i).getPlanNodeId().getId());
-        subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(partialParentOperator, 
subContext.getDriverContext());
-
-        ExchangeOperator sourceOperator =
-            new ExchangeOperator(
-                context
-                    .getDriverContext()
-                    .addOperatorContext(
-                        context.getNextOperatorId(), null, 
ExchangeOperator.class.getSimpleName()),
-                MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
-                    ((LocalSinkHandle) 
localSinkHandle).getSharedTsBlockQueue(),
-                    context.getDriverContext()),
-                partialParentNode.getPlanNodeId());
-        context
-            .getTimeSliceAllocator()
-            .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        parentPipelineChildren.add(sourceOperator);
-        afterwardsNodes.add(partialParentNode);
-        context.addExchangeOperator(sourceOperator);
-        finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
+        ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
       }
-      ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
     }
     context.setExchangeSumNum(finalExchangeNum);
     return parentPipelineChildren;
@@ -2340,6 +2349,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
    * operator, maybe we can allocate based on workload rather than child 
number.
    *
    * <p>If child is ExchangeNode, it won't affect the children number of 
current group.
+   *
+   * <p>This method can only be invoked when dop <= localChildrenSize.
    */
   public int[] getChildNumInEachPipeline(
       List<PlanNode> allChildren, int localChildrenSize, int dop) {
@@ -2347,9 +2358,13 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     int[] childNumInEachPipeline = new int[maxPipelineNum];
     int avgChildNum = Math.max(1, localChildrenSize / dop);
     // allocate remaining child to group from splitIndex
-    int splitIndex =
-        localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - 
localChildrenSize % dop;
-    int pipelineIndex = 0, childIndex = 0;
+    int splitIndex = maxPipelineNum - localChildrenSize % dop;
+    int childIndex = 0;
+    // Skip ExchangeNode at head
+    while (childIndex < allChildren.size() && allChildren.get(childIndex) 
instanceof ExchangeNode) {
+      childIndex++;
+    }
+    int pipelineIndex = 0;
     while (pipelineIndex < maxPipelineNum) {
       int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 
1;
       int originChildIndex = childIndex;
@@ -2368,6 +2383,32 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     return childNumInEachPipeline;
   }
 
+  private Operator createNewPipelineForChildNode(
+      LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, 
PlanNode childNode) {
+    Operator childOperation = childNode.accept(this, subContext);
+    ISinkHandle localSinkHandle =
+        MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+            // Attention, there is no parent node, use first child node instead
+            subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+    subContext.setSinkHandle(localSinkHandle);
+    subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext());
+
+    ExchangeOperator sourceOperator =
+        new ExchangeOperator(
+            context
+                .getDriverContext()
+                .addOperatorContext(
+                    context.getNextOperatorId(), null, 
ExchangeOperator.class.getSimpleName()),
+            MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+                ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+                context.getDriverContext()),
+            childNode.getPlanNodeId());
+
+    
context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(),
 1);
+    context.addExchangeOperator(sourceOperator);
+    return sourceOperator;
+  }
+
   public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
     List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
index f6c785f8fc..00d1b3c4f9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
@@ -63,7 +63,6 @@ public class HorizontallyConcatNode extends 
MultiChildProcessNode {
     return children.stream()
         .map(PlanNode::getOutputColumnNames)
         .flatMap(List::stream)
-        .distinct()
         .collect(Collectors.toList());
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index c0ee46142b..d3db6d7811 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -679,22 +679,23 @@ public class PipelineBuilderTest {
   @Test
   public void testGetChildNumInEachPipeline() {
     List<PlanNode> allChildren = new ArrayList<>();
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
 
     int[] childNumInEachPipeline =
-        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2);
     assertEquals(2, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);
     assertEquals(1, childNumInEachPipeline[1]);
 
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
-    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5")));
     childNumInEachPipeline = 
operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
     assertEquals(3, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);

Reply via email to