This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-load-balancer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 806cb4f812496a595c9fad4b264cc63c0d929283 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 2 18:20:04 2024 +0800 airgap load balancer --- .../connector/protocol/IoTDBAirGapConnector.java | 123 +++++++++++++++++++-- .../pipe/connector/protocol/IoTDBConnector.java | 7 +- 2 files changed, 115 insertions(+), 15 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java index 25c8b4abeb7..8922747011a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -48,8 +49,14 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN; public abstract class IoTDBAirGapConnector extends IoTDBConnector { @@ -61,14 +68,37 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { protected final List<Socket> sockets = new ArrayList<>(); protected final List<Boolean> isSocketAlive = new ArrayList<>(); + private String loadBalanceStrategy; + private LoadBalancer loadBalancer; + private long currentClientIndex = 0; + private int handshakeTimeoutMs; - private boolean eLanguageEnable; - private long currentClientIndex = 0; + private boolean eLanguageEnable; // The air gap connector does not use clientManager thus we put handshake type here protected boolean supportModsIfIsDataNodeReceiver = true; + @Override + public void validate(PipeParameterValidator validator) throws Exception { + super.validate(validator); + + loadBalanceStrategy = + validator + .getParameters() + .getStringOrDefault( + Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY), + CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY) + .trim() + .toLowerCase(); + validator.validate( + arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy), + String.format( + "Load balance strategy should be one of %s, but got %s.", + CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy), + loadBalanceStrategy); + } + @Override public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception { @@ -86,6 +116,23 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { sockets.add(null); } + switch (loadBalanceStrategy) { + case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY: + loadBalancer = new RoundRobinLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY: + loadBalancer = new RandomLoadBalancer(); + break; + case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY: + loadBalancer = new PriorityLoadBalancer(); + break; + default: + LOGGER.warn( + "Unknown load balance strategy: {}, use round-robin strategy instead.", + loadBalanceStrategy); + loadBalancer = new RoundRobinLoadBalancer(); + } + handshakeTimeoutMs = parameters.getIntOrDefault( Arrays.asList( @@ -235,16 +282,7 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { String fileName, long position, byte[] payLoad) throws IOException; protected int nextSocketIndex() { - final int socketSize = sockets.size(); - // Round-robin, find the next alive client - for (int tryCount = 0; tryCount < socketSize; ++tryCount) { - final int clientIndex = (int) (currentClientIndex++ % socketSize); - if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) { - return clientIndex; - } - } - throw new PipeConnectionException( - "All sockets are dead, please check the connection to the receiver."); + return loadBalancer.nextSocketIndex(); } protected boolean send(Socket socket, byte[] bytes) throws IOException { @@ -296,4 +334,65 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { } } } + + /////////////////////// Strategies for load balance ////////////////////////// + + private interface LoadBalancer { + int nextSocketIndex(); + } + + private class RoundRobinLoadBalancer implements LoadBalancer { + @Override + public int nextSocketIndex() { + final int socketSize = sockets.size(); + // Round-robin, find the next alive client + for (int tryCount = 0; tryCount < socketSize; ++tryCount) { + final int clientIndex = (int) (currentClientIndex++ % socketSize); + if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) { + return clientIndex; + } + } + + throw new PipeConnectionException( + "All sockets are dead, please check the connection to the receiver."); + } + } + + private class RandomLoadBalancer implements LoadBalancer { + @Override + public int nextSocketIndex() { + final int socketSize = sockets.size(); + final int clientIndex = (int) (Math.random() * socketSize); + if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) { + return clientIndex; + } + + // Random, find the next alive client + for (int tryCount = 0; tryCount < socketSize - 1; ++tryCount) { + final int nextClientIndex = (clientIndex + tryCount + 1) % socketSize; + if (Boolean.TRUE.equals(isSocketAlive.get(nextClientIndex))) { + return nextClientIndex; + } + } + + throw new PipeConnectionException( + "All sockets are dead, please check the connection to the receiver."); + } + } + + private class PriorityLoadBalancer implements LoadBalancer { + @Override + public int nextSocketIndex() { + // Priority, find the first alive client + final int socketSize = sockets.size(); + for (int i = 0; i < socketSize; ++i) { + if (Boolean.TRUE.equals(isSocketAlive.get(i))) { + return i; + } + } + + throw new PipeConnectionException( + "All sockets are dead, please check the connection to the receiver."); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index b2fdff35385..0efc39632ee 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -152,9 +152,9 @@ public abstract class IoTDBConnector implements PipeConnector { CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE)); } - protected Set<TEndPoint> parseNodeUrls(PipeParameters parameters) + protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters) throws PipeParameterNotValidException { - final Set<TEndPoint> givenNodeUrls = new HashSet<>(nodeUrls); + final LinkedHashSet<TEndPoint> givenNodeUrls = new LinkedHashSet<>(nodeUrls); try { if (parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY) @@ -207,6 +207,7 @@ public abstract class IoTDBConnector implements PipeConnector { } checkNodeUrls(givenNodeUrls); + return givenNodeUrls; }
