This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new e3ccf65 fix conf bug
new bf25c71 Merge pull request #1877 from
yhwangBonc/cluster_new_fix_conf_bug
e3ccf65 is described below
commit e3ccf65d824b71695b842e62d723add11b83078c
Author: wangyanhong <[email protected]>
AuthorDate: Tue Oct 27 21:40:27 2020 +0800
fix conf bug
---
.../src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java | 2 +-
.../main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java | 2 +-
.../src/main/java/org/apache/iotdb/cluster/server/ClientServer.java | 5 +++--
.../src/main/java/org/apache/iotdb/cluster/server/RaftServer.java | 4 +++-
.../org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java | 4 +++-
.../src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java | 4 +++-
6 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index ae2985b..b60e7bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -40,7 +40,7 @@ public class ClusterConfig {
@ClusterConsistent
private boolean isRpcThriftCompressionEnabled = false;
- private int maxConcurrentClientNum = 100000;
+ private int maxConcurrentClientNum = 10000;
@ClusterConsistent
private int replicationNum = 2;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 33f467c..ccb5749 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -154,7 +154,7 @@ public class ClusterDescriptor {
boolean isInvalidSeedIp = InetAddresses.isInetAddress(seedIP);
if (!isInvalidSeedIp) {
String newSeedIP = hostnameToIP(seedIP);
- newSeedUrls.add(newSeedIP + ":" + splits[1] + ":" + splits[2]);
+ newSeedUrls.add(newSeedIP + ":" + splits[1] + ":" + splits[2]+ ":" +
splits[3]);
} else {
newSeedUrls.add(seedUrl);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 306439b..b5e7c02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -143,9 +143,10 @@ public class ClientServer extends TSServiceImpl {
config.getClusterRpcPort()));
// async service also requires nonblocking server, and HsHaServer is
basically more efficient a
// nonblocking server
+ int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(),
+ config.getMaxConcurrentClientNum());
TThreadPoolServer.Args poolArgs =
- new
TThreadPoolServer.Args(serverTransport).maxWorkerThreads(ClusterDescriptor
- .getInstance().getConfig().getMaxConcurrentClientNum())
+ new
TThreadPoolServer.Args(serverTransport).maxWorkerThreads(maxConcurrentClientNum)
.minWorkerThreads(CommonUtils.getCpuCores());
poolArgs.executorService(new ThreadPoolExecutor(poolArgs.minWorkerThreads,
poolArgs.maxWorkerThreads, poolArgs.stopTimeoutVal,
poolArgs.stopTimeoutUnit,
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 13d007d..b7d78b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -186,8 +186,10 @@ public abstract class RaftServer implements
RaftService.AsyncIface, RaftService.
TThreadedSelectorServer.Args poolArgs =
new TThreadedSelectorServer.Args((TNonblockingServerTransport) socket);
poolArgs.selectorThreads(CommonUtils.getCpuCores());
+ int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(),
+ config.getMaxConcurrentClientNum());
poolArgs.executorService(new ThreadPoolExecutor(CommonUtils.getCpuCores(),
- config.getMaxConcurrentClientNum(), poolArgs.getStopTimeoutVal(),
poolArgs.getStopTimeoutUnit(),
+ maxConcurrentClientNum, poolArgs.getStopTimeoutVal(),
poolArgs.getStopTimeoutUnit(),
new SynchronousQueue<>(), new ThreadFactory() {
private AtomicLong threadIndex = new AtomicLong(0);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
index 4070761..fbe4b68 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
@@ -168,9 +168,11 @@ public abstract class HeartbeatServer {
private TServer getAsyncHeartbeatServer() throws TTransportException {
heartbeatSocket = getHeartbeatServerSocket();
+ int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(),
+ config.getMaxConcurrentClientNum());
Args poolArgs =
new Args((TNonblockingServerTransport) heartbeatSocket)
- .maxWorkerThreads(config.getMaxConcurrentClientNum())
+ .maxWorkerThreads(maxConcurrentClientNum)
.minWorkerThreads(CommonUtils.getCpuCores());
poolArgs.executorService(new ThreadPoolExecutor(poolArgs.minWorkerThreads,
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index d5312a9..7d65fc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -234,8 +234,10 @@ public class ClusterUtils {
public static TServer createTThreadPoolServer(TServerTransport socket,
String clientThreadPrefix, TProcessor processor, TProtocolFactory
protocolFactory) {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ int maxConcurrentClientNum = Math.max(CommonUtils.getCpuCores(),
+ config.getMaxConcurrentClientNum());
TThreadPoolServer.Args poolArgs =
- new
TThreadPoolServer.Args(socket).maxWorkerThreads(config.getMaxConcurrentClientNum())
+ new
TThreadPoolServer.Args(socket).maxWorkerThreads(maxConcurrentClientNum)
.minWorkerThreads(CommonUtils.getCpuCores());
poolArgs.executorService(new ThreadPoolExecutor(poolArgs.minWorkerThreads,