This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 257d9aabac0 [IOTDB-6327] Random choosing available nodes to send sql requests
257d9aabac0 is described below
commit 257d9aabac0ebd68c7ab1525448f8d82e6aeac32
Author: Jackie Tien <[email protected]>
AuthorDate: Wed May 15 12:11:18 2024 +0800
[IOTDB-6327] Random choosing available nodes to send sql requests
---
.../org/apache/iotdb/isession/INodeSupplier.java | 3 +++
.../apache/iotdb/session/DummyNodesSupplier.java | 12 ++++++++++
.../org/apache/iotdb/session/NodesSupplier.java | 26 ++++++++++++++++------
.../apache/iotdb/session/QueryEndPointPolicy.java} | 7 +++---
.../apache/iotdb/session/RoundRobinPolicy.java} | 17 ++++++++++----
.../java/org/apache/iotdb/session/Session.java | 22 +++++++++++++++++-
6 files changed, 71 insertions(+), 16 deletions(-)
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
index 371630a6b85..60a2a5adb43 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
@@ -22,9 +22,12 @@ package org.apache.iotdb.isession;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
public interface INodeSupplier extends Supplier<List<TEndPoint>> {
void close();
+
+ Optional<TEndPoint> getQueryEndPoint();
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
index 64a7c87dcf2..bb3186d4200 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
@@ -24,11 +24,14 @@ import org.apache.iotdb.isession.INodeSupplier;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
public class DummyNodesSupplier implements INodeSupplier {
private final List<TEndPoint> availableNodes;
+ private final QueryEndPointPolicy policy = new RoundRobinPolicy();
+
public DummyNodesSupplier(List<TEndPoint> availableNodes) {
this.availableNodes = Collections.unmodifiableList(availableNodes);
}
@@ -38,6 +41,15 @@ public class DummyNodesSupplier implements INodeSupplier {
// do nothing
}
+ @Override
+ public Optional<TEndPoint> getQueryEndPoint() {
+ if (availableNodes == null || availableNodes.isEmpty()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(policy.chooseOne(availableNodes));
+ }
+ }
+
@Override
public List<TEndPoint> get() {
return availableNodes;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
index 8f0244bbcab..37532023ce6 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
@@ -28,9 +28,10 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,9 +56,9 @@ public class NodesSupplier implements INodeSupplier, Runnable
{
private static final int FETCH_SIZE = 10_000;
- // availableNodes won't be updated frequently, so we use CopyOnWriteArraySet which is thread-safe
+ // availableNodes won't be updated frequently, so we use CopyOnWriteArrayList which is thread-safe
// and is optimized for scenarios of reading more and writing less
- private volatile Set<TEndPoint> availableNodes = new CopyOnWriteArraySet<>();
+ private volatile List<TEndPoint> availableNodes = new
CopyOnWriteArrayList<>();
private final boolean useSSL;
private final String trustStore;
@@ -77,6 +78,8 @@ public class NodesSupplier implements INodeSupplier, Runnable
{
private final String version;
+ private final QueryEndPointPolicy policy = new RoundRobinPolicy();
+
private ThriftConnection client;
private volatile boolean closed = false;
@@ -133,7 +136,7 @@ public class NodesSupplier implements INodeSupplier,
Runnable {
String trustStorePwd,
boolean enableRPCCompression,
String version) {
- this.availableNodes.addAll(endPointList);
+ this.availableNodes.addAll(new HashSet<>(endPointList));
this.userName = userName;
this.password = password;
this.useSSL = useSSL;
@@ -151,7 +154,7 @@ public class NodesSupplier implements INodeSupplier,
Runnable {
// and the List needn't be thread-safe, because it will only be used in one thread.
@Override
public List<TEndPoint> get() {
- return new ArrayList<>(availableNodes);
+ return availableNodes;
}
@Override
@@ -213,6 +216,15 @@ public class NodesSupplier implements INodeSupplier,
Runnable {
destroyCurrentClient();
}
+ @Override
+ public Optional<TEndPoint> getQueryEndPoint() {
+ if (availableNodes == null || availableNodes.isEmpty()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(policy.chooseOne(get()));
+ }
+ }
+
private boolean updateDataNodeList() {
try (SessionDataSet sessionDataSet =
client.executeQueryStatement(SHOW_DATA_NODES_COMMAND, TIMEOUT_IN_MS,
FETCH_SIZE)) {
@@ -231,7 +243,7 @@ public class NodesSupplier implements INodeSupplier,
Runnable {
}
// replace the older ones.
if (!res.isEmpty()) {
- availableNodes = new CopyOnWriteArraySet<>(res);
+ availableNodes = res;
}
return true;
} catch (Exception e) {
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
similarity index 85%
copy from
iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
copy to
iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
index 371630a6b85..9554efda8de 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
@@ -17,14 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.isession;
+package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.util.List;
-import java.util.function.Supplier;
-public interface INodeSupplier extends Supplier<List<TEndPoint>> {
+public interface QueryEndPointPolicy {
- void close();
+ TEndPoint chooseOne(List<TEndPoint> endPointList);
}
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
similarity index 72%
copy from
iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
copy to
iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
index 371630a6b85..f99827cc33e 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
@@ -17,14 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.isession;
+package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.util.List;
-import java.util.function.Supplier;
-public interface INodeSupplier extends Supplier<List<TEndPoint>> {
+public class RoundRobinPolicy implements QueryEndPointPolicy {
- void close();
+ private int index = 0;
+
+ @Override
+ public TEndPoint chooseOne(List<TEndPoint> endPointList) {
+ int tmp = index;
+ if (tmp >= endPointList.size()) {
+ tmp = 0;
+ }
+ index = tmp + 1;
+ return endPointList.get(tmp);
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 626578c4f83..c984744f09c 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -89,6 +89,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -888,7 +889,7 @@ public class Session implements ISession {
private SessionDataSet executeStatementMayRedirect(String sql, long
timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ return getQuerySessionConnection().executeQueryStatement(sql,
timeoutInMs);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
@@ -905,6 +906,25 @@ public class Session implements ISession {
}
}
+ private SessionConnection getQuerySessionConnection() {
+ Optional<TEndPoint> endPoint =
+ availableNodes == null ? Optional.empty() :
availableNodes.getQueryEndPoint();
+ if (!endPoint.isPresent() || endPointToSessionConnection == null) {
+ return defaultSessionConnection;
+ }
+ SessionConnection connection =
+ endPointToSessionConnection.computeIfAbsent(
+ endPoint.get(),
+ k -> {
+ try {
+ return constructSessionConnection(this, endPoint.get(),
zoneId);
+ } catch (IoTDBConnectionException ex) {
+ return null;
+ }
+ });
+ return connection == null ? defaultSessionConnection : connection;
+ }
+
/**
* execute non query statement
*