This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d7aaeb8c9c [IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11083)
4d7aaeb8c9c is described below

commit 4d7aaeb8c9c46b5640d8b66b1193bc23c2a569db
Author: Itami Sho <[email protected]>
AuthorDate: Wed Sep 20 09:59:22 2023 +0800

    [IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11083)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/IoTDBPipeConnectorParallelIT.java      | 144 +++++++++++++++++++++
 .../config/constant/PipeConnectorConstant.java     |   6 +
 .../connector/protocol/opcua/OpcUaConnector.java   |  72 +++++++++--
 .../protocol/websocket/WebSocketConnector.java     |  56 ++++++--
 .../connector/PipeConnectorSubtaskManager.java     | 137 +++++++++++++-------
 5 files changed, 340 insertions(+), 75 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
new file mode 100644
index 00000000000..85e53dcb242
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeConnectorParallelIT {
+  private BaseEnv senderEnv;
+  private BaseEnv receiverEnv;
+
+  @Before
+  public void setUp() throws Exception {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() {
+    senderEnv.cleanClusterEnvironment();
+    receiverEnv.cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testIoTConnectorParallel() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    Set<String> expectedResSet = new HashSet<>();
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.parallel.tasks", "3");
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      try (Connection connection = senderEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute("insert into root.sg1.d1(time, s1) values (0, 1)");
+        statement.execute("insert into root.sg1.d1(time, s1) values (1, 2)");
+        statement.execute("insert into root.sg1.d1(time, s1) values (2, 3)");
+        statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      expectedResSet.add("0,1.0,");
+      expectedResSet.add("1,2.0,");
+      expectedResSet.add("2,3.0,");
+      expectedResSet.add("3,4.0,");
+      assertDataOnReceiver(receiverEnv, expectedResSet);
+      assertDataOnReceiver(receiverEnv, expectedResSet);
+    }
+  }
+
+  private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String> 
expectedResSet) {
+    try (Connection connection = receiverEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      await()
+          .atMost(600, TimeUnit.SECONDS)
+          .untilAsserted(
+              () ->
+                  TestUtils.assertResultSetEqual(
+                      statement.executeQuery("select * from root.**"),
+                      "Time,root.sg1.d1.s1,",
+                      expectedResSet));
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8135ae577ad..3414b7a9b29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
 public class PipeConnectorConstant {
@@ -29,6 +31,10 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
 
+  public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = 
"connector.parallel.tasks";
+  public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
+      PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 703e4652d99..bc86efb6ec1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -28,9 +28,11 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -43,7 +45,10 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
@@ -63,6 +68,10 @@ public class OpcUaConnector implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaConnector.class);
 
+  private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
+      SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+  private String serverKey;
   private OpcUaServer server;
 
   @Override
@@ -86,14 +95,31 @@ public class OpcUaConnector implements PipeConnector {
         parameters.getStringOrDefault(
             CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
-    server =
-        new OpcUaServerBuilder()
-            .setTcpBindPort(tcpBindPort)
-            .setHttpsBindPort(httpsBindPort)
-            .setUser(user)
-            .setPassword(password)
-            .build();
-    server.startup();
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      serverKey = httpsBindPort + ":" + tcpBindPort;
+
+      server =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
+              .computeIfAbsent(
+                  serverKey,
+                  key -> {
+                    try {
+                      final OpcUaServer newServer =
+                          new OpcUaServerBuilder()
+                              .setTcpBindPort(tcpBindPort)
+                              .setHttpsBindPort(httpsBindPort)
+                              .setUser(user)
+                              .setPassword(password)
+                              .build();
+                      newServer.startup();
+                      return new Pair<>(new AtomicInteger(0), newServer);
+                    } catch (Exception e) {
+                      throw new PipeException("Failed to build and startup 
OpcUaServer", e);
+                    }
+                  })
+              .getRight();
+      
SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
+    }
   }
 
   @Override
@@ -106,6 +132,11 @@ public class OpcUaConnector implements PipeConnector {
     // Server side, do nothing
   }
 
+  @Override
+  public void transfer(Event event) throws Exception {
+    // Do nothing when receive heartbeat or other events
+  }
+
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
@@ -234,13 +265,26 @@ public class OpcUaConnector implements PipeConnector {
     }
   }
 
-  @Override
-  public void transfer(Event event) throws Exception {
-    // Do nothing when receive heartbeat or other events
-  }
-
   @Override
   public void close() throws Exception {
-    server.shutdown();
+    if (serverKey == null) {
+      return;
+    }
+
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      final Pair<AtomicInteger, OpcUaServer> pair =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().shutdown();
+        } finally {
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
+        }
+      }
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 879cd9e2423..37b12170582 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,21 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.util.Comparator;
+import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class WebSocketConnector implements PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnector.class);
-  private final AtomicReference<WebSocketConnectorServer> server = new 
AtomicReference<>();
-  private int port;
+
+  private static final Map<Integer, Pair<AtomicInteger, 
WebSocketConnectorServer>>
+      PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+  private Integer port;
+  private WebSocketConnectorServer server;
 
   public final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -68,13 +74,19 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void handshake() throws Exception {
-    if (server.get() == null) {
-      synchronized (server) {
-        if (server.get() == null) {
-          server.set(new WebSocketConnectorServer(new InetSocketAddress(port), 
this));
-          server.get().start();
-        }
-      }
+    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      server =
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
+              .computeIfAbsent(
+                  port,
+                  key -> {
+                    final WebSocketConnectorServer newServer =
+                        new WebSocketConnectorServer(new 
InetSocketAddress(port), this);
+                    newServer.start();
+                    return new Pair<>(new AtomicInteger(0), newServer);
+                  })
+              .getRight();
+      
PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
     }
   }
 
@@ -94,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
     long commitId = commitIdGenerator.incrementAndGet();
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
-    server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent));
+    server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
   }
 
   @Override
