This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-load-balancer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3bca619191b9dd0e9f40dc2b4b9dadc0062a0bc7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 2 17:08:43 2024 +0800

    Pipe: support different load balance strategies for data sync
---
 .../client/IoTDBConfigNodeSyncClientManager.java   |   8 +-
 .../protocol/IoTDBConfigRegionConnector.java       |   6 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  86 +++++++++++++++--
 .../client/IoTDBDataNodeSyncClientManager.java     |   5 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |  23 ++++-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |   5 +-
 .../config/constant/PipeConnectorConstant.java     |  16 +++
 .../connector/client/IoTDBSyncClientManager.java   | 107 ++++++++++++++++++---
 .../connector/protocol/IoTDBSslSyncConnector.java  |  25 ++++-
 9 files changed, 248 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e92f4879d4c..e420a6b5c6d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -34,8 +34,12 @@ import java.util.Map;
 public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager {
 
   public IoTDBConfigNodeSyncClientManager(
-      List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String 
trustStorePwd) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, false);
+      List<TEndPoint> endPoints,
+      boolean useSSL,
+      String trustStorePath,
+      String trustStorePwd,
+      String loadBalanceStrategy) {
+    super(endPoints, useSSL, trustStorePath, trustStorePwd, false, 
loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 1706ad85bbc..6058fed3027 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,8 +59,10 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
-    return new IoTDBConfigNodeSyncClientManager(nodeUrls, useSSL, 
trustStorePath, trustStorePwd);
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
+    return new IoTDBConfigNodeSyncClientManager(
+        nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 45aab88c2c9..b034a8182ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -46,6 +46,10 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
 public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     implements IoTDBDataNodeCacheLeaderClientManager {
 
@@ -59,7 +63,10 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
   private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
endPoint2Client;
 
-  public IoTDBDataNodeAsyncClientManager(List<TEndPoint> endPoints, boolean 
useLeaderCache) {
+  private final LoadBalancer loadBalancer;
+
+  public IoTDBDataNodeAsyncClientManager(
+      List<TEndPoint> endPoints, boolean useLeaderCache, String 
loadBalanceStrategy) {
     super(endPoints, useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
@@ -75,17 +82,27 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       }
     }
     endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy 
instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
-    final int clientSize = endPointList.size();
-    while (true) {
-      final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
-      final AsyncPipeDataTransferServiceClient client = 
endPoint2Client.borrowClient(targetNodeUrl);
-      if (handshakeIfNecessary(targetNodeUrl, client)) {
-        return client;
-      }
-    }
+    return loadBalancer.borrowClient();
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) 
throws Exception {
@@ -239,4 +256,55 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
     LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
   }
+
+  /////////////////////// Strategies for load balance 
//////////////////////////
+
+  private interface LoadBalancer {
+    AsyncPipeDataTransferServiceClient borrowClient() throws Exception;
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      final int clientSize = endPointList.size();
+      while (true) {
+        final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
+        final AsyncPipeDataTransferServiceClient client =
+            endPoint2Client.borrowClient(targetNodeUrl);
+        if (handshakeIfNecessary(targetNodeUrl, client)) {
+          return client;
+        }
+      }
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      final int clientSize = endPointList.size();
+      while (true) {
+        final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() 
* clientSize));
+        final AsyncPipeDataTransferServiceClient client =
+            endPoint2Client.borrowClient(targetNodeUrl);
+        if (handshakeIfNecessary(targetNodeUrl, client)) {
+          return client;
+        }
+      }
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      while (true) {
+        for (final TEndPoint targetNodeUrl : endPointList) {
+          final AsyncPipeDataTransferServiceClient client =
+              endPoint2Client.borrowClient(targetNodeUrl);
+          if (handshakeIfNecessary(targetNodeUrl, client)) {
+            return client;
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 91dec3517ea..2c346a11cf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -47,8 +47,9 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
+    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1f631997c51..fa1b511abed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -62,11 +62,15 @@ import java.util.concurrent.PriorityBlockingQueue;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 
 public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
 
@@ -79,6 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver %s:%s.";
 
+  private String loadBalanceStrategy;
   private IoTDBDataNodeAsyncClientManager clientManager;
 
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
@@ -99,12 +104,27 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     retryConnector.validate(validator);
 
     final PipeParameters parameters = validator.getParameters();
+
     validator.validate(
         args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]),
         "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.",
         parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false),
         parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
         parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
+
+    loadBalanceStrategy =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, 
SINK_LOAD_BALANCE_STRATEGY_KEY),
+                CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
+            .trim()
+            .toLowerCase();
+    validator.validate(
+        arg -> 
CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
+        String.format(
+            "Load balance strategy should be one of %s, but got %s.",
+            CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
+        loadBalanceStrategy);
   }
 
   @Override
@@ -124,7 +144,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             nodeUrls,
             parameters.getBooleanOrDefault(
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
-                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE));
+                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
+            loadBalanceStrategy);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new 
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index b516fd5d728..61493b31212 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -84,10 +84,11 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
     return clientManager;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1aecd761b2a..43fbd7c61b0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
@@ -137,6 +140,19 @@ public class PipeConnectorConstant {
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_LOAD_BALANCE_STRATEGY_KEY =
+      "connector.load-balance-strategy";
+  public static final String SINK_LOAD_BALANCE_STRATEGY_KEY = 
"sink.load-balance-strategy";
+  public static final String CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY = 
"round-robin";
+  public static final String CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY = "random";
+  public static final String CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY = 
"priority";
+  public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET =
+      new HashSet<>(
+          Arrays.asList(
+              CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY,
+              CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY,
+              CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY));
+
   public static final String SINK_TOPIC_KEY = "sink.topic";
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index f2cb76021ac..9a20c0bb727 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -41,6 +41,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
 public abstract class IoTDBSyncClientManager extends IoTDBClientManager 
implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClientManager.class);
@@ -54,12 +58,15 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
   protected final Map<TEndPoint, Pair<IoTDBSyncClient, Boolean>> 
endPoint2ClientAndStatus =
       new ConcurrentHashMap<>();
 
+  private final LoadBalancer loadBalancer;
+
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
     super(endPoints, useLeaderCache);
 
     this.useSSL = useSSL;
@@ -69,6 +76,23 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
     for (final TEndPoint endPoint : endPoints) {
       endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
     }
+
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy 
instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -193,18 +217,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
   protected abstract String getClusterId();
 
   public Pair<IoTDBSyncClient, Boolean> getClient() {
-    final int clientSize = endPointList.size();
-    // Round-robin, find the next alive client
-    for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
-      final int clientIndex = (int) (currentClientIndex++ % clientSize);
-      final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
-          endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
-      if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
-        return clientAndStatus;
-      }
-    }
-    throw new PipeConnectionException(
-        "All clients are dead, please check the connection to the receiver.");
+    return loadBalancer.getClient();
   }
 
   @Override
