This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-load-balancer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3bca619191b9dd0e9f40dc2b4b9dadc0062a0bc7 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 2 17:08:43 2024 +0800 Pipe: support different load balance strategies for data sync --- .../client/IoTDBConfigNodeSyncClientManager.java | 8 +- .../protocol/IoTDBConfigRegionConnector.java | 6 +- .../client/IoTDBDataNodeAsyncClientManager.java | 86 +++++++++++++++-- .../client/IoTDBDataNodeSyncClientManager.java | 5 +- .../async/IoTDBDataRegionAsyncConnector.java | 23 ++++- .../thrift/sync/IoTDBDataNodeSyncConnector.java | 5 +- .../config/constant/PipeConnectorConstant.java | 16 +++ .../connector/client/IoTDBSyncClientManager.java | 107 ++++++++++++++++++--- .../connector/protocol/IoTDBSslSyncConnector.java | 25 ++++- 9 files changed, 248 insertions(+), 33 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java index e92f4879d4c..e420a6b5c6d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java @@ -34,8 +34,12 @@ import java.util.Map; public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager { public IoTDBConfigNodeSyncClientManager( - List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String trustStorePwd) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, false); + List<TEndPoint> endPoints, + boolean useSSL, + String trustStorePath, + String trustStorePwd, + String loadBalanceStrategy) { + super(endPoints, useSSL, trustStorePath, trustStorePwd, false, loadBalanceStrategy); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 1706ad85bbc..6058fed3027 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -59,8 +59,10 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { boolean useSSL, String trustStorePath, String trustStorePwd, - boolean useLeaderCache) { - return new IoTDBConfigNodeSyncClientManager(nodeUrls, useSSL, trustStorePath, trustStorePwd); + boolean useLeaderCache, + String loadBalanceStrategy) { + return new IoTDBConfigNodeSyncClientManager( + nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 45aab88c2c9..b034a8182ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -46,6 +46,10 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; + public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager implements IoTDBDataNodeCacheLeaderClientManager { @@ -59,7 +63,10 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>(); private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> endPoint2Client; - public IoTDBDataNodeAsyncClientManager(List<TEndPoint> endPoints, boolean useLeaderCache) { + private final LoadBalancer loadBalancer; + + public IoTDBDataNodeAsyncClientManager( + List<TEndPoint> endPoints, boolean useLeaderCache, String loadBalanceStrategy) { super(endPoints, useLeaderCache); endPointSet = new HashSet<>(endPoints); @@ -75,17 +82,27 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager } } endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(); + + switch (loadBalanceStrategy) { + case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY: + loadBalancer = new RoundRobinLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY: + loadBalancer = new RandomLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY: + loadBalancer = new PriorityLoadBalancer(); + break; + default: + LOGGER.warn( + "Unknown load balance strategy: {}, use round-robin strategy instead.", + loadBalanceStrategy); + loadBalancer = new RoundRobinLoadBalancer(); + } } public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { - final int clientSize = endPointList.size(); - while (true) { - final TEndPoint targetNodeUrl = endPointList.get((int) (currentClientIndex++ % clientSize)); - final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); - if (handshakeIfNecessary(targetNodeUrl, client)) { - return client; - } - } + return loadBalancer.borrowClient(); } public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) throws Exception { @@ -239,4 +256,55 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint); } + + /////////////////////// Strategies for load balance ////////////////////////// + + private interface LoadBalancer { + AsyncPipeDataTransferServiceClient borrowClient() throws Exception; + } + + private class RoundRobinLoadBalancer implements LoadBalancer { + @Override + public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { + final int clientSize = endPointList.size(); + while (true) { + final TEndPoint targetNodeUrl = endPointList.get((int) (currentClientIndex++ % clientSize)); + final AsyncPipeDataTransferServiceClient client = + endPoint2Client.borrowClient(targetNodeUrl); + if (handshakeIfNecessary(targetNodeUrl, client)) { + return client; + } + } + } + } + + private class RandomLoadBalancer implements LoadBalancer { + @Override + public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { + final int clientSize = endPointList.size(); + while (true) { + final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize)); + final AsyncPipeDataTransferServiceClient client = + endPoint2Client.borrowClient(targetNodeUrl); + if (handshakeIfNecessary(targetNodeUrl, client)) { + return client; + } + } + } + } + + private class PriorityLoadBalancer implements LoadBalancer { + @Override + public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { + while (true) { + for (final TEndPoint targetNodeUrl : endPointList) { + final AsyncPipeDataTransferServiceClient client = + endPoint2Client.borrowClient(targetNodeUrl); + if (handshakeIfNecessary(targetNodeUrl, client)) { + return client; + } + } + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java index 91dec3517ea..2c346a11cf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java @@ -47,8 +47,9 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager boolean useSSL, String trustStorePath, String trustStorePwd, - boolean useLeaderCache) { - super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache); + boolean useLeaderCache, + String loadBalanceStrategy) { + super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 1f631997c51..fa1b511abed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -62,11 +62,15 @@ import java.util.concurrent.PriorityBlockingQueue; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { @@ -79,6 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { "Failed to borrow client from client pool or exception occurred " + "when sending to receiver %s:%s."; + private String loadBalanceStrategy; private IoTDBDataNodeAsyncClientManager clientManager; private final IoTDBDataRegionSyncConnector retryConnector = new IoTDBDataRegionSyncConnector(); @@ -99,12 +104,27 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { retryConnector.validate(validator); final PipeParameters parameters = validator.getParameters(); + validator.validate( args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]), "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.", parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); + + loadBalanceStrategy = + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY), + CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY) + .trim() + .toLowerCase(); + validator.validate( + arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy), + String.format( + "Load balance strategy should be one of %s, but got %s.", + CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy), + loadBalanceStrategy); } @Override @@ -124,7 +144,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { nodeUrls, parameters.getBooleanOrDefault( Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, CONNECTOR_LEADER_CACHE_ENABLE_KEY), - CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE)); + CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE), + loadBalanceStrategy); if (isTabletBatchModeEnabled) { tabletBatchBuilder = new IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index b516fd5d728..61493b31212 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -84,10 +84,11 @@ public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { boolean useSSL, String trustStorePath, String trustStorePwd, - boolean useLeaderCache) { + boolean useLeaderCache, + String loadBalanceStrategy) { clientManager = new IoTDBDataNodeSyncClientManager( - nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache); + nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); return clientManager; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 1aecd761b2a..43fbd7c61b0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.MB; @@ -137,6 +140,19 @@ public class PipeConnectorConstant { public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true; + public static final String CONNECTOR_LOAD_BALANCE_STRATEGY_KEY = + "connector.load-balance-strategy"; + public static final String SINK_LOAD_BALANCE_STRATEGY_KEY = "sink.load-balance-strategy"; + public static final String CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY = "round-robin"; + public static final String CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY = "random"; + public static final String CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY = "priority"; + public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET = + new HashSet<>( + Arrays.asList( + CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY, + CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY, + CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY)); + public static final String SINK_TOPIC_KEY = "sink.topic"; public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index f2cb76021ac..9a20c0bb727 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -41,6 +41,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; + public abstract class IoTDBSyncClientManager extends IoTDBClientManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncClientManager.class); @@ -54,12 +58,15 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen protected final Map<TEndPoint, Pair<IoTDBSyncClient, Boolean>> endPoint2ClientAndStatus = new ConcurrentHashMap<>(); + private final LoadBalancer loadBalancer; + protected IoTDBSyncClientManager( List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String trustStorePwd, - boolean useLeaderCache) { + boolean useLeaderCache, + String loadBalanceStrategy) { super(endPoints, useLeaderCache); this.useSSL = useSSL; @@ -69,6 +76,23 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen for (final TEndPoint endPoint : endPoints) { endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false)); } + + switch (loadBalanceStrategy) { + case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY: + loadBalancer = new RoundRobinLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY: + loadBalancer = new RandomLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY: + loadBalancer = new PriorityLoadBalancer(); + break; + default: + LOGGER.warn( + "Unknown load balance strategy: {}, use round-robin strategy instead.", + loadBalanceStrategy); + loadBalancer = new RoundRobinLoadBalancer(); + } } public void checkClientStatusAndTryReconstructIfNecessary() { @@ -193,18 +217,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen protected abstract String getClusterId(); public Pair<IoTDBSyncClient, Boolean> getClient() { - final int clientSize = endPointList.size(); - // Round-robin, find the next alive client - for (int tryCount = 0; tryCount < clientSize; ++tryCount) { - final int clientIndex = (int) (currentClientIndex++ % clientSize); - final Pair<IoTDBSyncClient, Boolean> clientAndStatus = - endPoint2ClientAndStatus.get(endPointList.get(clientIndex)); - if (Boolean.TRUE.equals(clientAndStatus.getRight())) { - return clientAndStatus; - } - } - throw new PipeConnectionException( - "All clients are dead, please check the connection to the receiver."); + return loadBalancer.getClient(); } @Override @@ -236,4 +249,72 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen } } } + + /////////////////////// Strategies for load balance ////////////////////////// + + private interface LoadBalancer { + Pair<IoTDBSyncClient, Boolean> getClient(); + } + + private class RoundRobinLoadBalancer implements LoadBalancer { + @Override + public Pair<IoTDBSyncClient, Boolean> getClient() { + final int clientSize = endPointList.size(); + // Round-robin, find the next alive client + for (int tryCount = 0; tryCount < clientSize; ++tryCount) { + final int clientIndex = (int) (currentClientIndex++ % clientSize); + final Pair<IoTDBSyncClient, Boolean> clientAndStatus = + endPoint2ClientAndStatus.get(endPointList.get(clientIndex)); + if (Boolean.TRUE.equals(clientAndStatus.getRight())) { + return clientAndStatus; + } + } + + throw new PipeConnectionException( + "All clients are dead, please check the connection to the receiver."); + } + } + + private class RandomLoadBalancer implements LoadBalancer { + @Override + public Pair<IoTDBSyncClient, Boolean> getClient() { + final int clientSize = endPointList.size(); + final int clientIndex = (int) (Math.random() * clientSize); + final Pair<IoTDBSyncClient, Boolean> clientAndStatus = + endPoint2ClientAndStatus.get(endPointList.get(clientIndex)); + if (Boolean.TRUE.equals(clientAndStatus.getRight())) { + return clientAndStatus; + } + + // Random, find the next alive client + for (int tryCount = 0; tryCount < clientSize - 1; ++tryCount) { + final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize; + final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus = + endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex)); + if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) { + return nextClientAndStatus; + } + } + + throw new PipeConnectionException( + "All clients are dead, please check the connection to the receiver."); + } + } + + private class PriorityLoadBalancer implements LoadBalancer { + @Override + public Pair<IoTDBSyncClient, Boolean> getClient() { + // Priority, find the first alive client + for (final TEndPoint endPoint : endPointList) { + final Pair<IoTDBSyncClient, Boolean> clientAndStatus = + endPoint2ClientAndStatus.get(endPoint); + if (Boolean.TRUE.equals(clientAndStatus.getRight())) { + return clientAndStatus; + } + } + + throw new PipeConnectionException( + "All clients are dead, please check the connection to the receiver."); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index e7ecbdf67e1..6c8a2adbafc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -47,11 +47,15 @@ import java.util.List; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR; import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR; import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK; @@ -60,6 +64,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class); + protected String loadBalanceStrategy; protected IoTDBSyncClientManager clientManager; @Override @@ -85,6 +90,20 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); + + loadBalanceStrategy = + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY), + CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY) + .trim() + .toLowerCase(); + validator.validate( + arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy), + String.format( + "Load balance strategy should be one of %s, but got %s.", + CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy), + loadBalanceStrategy); } @Override @@ -113,7 +132,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE); clientManager = - constructClient(nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache); + constructClient( + nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, loadBalanceStrategy); } protected abstract IoTDBSyncClientManager constructClient( @@ -121,7 +141,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { boolean useSSL, String trustStorePath, String trustStorePwd, - boolean useLeaderCache); + boolean useLeaderCache, + String loadBalanceStrategy); @Override public void handshake() throws Exception {