@@ -108,7 +120,7 @@ public class WebSocketConnector implements PipeConnector {
     for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
       long commitId = commitIdGenerator.incrementAndGet();
       ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
-      server.get().addEvent(new Pair<>(commitId, event));
+      server.addEvent(new Pair<>(commitId, event));
     }
   }
 
@@ -117,7 +129,25 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    server.get().stop();
+    if (port == null) {
+      return;
+    }
+
+    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      final Pair<AtomicInteger, WebSocketConnectorServer> pair =
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().stop();
+        } finally {
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
+        }
+      }
+    }
   }
 
   public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index cb901477b76..455d113f4d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -39,16 +39,22 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Supplier;
 
 public class PipeConnectorSubtaskManager {
 
+  private static final Map<String, Supplier<PipeConnector>> 
CONNECTOR_CONSTRUCTORS =
+      new HashMap<>();
+
   private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
       "Failed to deregister PipeConnectorSubtask. No such subtask: ";
 
-  private final Map<String, PipeConnectorSubtaskLifeCycle>
+  private final Map<String, List<PipeConnectorSubtaskLifeCycle>>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
@@ -59,58 +65,62 @@ public class PipeConnectorSubtaskManager {
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      // 1. Construct, validate and customize PipeConnector, and then 
handshake (create connection)
-      // with the target
+      final int connectorNum =
+          pipeConnectorParameters.getIntOrDefault(
+              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+              
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+      final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
+          new ArrayList<>(connectorNum);
+
       final String connectorKey =
           pipeConnectorParameters.getStringOrDefault(
               PipeConnectorConstant.CONNECTOR_KEY,
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-
-      PipeConnector pipeConnector;
-      if 
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
-          || connectorKey.equals(
-              
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBThriftSyncConnector();
-      } else if (connectorKey.equals(
-          BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName())) 
{
-        pipeConnector = new IoTDBThriftAsyncConnector();
-      } else if (connectorKey.equals(
-          BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBLegacyPipeConnector();
-      } else if (connectorKey.equals(
-          BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBAirGapConnector();
-      } else if 
(connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new OpcUaConnector();
-      } else if 
(connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName()))
 {
-        pipeConnector = new WebSocketConnector();
-      } else {
-        pipeConnector = 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
-      }
-
-      try {
-        pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-        pipeConnector.customize(
-            pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
-        pipeConnector.handshake();
-      } catch (Exception e) {
-        throw new PipeException(
-            "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
-      }
-
-      // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
+      // Shared pending queue for all subtasks
       final BoundedBlockingPendingQueue<Event> pendingQueue =
           new BoundedBlockingPendingQueue<>(
               PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
-      final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, pendingQueue, 
pipeConnector);
-      final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
-          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, 
pendingQueue);
+
+      for (int i = 0; i < connectorNum; i++) {
+        final PipeConnector pipeConnector =
+            CONNECTOR_CONSTRUCTORS
+                .getOrDefault(
+                    connectorKey,
+                    () -> 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters))
+                .get();
+
+        // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
+        // connection) with the target
+        try {
+          pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
+          pipeConnector.customize(
+              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+          pipeConnector.handshake();
+        } catch (Exception e) {
+          throw new PipeException(
+              "Failed to construct PipeConnector, because of " + 
e.getMessage(), e);
+        }
+
+        // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
+        final PipeConnectorSubtask pipeConnectorSubtask =
+            new PipeConnectorSubtask(
+                String.format(
+                    "%s_%s_%s", attributeSortedString, 
pipeRuntimeEnvironment.getCreationTime(), i),
+                pendingQueue,
+                pipeConnector);
+        final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+            new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, 
pendingQueue);
+        pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle);
+      }
+
       attributeSortedString2SubtaskLifeCycleMap.put(
-          attributeSortedString, pipeConnectorSubtaskLifeCycle);
+          attributeSortedString, pipeConnectorSubtaskLifeCycleList);
     }
 
-    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.register();
+    }
 
     return attributeSortedString;
   }
@@ -120,7 +130,11 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    if 
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
 {
+    final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
+    lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+
+    if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
   }
@@ -130,7 +144,10 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.start();
+    }
   }
 
   public synchronized void stop(String attributeSortedString) {
@@ -138,7 +155,10 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.stop();
+    }
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
@@ -148,13 +168,34 @@ public class PipeConnectorSubtaskManager {
           "Failed to get PendingQueue. No such subtask: " + 
attributeSortedString);
     }
 
-    return 
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+    // All subtasks share the same pending queue
+    return attributeSortedString2SubtaskLifeCycleMap
+        .get(attributeSortedString)
+        .get(0)
+        .getPendingQueue();
   }
 
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeConnectorSubtaskManager() {
-    // Empty constructor
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
+        IoTDBThriftSyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
+        IoTDBThriftSyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
+        IoTDBThriftAsyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
+        IoTDBLegacyPipeConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), 
IoTDBAirGapConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), 
WebSocketConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaConnector::new);
   }
 
   private static class PipeSubtaskManagerHolder {

Reply via email to