This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fd48ef68254 Pipe: support different load balance strategies for data 
sync (#12281)
fd48ef68254 is described below

commit fd48ef682541d156590d7c1f8d774ddd2335407d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 2 20:49:01 2024 +0800

    Pipe: support different load balance strategies for data sync (#12281)
    
    **Supported sinks:** iotdb-airgap-sink, iotdb-ssl-thrift-sink, 
iotdb-sync-sink, iotdb-async-sink
    
    ```
    create pipe p1
    with sink (
      'load-balance-strategy' = 'random'
    );
    ```
    
    Options for `'load-balance-strategy'`: `round-robin`, `random` and 
`priority`. `round-robin` by default.
    
    **Explaination:**
    
    * `round-robin`:  The connector will select the next client in a 
round-robin manner. If the current client is dead, the connector will try the 
next client until it finds an alive client. If all clients are dead, the 
connector will throw a `PipeConnectionException`.
    
    * `random`: The connector will select a client randomly. If the selected 
client is dead, the connector will try the next client until it finds an alive 
client. If all clients are dead, the connector will throw a 
`PipeConnectionException`.
    
    * `priority`: The connector will select the first alive client in given 
clients (via. `node-urls`). If all clients are dead, the connector will throw a 
`PipeConnectionException`.
---
 .../client/IoTDBConfigNodeSyncClientManager.java   |   8 +-
 .../protocol/IoTDBConfigRegionConnector.java       |   6 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  86 ++++++++++++++--
 .../client/IoTDBDataNodeSyncClientManager.java     |   5 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |   4 +-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |   5 +-
 .../config/constant/PipeConnectorConstant.java     |  18 ++++
 .../pipe/connector/client/IoTDBSyncClient.java     |   4 +-
 .../connector/client/IoTDBSyncClientManager.java   | 107 ++++++++++++++++---
 .../connector/protocol/IoTDBAirGapConnector.java   |  98 +++++++++++++++---
 .../pipe/connector/protocol/IoTDBConnector.java    | 113 ++++++++++++---------
 .../connector/protocol/IoTDBSslSyncConnector.java  |   6 +-
 12 files changed, 367 insertions(+), 93 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index e92f4879d4c..e420a6b5c6d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -34,8 +34,12 @@ import java.util.Map;
 public class IoTDBConfigNodeSyncClientManager extends IoTDBSyncClientManager {
 
   public IoTDBConfigNodeSyncClientManager(
-      List<TEndPoint> endPoints, boolean useSSL, String trustStorePath, String 
trustStorePwd) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, false);
+      List<TEndPoint> endPoints,
+      boolean useSSL,
+      String trustStorePath,
+      String trustStorePwd,
+      String loadBalanceStrategy) {
+    super(endPoints, useSSL, trustStorePath, trustStorePwd, false, 
loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 1706ad85bbc..6058fed3027 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -59,8 +59,10 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
-    return new IoTDBConfigNodeSyncClientManager(nodeUrls, useSSL, 
trustStorePath, trustStorePwd);
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
+    return new IoTDBConfigNodeSyncClientManager(
+        nodeUrls, useSSL, trustStorePath, trustStorePwd, loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 45aab88c2c9..b034a8182ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -46,6 +46,10 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
 public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     implements IoTDBDataNodeCacheLeaderClientManager {
 
@@ -59,7 +63,10 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
   private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
endPoint2Client;
 
-  public IoTDBDataNodeAsyncClientManager(List<TEndPoint> endPoints, boolean 
useLeaderCache) {
+  private final LoadBalancer loadBalancer;
+
+  public IoTDBDataNodeAsyncClientManager(
+      List<TEndPoint> endPoints, boolean useLeaderCache, String 
loadBalanceStrategy) {
     super(endPoints, useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
@@ -75,17 +82,27 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       }
     }
     endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy 
instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
-    final int clientSize = endPointList.size();
-    while (true) {
-      final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
-      final AsyncPipeDataTransferServiceClient client = 
endPoint2Client.borrowClient(targetNodeUrl);
-      if (handshakeIfNecessary(targetNodeUrl, client)) {
-        return client;
-      }
-    }
+    return loadBalancer.borrowClient();
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) 
throws Exception {
@@ -239,4 +256,55 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
     LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
   }
+
+  /////////////////////// Strategies for load balance 
//////////////////////////
+
+  private interface LoadBalancer {
+    AsyncPipeDataTransferServiceClient borrowClient() throws Exception;
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      final int clientSize = endPointList.size();
+      while (true) {
+        final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
+        final AsyncPipeDataTransferServiceClient client =
+            endPoint2Client.borrowClient(targetNodeUrl);
+        if (handshakeIfNecessary(targetNodeUrl, client)) {
+          return client;
+        }
+      }
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      final int clientSize = endPointList.size();
+      while (true) {
+        final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() 
* clientSize));
+        final AsyncPipeDataTransferServiceClient client =
+            endPoint2Client.borrowClient(targetNodeUrl);
+        if (handshakeIfNecessary(targetNodeUrl, client)) {
+          return client;
+        }
+      }
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      while (true) {
+        for (final TEndPoint targetNodeUrl : endPointList) {
+          final AsyncPipeDataTransferServiceClient client =
+              endPoint2Client.borrowClient(targetNodeUrl);
+          if (handshakeIfNecessary(targetNodeUrl, client)) {
+            return client;
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 91dec3517ea..2c346a11cf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -47,8 +47,9 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
-    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
+    super(endPoints, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1f631997c51..dd6b002c523 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -99,6 +99,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     retryConnector.validate(validator);
 
     final PipeParameters parameters = validator.getParameters();
+
     validator.validate(
         args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]),
         "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.",
@@ -124,7 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             nodeUrls,
             parameters.getBooleanOrDefault(
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
-                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE));
+                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
+            loadBalanceStrategy);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new 
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index b516fd5d728..61493b31212 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -84,10 +84,11 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
-            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache);
+            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
     return clientManager;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1aecd761b2a..bb9d075910d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
@@ -137,6 +141,20 @@ public class PipeConnectorConstant {
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_LOAD_BALANCE_STRATEGY_KEY =
+      "connector.load-balance-strategy";
+  public static final String SINK_LOAD_BALANCE_STRATEGY_KEY = 
"sink.load-balance-strategy";
+  public static final String CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY = 
"round-robin";
+  public static final String CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY = "random";
+  public static final String CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY = 
"priority";
+  public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY,
+                  CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY,
+                  CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY)));
+
   public static final String SINK_TOPIC_KEY = "sink.topic";
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index d7851da970c..3ab7a156b74 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -31,8 +31,8 @@ import org.apache.thrift.transport.TTransportException;
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
-  private String ipAddress;
-  private int port;
+  private final String ipAddress;
+  private final int port;
 
   public IoTDBSyncClient(
       ThriftClientProperty property,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index f2cb76021ac..9a20c0bb727 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -41,6 +41,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+
 public abstract class IoTDBSyncClientManager extends IoTDBClientManager 
implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClientManager.class);
@@ -54,12 +58,15 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
   protected final Map<TEndPoint, Pair<IoTDBSyncClient, Boolean>> 
endPoint2ClientAndStatus =
       new ConcurrentHashMap<>();
 
+  private final LoadBalancer loadBalancer;
+
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache) {
+      boolean useLeaderCache,
+      String loadBalanceStrategy) {
     super(endPoints, useLeaderCache);
 
     this.useSSL = useSSL;
@@ -69,6 +76,23 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
     for (final TEndPoint endPoint : endPoints) {
       endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
     }
+
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy 
instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -193,18 +217,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
   protected abstract String getClusterId();
 
   public Pair<IoTDBSyncClient, Boolean> getClient() {
-    final int clientSize = endPointList.size();
-    // Round-robin, find the next alive client
-    for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
-      final int clientIndex = (int) (currentClientIndex++ % clientSize);
-      final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
-          endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
-      if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
-        return clientAndStatus;
-      }
-    }
-    throw new PipeConnectionException(
-        "All clients are dead, please check the connection to the receiver.");
+    return loadBalancer.getClient();
   }
 
   @Override