@@ -236,4 +249,72 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       }
     }
   }
+
+  /////////////////////// Strategies for load balance 
//////////////////////////
+
+  private interface LoadBalancer {
+    Pair<IoTDBSyncClient, Boolean> getClient();
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      final int clientSize = endPointList.size();
+      // Round-robin, find the next alive client
+      for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
+        final int clientIndex = (int) (currentClientIndex++ % clientSize);
+        final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+            endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+        if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+          return clientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      final int clientSize = endPointList.size();
+      final int clientIndex = (int) (Math.random() * clientSize);
+      final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+          endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+      if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+        return clientAndStatus;
+      }
+
+      // Random, find the next alive client
+      for (int tryCount = 0; tryCount < clientSize - 1; ++tryCount) {
+        final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize;
+        final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
+            endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
+        if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) {
+          return nextClientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      // Priority, find the first alive client
+      for (final TEndPoint endPoint : endPointList) {
+        final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+            endPoint2ClientAndStatus.get(endPoint);
+        if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+          return clientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e7ecbdf67e1..6c8a2adbafc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -47,11 +47,15 @@ import java.util.List;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR;
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK;
@@ -60,6 +64,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
 
+  protected String loadBalanceStrategy;
   protected IoTDBSyncClientManager clientManager;
 
   @Override
@@ -85,6 +90,20 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, 
false),
         parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY),
         parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY));
+
+    loadBalanceStrategy =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, 
SINK_LOAD_BALANCE_STRATEGY_KEY),
+                CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
+            .trim()
+            .toLowerCase();
+    validator.validate(
+        arg -> 
CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
+        String.format(
+            "Load balance strategy should be one of %s, but got %s.",
+            CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
+        loadBalanceStrategy);
   }
 
   @Override
@@ -113,7 +132,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
 
     clientManager =
-        constructClient(nodeUrls, useSSL, trustStorePath, trustStorePwd, 
useLeaderCache);
+        constructClient(
+            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -121,7 +141,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache);
+      boolean useLeaderCache,
+      String loadBalanceStrategy);
 
   @Override
   public void handshake() throws Exception {

Reply via email to