This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd48ef68254 Pipe: support different load balance strategies for data
sync (#12281)
fd48ef68254 is described below
commit fd48ef682541d156590d7c1f8d774ddd2335407d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 2 20:49:01 2024 +0800
Pipe: support different load balance strategies for data sync (#12281)
**Supported sinks:** iotdb-airgap-sink, iotdb-ssl-thrift-sink,
iotdb-sync-sink, iotdb-async-sink
```
create pipe p1
with sink (
'load-balance-strategy' = 'random'
);
```
Options for `'load-balance-strategy'`: `round-robin`, `random` and
`priority`. The default is `round-robin`.
**Explanation:**
* `round-robin`: The connector will select the next client in a
round-robin manner. If the current client is dead, the connector will try the
next client until it finds an alive client. If all clients are dead, the
connector will throw a `PipeConnectionException`.
* `random`: The connector will select a client randomly. If the selected
client is dead, the connector will try the next client until it finds an alive
client. If all clients are dead, the connector will throw a
`PipeConnectionException`.
* `priority`: The connector will select the first alive client among the given
clients, in the order specified via `node-urls`. If all clients are dead, the
connector will throw a `PipeConnectionException`.
---
.../client/IoTDBConfigNodeSyncClientManager.java | 8 +-
.../protocol/IoTDBConfigRegionConnector.java | 6 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 86 ++++++++++++++--
.../client/IoTDBDataNodeSyncClientManager.java | 5 +-
.../async/IoTDBDataRegionAsyncConnector.java | 4 +-
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 5 +-
.../config/constant/PipeConnectorConstant.java | 18 ++++
.../pipe/connector/client/IoTDBSyncClient.java | 4 +-
.../connector/client/IoTDBSyncClientManager.java | 107 ++++++++++++++++---
.../connector/protocol/IoTDBAirGapConnector.java | 98 +++++++++++++++---
.../pipe/connector/protocol/IoTDBConnector.java | 113 ++++++++++++---------
.../connector/protocol/IoTDBSslSyncConnector.java | 6 +-
12 files changed, 367 insertions(+), 93 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e92f4879d4c..e420a6b5c6d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -34,8 +34,12 @@ import java.util.Map;
public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager {
public IoTDBConfigNodeSyncClientManager(
- List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String
trustStorePwd) {
- super(endPoints, useSSL, trustStorePath, trustStorePwd, false);
+ List<TEndPoint> endPoints,
+ boolean useSSL,
+ String trustStorePath,
+ String trustStorePwd,
+ String loadBalanceStrategy) {
+ super(endPoints, useSSL, trustStorePath, trustStorePwd, false,
loadBalanceStrategy);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 1706ad85bbc..6058fed3027 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,8 +59,10 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- boolean useLeaderCache) {
- return new IoTDBConfigNodeSyncClientManager(nodeUrls, useSSL,
trustStorePath, trustStorePwd);
+ boolean useLeaderCache,
+ String loadBalanceStrategy) {
+ return new IoTDBConfigNodeSyncClientManager(
+ nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 45aab88c2c9..b034a8182ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -46,6 +46,10 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
implements IoTDBDataNodeCacheLeaderClientManager {
@@ -59,7 +63,10 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
endPoint2Client;
- public IoTDBDataNodeAsyncClientManager(List<TEndPoint> endPoints, boolean
useLeaderCache) {
+ private final LoadBalancer loadBalancer;
+
+ public IoTDBDataNodeAsyncClientManager(
+ List<TEndPoint> endPoints, boolean useLeaderCache, String
loadBalanceStrategy) {
super(endPoints, useLeaderCache);
endPointSet = new HashSet<>(endPoints);
@@ -75,17 +82,27 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
}
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+
+ switch (loadBalanceStrategy) {
+ case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+ loadBalancer = new RoundRobinLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+ loadBalancer = new RandomLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+ loadBalancer = new PriorityLoadBalancer();
+ break;
+ default:
+ LOGGER.warn(
+ "Unknown load balance strategy: {}, use round-robin strategy
instead.",
+ loadBalanceStrategy);
+ loadBalancer = new RoundRobinLoadBalancer();
+ }
}
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
- final int clientSize = endPointList.size();
- while (true) {
- final TEndPoint targetNodeUrl = endPointList.get((int)
(currentClientIndex++ % clientSize));
- final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(targetNodeUrl);
- if (handshakeIfNecessary(targetNodeUrl, client)) {
- return client;
- }
- }
+ return loadBalancer.borrowClient();
}
public AsyncPipeDataTransferServiceClient borrowClient(String deviceId)
throws Exception {
@@ -239,4 +256,55 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
}
+
+ /////////////////////// Strategies for load balance
//////////////////////////
+
+ private interface LoadBalancer {
+ AsyncPipeDataTransferServiceClient borrowClient() throws Exception;
+ }
+
+ private class RoundRobinLoadBalancer implements LoadBalancer {
+ @Override
+ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+ final int clientSize = endPointList.size();
+ while (true) {
+ final TEndPoint targetNodeUrl = endPointList.get((int)
(currentClientIndex++ % clientSize));
+ final AsyncPipeDataTransferServiceClient client =
+ endPoint2Client.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
+ }
+ }
+ }
+ }
+
+ private class RandomLoadBalancer implements LoadBalancer {
+ @Override
+ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+ final int clientSize = endPointList.size();
+ while (true) {
+ final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random()
* clientSize));
+ final AsyncPipeDataTransferServiceClient client =
+ endPoint2Client.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
+ }
+ }
+ }
+ }
+
+ private class PriorityLoadBalancer implements LoadBalancer {
+ @Override
+ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+ while (true) {
+ for (final TEndPoint targetNodeUrl : endPointList) {
+ final AsyncPipeDataTransferServiceClient client =
+ endPoint2Client.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 91dec3517ea..2c346a11cf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -47,8 +47,9 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- boolean useLeaderCache) {
- super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+ boolean useLeaderCache,
+ String loadBalanceStrategy) {
+ super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1f631997c51..dd6b002c523 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -99,6 +99,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
retryConnector.validate(validator);
final PipeParameters parameters = validator.getParameters();
+
validator.validate(
args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]),
"Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.",
@@ -124,7 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
nodeUrls,
parameters.getBooleanOrDefault(
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
- CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE));
+ CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
+ loadBalanceStrategy);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index b516fd5d728..61493b31212 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -84,10 +84,11 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- boolean useLeaderCache) {
+ boolean useLeaderCache,
+ String loadBalanceStrategy) {
clientManager =
new IoTDBDataNodeSyncClientManager(
- nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+ nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
return clientManager;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1aecd761b2a..bb9d075910d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
@@ -137,6 +141,20 @@ public class PipeConnectorConstant {
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;
+ public static final String CONNECTOR_LOAD_BALANCE_STRATEGY_KEY =
+ "connector.load-balance-strategy";
+ public static final String SINK_LOAD_BALANCE_STRATEGY_KEY =
"sink.load-balance-strategy";
+ public static final String CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY =
"round-robin";
+ public static final String CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY = "random";
+ public static final String CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY =
"priority";
+ public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY,
+ CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY,
+ CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY)));
+
public static final String SINK_TOPIC_KEY = "sink.topic";
public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index d7851da970c..3ab7a156b74 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -31,8 +31,8 @@ import org.apache.thrift.transport.TTransportException;
public class IoTDBSyncClient extends IClientRPCService.Client
implements ThriftClient, AutoCloseable {
- private String ipAddress;
- private int port;
+ private final String ipAddress;
+ private final int port;
public IoTDBSyncClient(
ThriftClientProperty property,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index f2cb76021ac..9a20c0bb727 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -41,6 +41,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
public abstract class IoTDBSyncClientManager extends IoTDBClientManager
implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncClientManager.class);
@@ -54,12 +58,15 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
protected final Map<TEndPoint, Pair<IoTDBSyncClient, Boolean>>
endPoint2ClientAndStatus =
new ConcurrentHashMap<>();
+ private final LoadBalancer loadBalancer;
+
protected IoTDBSyncClientManager(
List<TEndPoint> endPoints,
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- boolean useLeaderCache) {
+ boolean useLeaderCache,
+ String loadBalanceStrategy) {
super(endPoints, useLeaderCache);
this.useSSL = useSSL;
@@ -69,6 +76,23 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
for (final TEndPoint endPoint : endPoints) {
endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
}
+
+ switch (loadBalanceStrategy) {
+ case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+ loadBalancer = new RoundRobinLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+ loadBalancer = new RandomLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+ loadBalancer = new PriorityLoadBalancer();
+ break;
+ default:
+ LOGGER.warn(
+ "Unknown load balance strategy: {}, use round-robin strategy
instead.",
+ loadBalanceStrategy);
+ loadBalancer = new RoundRobinLoadBalancer();
+ }
}
public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -193,18 +217,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
protected abstract String getClusterId();
public Pair<IoTDBSyncClient, Boolean> getClient() {
- final int clientSize = endPointList.size();
- // Round-robin, find the next alive client
- for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
- final int clientIndex = (int) (currentClientIndex++ % clientSize);
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
- endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
- return clientAndStatus;
- }
- }
- throw new PipeConnectionException(
- "All clients are dead, please check the connection to the receiver.");
+ return loadBalancer.getClient();
}
@Override
@@ -236,4 +249,72 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
}
}
}
+
+ /////////////////////// Strategies for load balance
//////////////////////////
+
+ private interface LoadBalancer {
+ Pair<IoTDBSyncClient, Boolean> getClient();
+ }
+
+ private class RoundRobinLoadBalancer implements LoadBalancer {
+ @Override
+ public Pair<IoTDBSyncClient, Boolean> getClient() {
+ final int clientSize = endPointList.size();
+ // Round-robin, find the next alive client
+ for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
+ final int clientIndex = (int) (currentClientIndex++ % clientSize);
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+ endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+ if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ return clientAndStatus;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All clients are dead, please check the connection to the
receiver.");
+ }
+ }
+
+ private class RandomLoadBalancer implements LoadBalancer {
+ @Override
+ public Pair<IoTDBSyncClient, Boolean> getClient() {
+ final int clientSize = endPointList.size();
+ final int clientIndex = (int) (Math.random() * clientSize);
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+ endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+ if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ return clientAndStatus;
+ }
+
+ // Random, find the next alive client
+ for (int tryCount = 0; tryCount < clientSize - 1; ++tryCount) {
+ final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize;
+ final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
+ endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
+ if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) {
+ return nextClientAndStatus;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All clients are dead, please check the connection to the
receiver.");
+ }
+ }
+
+ private class PriorityLoadBalancer implements LoadBalancer {
+ @Override
+ public Pair<IoTDBSyncClient, Boolean> getClient() {
+ // Priority, find the first alive client
+ for (final TEndPoint endPoint : endPointList) {
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+ endPoint2ClientAndStatus.get(endPoint);
+ if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ return clientAndStatus;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All clients are dead, please check the connection to the
receiver.");
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 25c8b4abeb7..33ebea1b868 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -48,6 +48,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
@@ -61,10 +64,12 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
protected final List<Socket> sockets = new ArrayList<>();
protected final List<Boolean> isSocketAlive = new ArrayList<>();
+ private LoadBalancer loadBalancer;
+ private long currentClientIndex = 0;
+
private int handshakeTimeoutMs;
- private boolean eLanguageEnable;
- private long currentClientIndex = 0;
+ private boolean eLanguageEnable;
// The air gap connector does not use clientManager thus we put handshake
type here
protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -86,6 +91,23 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
sockets.add(null);
}
+ switch (loadBalanceStrategy) {
+ case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+ loadBalancer = new RoundRobinLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+ loadBalancer = new RandomLoadBalancer();
+ break;
+ case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+ loadBalancer = new PriorityLoadBalancer();
+ break;
+ default:
+ LOGGER.warn(
+ "Unknown load balance strategy: {}, use round-robin strategy
instead.",
+ loadBalanceStrategy);
+ loadBalancer = new RoundRobinLoadBalancer();
+ }
+
handshakeTimeoutMs =
parameters.getIntOrDefault(
Arrays.asList(
@@ -235,16 +257,7 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
String fileName, long position, byte[] payLoad) throws IOException;
protected int nextSocketIndex() {
- final int socketSize = sockets.size();
- // Round-robin, find the next alive client
- for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
- final int clientIndex = (int) (currentClientIndex++ % socketSize);
- if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
- return clientIndex;
- }
- }
- throw new PipeConnectionException(
- "All sockets are dead, please check the connection to the receiver.");
+ return loadBalancer.nextSocketIndex();
}
protected boolean send(Socket socket, byte[] bytes) throws IOException {
@@ -296,4 +309,65 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
}
}
}
+
+ /////////////////////// Strategies for load balance
//////////////////////////
+
+ private interface LoadBalancer {
+ int nextSocketIndex();
+ }
+
+ private class RoundRobinLoadBalancer implements LoadBalancer {
+ @Override
+ public int nextSocketIndex() {
+ final int socketSize = sockets.size();
+ // Round-robin, find the next alive client
+ for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
+ final int clientIndex = (int) (currentClientIndex++ % socketSize);
+ if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+ return clientIndex;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All sockets are dead, please check the connection to the
receiver.");
+ }
+ }
+
+ private class RandomLoadBalancer implements LoadBalancer {
+ @Override
+ public int nextSocketIndex() {
+ final int socketSize = sockets.size();
+ final int clientIndex = (int) (Math.random() * socketSize);
+ if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+ return clientIndex;
+ }
+
+ // Random, find the next alive client
+ for (int tryCount = 0; tryCount < socketSize - 1; ++tryCount) {
+ final int nextClientIndex = (clientIndex + tryCount + 1) % socketSize;
+ if (Boolean.TRUE.equals(isSocketAlive.get(nextClientIndex))) {
+ return nextClientIndex;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All sockets are dead, please check the connection to the
receiver.");
+ }
+ }
+
+ private class PriorityLoadBalancer implements LoadBalancer {
+ @Override
+ public int nextSocketIndex() {
+ // Priority, find the first alive client
+ final int socketSize = sockets.size();
+ for (int i = 0; i < socketSize; ++i) {
+ if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
+ return i;
+ }
+ }
+
+ throw new PipeConnectionException(
+ "All sockets are dead, please check the connection to the
receiver.");
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b2342966c20..65b32a48396 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -54,6 +54,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -64,63 +67,80 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
public abstract class IoTDBConnector implements PipeConnector {
+ private static final String PARSE_URL_ERROR_FORMATTER =
+ "Exception occurred while parsing node urls from target servers: {}";
+ private static final String PARSE_URL_ERROR_MESSAGE =
+ "Error occurred while parsing node urls from target servers, please
check the specified 'host':'port' or 'node-urls'";
+
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConnector.class);
protected final List<TEndPoint> nodeUrls = new ArrayList<>();
+ protected String loadBalanceStrategy;
+
protected boolean isTabletBatchModeEnabled = true;
protected PipeReceiverStatusHandler receiverStatusHandler;
- private static final String PARSE_URL_ERROR_FORMATTER =
- "Exception occurred while parsing node urls from target servers: {}";
-
- private static final String PARSE_URL_ERROR_MESSAGE =
- "Error occurred while parsing node urls from target servers, please
check the specified 'host':'port' or 'node-urls'";
-
@Override
public void validate(PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
- validator
- .validate(
- args ->
- (boolean) args[0]
- || (((boolean) args[1] || (boolean) args[2]) && (boolean)
args[3])
- || (boolean) args[4]
- || (((boolean) args[5] || (boolean) args[6]) && (boolean)
args[7]),
- String.format(
- "One of %s, %s:%s, %s, %s:%s must be specified",
- CONNECTOR_IOTDB_NODE_URLS_KEY,
- CONNECTOR_IOTDB_HOST_KEY,
- CONNECTOR_IOTDB_PORT_KEY,
- SINK_IOTDB_NODE_URLS_KEY,
- SINK_IOTDB_HOST_KEY,
- SINK_IOTDB_PORT_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
- parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
- parameters.hasAttribute(SINK_IOTDB_IP_KEY),
- parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
- parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
- .validate(
- arg -> arg.equals("retry") || arg.equals("ignore"),
- String.format(
- "The value of key %s or %s must be either 'retry' or
'ignore'.",
- CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
- SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
- parameters
- .getStringOrDefault(
- Arrays.asList(
- CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
- SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
-
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
- .trim()
- .toLowerCase());
+
+ validator.validate(
+ args ->
+ (boolean) args[0]
+ || (((boolean) args[1] || (boolean) args[2]) && (boolean)
args[3])
+ || (boolean) args[4]
+ || (((boolean) args[5] || (boolean) args[6]) && (boolean)
args[7]),
+ String.format(
+ "One of %s, %s:%s, %s, %s:%s must be specified",
+ CONNECTOR_IOTDB_NODE_URLS_KEY,
+ CONNECTOR_IOTDB_HOST_KEY,
+ CONNECTOR_IOTDB_PORT_KEY,
+ SINK_IOTDB_NODE_URLS_KEY,
+ SINK_IOTDB_HOST_KEY,
+ SINK_IOTDB_PORT_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+ parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+ parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
+ parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
+
+ loadBalanceStrategy =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY,
SINK_LOAD_BALANCE_STRATEGY_KEY),
+ CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
+ .trim()
+ .toLowerCase();
+ validator.validate(
+ arg ->
CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
+ String.format(
+ "Load balance strategy should be one of %s, but got %s.",
+ CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
+ loadBalanceStrategy);
+
+ validator.validate(
+ arg -> arg.equals("retry") || arg.equals("ignore"),
+ String.format(
+ "The value of key %s or %s must be either 'retry' or 'ignore'.",
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+ SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+ SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+ CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
+ .trim()
+ .toLowerCase());
}
@Override
@@ -168,9 +188,9 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
}
- protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters)
+ protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters)
throws PipeParameterNotValidException {
- final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls);
+ final LinkedHashSet<TEndPoint> givenNodeUrls = new
LinkedHashSet<>(nodeUrls);
try {
if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
@@ -223,6 +243,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
}
checkNodeUrls(givenNodeUrls);
+
return givenNodeUrls;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e7ecbdf67e1..f9e5a8743f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -113,7 +113,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
clientManager =
- constructClient(nodeUrls, useSSL, trustStorePath, trustStorePwd,
useLeaderCache);
+ constructClient(
+ nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache,
loadBalanceStrategy);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -121,7 +122,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
boolean useSSL,
String trustStorePath,
String trustStorePwd,
- boolean useLeaderCache);
+ boolean useLeaderCache,
+ String loadBalanceStrategy);
@Override
public void handshake() throws Exception {