This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-load-balancer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 806cb4f812496a595c9fad4b264cc63c0d929283
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 2 18:20:04 2024 +0800

    airgap load balancer
---
 .../connector/protocol/IoTDBAirGapConnector.java   | 123 +++++++++++++++++++--
 .../pipe/connector/protocol/IoTDBConnector.java    |   7 +-
 2 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 25c8b4abeb7..8922747011a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant;
 import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -48,8 +49,14 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
 
 public abstract class IoTDBAirGapConnector extends IoTDBConnector {
@@ -61,14 +68,37 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector {
   protected final List<Socket> sockets = new ArrayList<>();
   protected final List<Boolean> isSocketAlive = new ArrayList<>();
 
+  private String loadBalanceStrategy;
+  private LoadBalancer loadBalancer;
+  private long currentClientIndex = 0;
+
   private int handshakeTimeoutMs;
-  private boolean eLanguageEnable;
 
-  private long currentClientIndex = 0;
+  private boolean eLanguageEnable;
 
   // The air gap connector does not use clientManager thus we put handshake type here
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    loadBalanceStrategy =
+        validator
+            .getParameters()
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY),
+                CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
+            .trim()
+            .toLowerCase();
+    validator.validate(
+        arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
+        String.format(
+            "Load balance strategy should be one of %s, but got %s.",
+            CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
+        loadBalanceStrategy);
+  }
+
   @Override
   public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
@@ -86,6 +116,23 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector {
       sockets.add(null);
     }
 
+    switch (loadBalanceStrategy) {
+      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
+        loadBalancer = new RoundRobinLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
+        loadBalancer = new RandomLoadBalancer();
+        break;
+      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
+        loadBalancer = new PriorityLoadBalancer();
+        break;
+      default:
+        LOGGER.warn(
+            "Unknown load balance strategy: {}, use round-robin strategy instead.",
+            loadBalanceStrategy);
+        loadBalancer = new RoundRobinLoadBalancer();
+    }
+
     handshakeTimeoutMs =
         parameters.getIntOrDefault(
             Arrays.asList(
@@ -235,16 +282,7 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector {
       String fileName, long position, byte[] payLoad) throws IOException;
 
   protected int nextSocketIndex() {
-    final int socketSize = sockets.size();
-    // Round-robin, find the next alive client
-    for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
-      final int clientIndex = (int) (currentClientIndex++ % socketSize);
-      if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
-        return clientIndex;
-      }
-    }
-    throw new PipeConnectionException(
-        "All sockets are dead, please check the connection to the receiver.");
+    return loadBalancer.nextSocketIndex();
   }
 
   protected boolean send(Socket socket, byte[] bytes) throws IOException {
@@ -296,4 +334,65 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector {
       }
     }
   }
+
+  /////////////////////// Strategies for load balance //////////////////////////
+
+  private interface LoadBalancer {
+    int nextSocketIndex();
+  }
+
+  private class RoundRobinLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      final int socketSize = sockets.size();
+      // Round-robin, find the next alive client
+      for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
+        final int clientIndex = (int) (currentClientIndex++ % socketSize);
+        if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+          return clientIndex;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the receiver.");
+    }
+  }
+
+  private class RandomLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      final int socketSize = sockets.size();
+      final int clientIndex = (int) (Math.random() * socketSize);
+      if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
+        return clientIndex;
+      }
+
+      // Random, find the next alive client
+      for (int tryCount = 0; tryCount < socketSize - 1; ++tryCount) {
+        final int nextClientIndex = (clientIndex + tryCount + 1) % socketSize;
+        if (Boolean.TRUE.equals(isSocketAlive.get(nextClientIndex))) {
+          return nextClientIndex;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the receiver.");
+    }
+  }
+
+  private class PriorityLoadBalancer implements LoadBalancer {
+    @Override
+    public int nextSocketIndex() {
+      // Priority, find the first alive client
+      final int socketSize = sockets.size();
+      for (int i = 0; i < socketSize; ++i) {
+        if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
+          return i;
+        }
+      }
+
+      throw new PipeConnectionException(
+          "All sockets are dead, please check the connection to the receiver.");
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b2fdff35385..0efc39632ee 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -152,9 +152,9 @@ public abstract class IoTDBConnector implements PipeConnector {
                 CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
   }
 
-  protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters)
+  protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters)
       throws PipeParameterNotValidException {
-    final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls);
+    final LinkedHashSet<TEndPoint> givenNodeUrls = new LinkedHashSet<>(nodeUrls);
 
     try {
       if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY)
@@ -207,6 +207,7 @@ public abstract class IoTDBConnector implements PipeConnector {
     }
 
     checkNodeUrls(givenNodeUrls);
+
     return givenNodeUrls;
   }
 

Reply via email to