@@ -236,4 +249,72 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       }
     }
   }
+
+  /////////////////////// Strategies for load balance 
//////////////////////////
+
+  private interface LoadBalancer {
+    Pair<IoTDBSyncClient, Boolean> getClient();
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      final int clientSize = endPointList.size();
+      // Round-robin, find the next alive client
+      for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
+        final int clientIndex = (int) (currentClientIndex++ % clientSize);
+        final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+            endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+        if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+          return clientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      final int clientSize = endPointList.size();
+      final int clientIndex = (int) (Math.random() * clientSize);
+      final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+          endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
+      if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+        return clientAndStatus;
+      }
+
+      // Random, find the next alive client
+      for (int tryCount = 0; tryCount < clientSize - 1; ++tryCount) {
+        final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize;
+        final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
+            endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
+        if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) {
+          return nextClientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public Pair<IoTDBSyncClient, Boolean> getClient() {
+      // Priority, find the first alive client
+      for (final TEndPoint endPoint : endPointList) {
+        final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+            endPoint2ClientAndStatus.get(endPoint);
+        if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+          return clientAndStatus;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All clients are dead, please check the connection to the 
receiver.");
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 25c8b4abeb7..33ebea1b868 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -48,6 +48,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
@@ -61,10 +64,12 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
   protected final List<Socket> sockets = new ArrayList<>();
   protected final List<Boolean> isSocketAlive = new ArrayList<>();
 
+  private LoadBalancer loadBalancer;
+  private long currentClientIndex = 0;
+
   private int handshakeTimeoutMs;
-  private boolean eLanguageEnable;
 
-  private long currentClientIndex = 0;
+  private boolean eLanguageEnable;
 
   // The air gap connector does not use clientManager thus we put handshake 
type here
   protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -86,6 +91,23 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
       sockets.add(null);
     }
 
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy 
instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
+
     handshakeTimeoutMs =
         parameters.getIntOrDefault(
             Arrays.asList(
@@ -235,16 +257,7 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
       String fileName, long position, byte[] payLoad) throws IOException;
 
   protected int nextSocketIndex() {
-    final int socketSize = sockets.size();
-    // Round-robin, find the next alive client
-    for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
-      final int clientIndex = (int) (currentClientIndex++ % socketSize);
-      if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
-        return clientIndex;
-      }
-    }
-    throw new PipeConnectionException(
-        "All sockets are dead, please check the connection to the receiver.");
+    return loadBalancer.nextSocketIndex();
   }
 
   protected boolean send(Socket socket, byte[] bytes) throws IOException {
@@ -296,4 +309,65 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
       }
     }
   }
