This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4d7aaeb8c9c [IOTDB-6143] Pipe: Support PipeConnector subtasks with the
same parameters concurrently scheduling (#11083)
4d7aaeb8c9c is described below
commit 4d7aaeb8c9c46b5640d8b66b1193bc23c2a569db
Author: Itami Sho <[email protected]>
AuthorDate: Wed Sep 20 09:59:22 2023 +0800
[IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters
concurrently scheduling (#11083)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/IoTDBPipeConnectorParallelIT.java | 144 +++++++++++++++++++++
.../config/constant/PipeConnectorConstant.java | 6 +
.../connector/protocol/opcua/OpcUaConnector.java | 72 +++++++++--
.../protocol/websocket/WebSocketConnector.java | 56 ++++++--
.../connector/PipeConnectorSubtaskManager.java | 137 +++++++++++++-------
5 files changed, 340 insertions(+), 75 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
new file mode 100644
index 00000000000..85e53dcb242
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeConnectorParallelIT {
+ private BaseEnv senderEnv;
+ private BaseEnv receiverEnv;
+
+ @Before
+ public void setUp() throws Exception {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+ receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() {
+ senderEnv.cleanClusterEnvironment();
+ receiverEnv.cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testIoTConnectorParallel() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ Set<String> expectedResSet = new HashSet<>();
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.parallel.tasks", "3");
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.sg1.d1(time, s1) values (0, 1)");
+ statement.execute("insert into root.sg1.d1(time, s1) values (1, 2)");
+ statement.execute("insert into root.sg1.d1(time, s1) values (2, 3)");
+ statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ expectedResSet.add("0,1.0,");
+ expectedResSet.add("1,2.0,");
+ expectedResSet.add("2,3.0,");
+ expectedResSet.add("3,4.0,");
+ assertDataOnReceiver(receiverEnv, expectedResSet);
+ assertDataOnReceiver(receiverEnv, expectedResSet);
+ }
+ }
+
+ private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String>
expectedResSet) {
+ try (Connection connection = receiverEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ await()
+ .atMost(600, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.**"),
+ "Time,root.sg1.d1.s1,",
+ expectedResSet));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8135ae577ad..3414b7a9b29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.config.constant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
public class PipeConnectorConstant {
@@ -29,6 +31,10 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
+ public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY =
"connector.parallel.tasks";
+ public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
+ PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY =
"connector.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE
= true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 703e4652d99..bc86efb6ec1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -28,9 +28,11 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -43,7 +45,10 @@ import
org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
@@ -63,6 +68,10 @@ public class OpcUaConnector implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcUaConnector.class);
+ private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+ private String serverKey;
private OpcUaServer server;
@Override
@@ -86,14 +95,31 @@ public class OpcUaConnector implements PipeConnector {
parameters.getStringOrDefault(
CONNECTOR_IOTDB_PASSWORD_KEY,
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
- server =
- new OpcUaServerBuilder()
- .setTcpBindPort(tcpBindPort)
- .setHttpsBindPort(httpsBindPort)
- .setUser(user)
- .setPassword(password)
- .build();
- server.startup();
+ synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ serverKey = httpsBindPort + ":" + tcpBindPort;
+
+ server =
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
+ .computeIfAbsent(
+ serverKey,
+ key -> {
+ try {
+ final OpcUaServer newServer =
+ new OpcUaServerBuilder()
+ .setTcpBindPort(tcpBindPort)
+ .setHttpsBindPort(httpsBindPort)
+ .setUser(user)
+ .setPassword(password)
+ .build();
+ newServer.startup();
+ return new Pair<>(new AtomicInteger(0), newServer);
+ } catch (Exception e) {
+ throw new PipeException("Failed to build and startup
OpcUaServer", e);
+ }
+ })
+ .getRight();
+
SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
+ }
}
@Override
@@ -106,6 +132,11 @@ public class OpcUaConnector implements PipeConnector {
// Server side, do nothing
}
+ @Override
+ public void transfer(Event event) throws Exception {
+ // Do nothing when receive heartbeat or other events
+ }
+
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
@@ -234,13 +265,26 @@ public class OpcUaConnector implements PipeConnector {
}
}
- @Override
- public void transfer(Event event) throws Exception {
- // Do nothing when receive heartbeat or other events
- }
-
@Override
public void close() throws Exception {
- server.shutdown();
+ if (serverKey == null) {
+ return;
+ }
+
+ synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ final Pair<AtomicInteger, OpcUaServer> pair =
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().shutdown();
+ } finally {
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
+ }
+ }
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 879cd9e2423..37b12170582 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,21 @@ import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Comparator;
+import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public class WebSocketConnector implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketConnector.class);
- private final AtomicReference<WebSocketConnectorServer> server = new
AtomicReference<>();
- private int port;
+
+ private static final Map<Integer, Pair<AtomicInteger,
WebSocketConnectorServer>>
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+ private Integer port;
+ private WebSocketConnectorServer server;
public final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -68,13 +74,19 @@ public class WebSocketConnector implements PipeConnector {
@Override
public void handshake() throws Exception {
- if (server.get() == null) {
- synchronized (server) {
- if (server.get() == null) {
- server.set(new WebSocketConnectorServer(new InetSocketAddress(port),
this));
- server.get().start();
- }
- }
+ synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ server =
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
+ .computeIfAbsent(
+ port,
+ key -> {
+ final WebSocketConnectorServer newServer =
+ new WebSocketConnectorServer(new
InetSocketAddress(port), this);
+ newServer.start();
+ return new Pair<>(new AtomicInteger(0), newServer);
+ })
+ .getRight();
+
PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
}
}
@@ -94,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebSocketConnector.class.getName());
- server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent));
+ server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
}
@Override
@@ -108,7 +120,7 @@ public class WebSocketConnector implements PipeConnector {
for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
- server.get().addEvent(new Pair<>(commitId, event));
+ server.addEvent(new Pair<>(commitId, event));
}
}
@@ -117,7 +129,25 @@ public class WebSocketConnector implements PipeConnector {
@Override
public void close() throws Exception {
- server.get().stop();
+ if (port == null) {
+ return;
+ }
+
+ synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ final Pair<AtomicInteger, WebSocketConnectorServer> pair =
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().stop();
+ } finally {
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
+ }
+ }
+ }
}
public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index cb901477b76..455d113f4d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -39,16 +39,22 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.function.Supplier;
public class PipeConnectorSubtaskManager {
+ private static final Map<String, Supplier<PipeConnector>>
CONNECTOR_CONSTRUCTORS =
+ new HashMap<>();
+
private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
"Failed to deregister PipeConnectorSubtask. No such subtask: ";
- private final Map<String, PipeConnectorSubtaskLifeCycle>
+ private final Map<String, List<PipeConnectorSubtaskLifeCycle>>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
@@ -59,58 +65,62 @@ public class PipeConnectorSubtaskManager {
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- // 1. Construct, validate and customize PipeConnector, and then
handshake (create connection)
- // with the target
+ final int connectorNum =
+ pipeConnectorParameters.getIntOrDefault(
+ PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ final List<PipeConnectorSubtaskLifeCycle>
pipeConnectorSubtaskLifeCycleList =
+ new ArrayList<>(connectorNum);
+
final String connectorKey =
pipeConnectorParameters.getStringOrDefault(
PipeConnectorConstant.CONNECTOR_KEY,
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-
- PipeConnector pipeConnector;
- if
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
- || connectorKey.equals(
-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBThriftSyncConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()))
{
- pipeConnector = new IoTDBThriftAsyncConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBLegacyPipeConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBAirGapConnector();
- } else if
(connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
- pipeConnector = new OpcUaConnector();
- } else if
(connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName()))
{
- pipeConnector = new WebSocketConnector();
- } else {
- pipeConnector =
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
- }
-
- try {
- pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
- pipeConnector.customize(
- pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
- pipeConnector.handshake();
- } catch (Exception e) {
- throw new PipeException(
- "Failed to construct PipeConnector, because of " + e.getMessage(),
e);
- }
-
- // 2. Construct PipeConnectorSubtaskLifeCycle to manage
PipeConnectorSubtask's life cycle
+ // Shared pending queue for all subtasks
final BoundedBlockingPendingQueue<Event> pendingQueue =
new BoundedBlockingPendingQueue<>(
PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
- final PipeConnectorSubtask pipeConnectorSubtask =
- new PipeConnectorSubtask(attributeSortedString, pendingQueue,
pipeConnector);
- final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
- new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask,
pendingQueue);
+
+ for (int i = 0; i < connectorNum; i++) {
+ final PipeConnector pipeConnector =
+ CONNECTOR_CONSTRUCTORS
+ .getOrDefault(
+ connectorKey,
+ () ->
PipeAgent.plugin().reflectConnector(pipeConnectorParameters))
+ .get();
+
+ // 1. Construct, validate and customize PipeConnector, and then
handshake (create
+ // connection) with the target
+ try {
+ pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
+ pipeConnector.customize(
+ pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+ pipeConnector.handshake();
+ } catch (Exception e) {
+ throw new PipeException(
+ "Failed to construct PipeConnector, because of " +
e.getMessage(), e);
+ }
+
+ // 2. Construct PipeConnectorSubtaskLifeCycle to manage
PipeConnectorSubtask's life cycle
+ final PipeConnectorSubtask pipeConnectorSubtask =
+ new PipeConnectorSubtask(
+ String.format(
+ "%s_%s_%s", attributeSortedString,
pipeRuntimeEnvironment.getCreationTime(), i),
+ pendingQueue,
+ pipeConnector);
+ final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+ new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask,
pendingQueue);
+ pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle);
+ }
+
attributeSortedString2SubtaskLifeCycleMap.put(
- attributeSortedString, pipeConnectorSubtaskLifeCycle);
+ attributeSortedString, pipeConnectorSubtaskLifeCycleList);
}
-
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.register();
+ }
return attributeSortedString;
}
@@ -120,7 +130,11 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
- if
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
{
+ final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
+ lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+
+ if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
}
@@ -130,7 +144,10 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
-
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.start();
+ }
}
public synchronized void stop(String attributeSortedString) {
@@ -138,7 +155,10 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
-
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.stop();
+ }
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
@@ -148,13 +168,34 @@ public class PipeConnectorSubtaskManager {
"Failed to get PendingQueue. No such subtask: " +
attributeSortedString);
}
- return
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+ // All subtasks share the same pending queue
+ return attributeSortedString2SubtaskLifeCycleMap
+ .get(attributeSortedString)
+ .get(0)
+ .getPendingQueue();
}
///////////////////////// Singleton Instance Holder
/////////////////////////
private PipeConnectorSubtaskManager() {
- // Empty constructor
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
+ IoTDBThriftSyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
+ IoTDBThriftSyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
+ IoTDBThriftAsyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
+ IoTDBLegacyPipeConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
IoTDBAirGapConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(),
WebSocketConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
OpcUaConnector::new);
}
private static class PipeSubtaskManagerHolder {