+
+  /////////////////////// Strategies for load balance 
//////////////////////////
+
+  private interface LoadBalancer {
+    int nextSocketIndex();
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      final int socketSize = sockets.size();
+      // Round-robin, find the next alive client
+      for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
+        final int clientIndex = (int) (currentClientIndex++ % socketSize);
+        if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+          return clientIndex;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      final int socketSize = sockets.size();
+      final int clientIndex = (int) (Math.random() * socketSize);
+      if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+        return clientIndex;
+      }
+
+      // Random, find the next alive client
+      for (int tryCount = 0; tryCount < socketSize - 1; ++tryCount) {
+        final int nextClientIndex = (clientIndex + tryCount + 1) % socketSize;
+        if (Boolean.TRUE.equals(isSocketAlive.get(nextClientIndex))) {
+          return nextClientIndex;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the 
receiver.");
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      // Priority, find the first alive client
+      final int socketSize = sockets.size();
+      for (int i = 0; i < socketSize; ++i) {
+        if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
+          return i;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the 
receiver.");
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b2342966c20..65b32a48396 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -54,6 +54,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -64,63 +67,80 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
 
+  private static final String PARSE_URL_ERROR_FORMATTER =
+      "Exception occurred while parsing node urls from target servers: {}";
+  private static final String PARSE_URL_ERROR_MESSAGE =
+      "Error occurred while parsing node urls from target servers, please 
check the specified 'host':'port' or 'node-urls'";
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConnector.class);
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
+  protected String loadBalanceStrategy;
+
   protected boolean isTabletBatchModeEnabled = true;
 
   protected PipeReceiverStatusHandler receiverStatusHandler;
 
-  private static final String PARSE_URL_ERROR_FORMATTER =
-      "Exception occurred while parsing node urls from target servers: {}";
-
-  private static final String PARSE_URL_ERROR_MESSAGE =
-      "Error occurred while parsing node urls from target servers, please 
check the specified 'host':'port' or 'node-urls'";
-
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
-    validator
-        .validate(
-            args ->
-                (boolean) args[0]
-                    || (((boolean) args[1] || (boolean) args[2]) && (boolean) 
args[3])
-                    || (boolean) args[4]
-                    || (((boolean) args[5] || (boolean) args[6]) && (boolean) 
args[7]),
-            String.format(
-                "One of %s, %s:%s, %s, %s:%s must be specified",
-                CONNECTOR_IOTDB_NODE_URLS_KEY,
-                CONNECTOR_IOTDB_HOST_KEY,
-                CONNECTOR_IOTDB_PORT_KEY,
-                SINK_IOTDB_NODE_URLS_KEY,
-                SINK_IOTDB_HOST_KEY,
-                SINK_IOTDB_PORT_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
-            parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
-            parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
-            parameters.hasAttribute(SINK_IOTDB_IP_KEY),
-            parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
-            parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
-        .validate(
-            arg -> arg.equals("retry") || arg.equals("ignore"),
-            String.format(
-                "The value of key %s or %s must be either 'retry' or 
'ignore'.",
-                CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
-                SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
-            parameters
-                .getStringOrDefault(
-                    Arrays.asList(
-                        CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
-                        SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
-                    
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
-                .trim()
-                .toLowerCase());
+
+    validator.validate(
+        args ->
+            (boolean) args[0]
+                || (((boolean) args[1] || (boolean) args[2]) && (boolean) 
args[3])
+                || (boolean) args[4]
+                || (((boolean) args[5] || (boolean) args[6]) && (boolean) 
args[7]),
+        String.format(
+            "One of %s, %s:%s, %s, %s:%s must be specified",
+            CONNECTOR_IOTDB_NODE_URLS_KEY,
+            CONNECTOR_IOTDB_HOST_KEY,
+            CONNECTOR_IOTDB_PORT_KEY,
+            SINK_IOTDB_NODE_URLS_KEY,
+            SINK_IOTDB_HOST_KEY,
+            SINK_IOTDB_PORT_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+        parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
+        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
+
+    loadBalanceStrategy =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, 
SINK_LOAD_BALANCE_STRATEGY_KEY),
+                CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
+            .trim()
+            .toLowerCase();
+    validator.validate(
+        arg -> 
CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
+        String.format(
+            "Load balance strategy should be one of %s, but got %s.",
+            CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
+        loadBalanceStrategy);
+
+    validator.validate(
+        arg -> arg.equals("retry") || arg.equals("ignore"),
+        String.format(
+            "The value of key %s or %s must be either 'retry' or 'ignore'.",
+            CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+            SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(
+                    CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
+                    SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
+                CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
+            .trim()
+            .toLowerCase());
   }
 
   @Override
@@ -168,9 +188,9 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                 CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
   }
 
-  protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters)
+  protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters)
       throws PipeParameterNotValidException {
-    final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls);
+    final LinkedHashSet<TEndPoint> givenNodeUrls = new 
LinkedHashSet<>(nodeUrls);
 
     try {
       if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
@@ -223,6 +243,7 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     }
 
     checkNodeUrls(givenNodeUrls);
+
     return givenNodeUrls;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e7ecbdf67e1..f9e5a8743f5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -113,7 +113,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
 
     clientManager =
-        constructClient(nodeUrls, useSSL, trustStorePath, trustStorePwd, 
useLeaderCache);
+        constructClient(
+            nodeUrls, useSSL, trustStorePath, trustStorePwd, useLeaderCache, 
loadBalanceStrategy);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -121,7 +122,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
-      boolean useLeaderCache);
+      boolean useLeaderCache,
+      String loadBalanceStrategy);
 
   @Override
   public void handshake() throws Exception {


Reply via email to