This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_concurrent
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_concurrent by this
push:
new b386037 fix bugs according to comments
new f53f559 Merge branch 'cluster_concurrent' of
github.com:apache/incubator-iotdb into cluster_concurrent
b386037 is described below
commit b3860374f1d5250814deb7a61bf7741f2db10ddb
Author: lta <[email protected]>
AuthorDate: Thu Apr 11 16:34:15 2019 +0800
fix bugs according to comments
---
.../cluster/concurrent/pool/QPTaskManager.java | 64 +++--------------
.../cluster/concurrent/pool/RaftTaskManager.java | 66 +++--------------
.../cluster/concurrent/pool/ThreadPoolManager.java | 82 ++++++++++++++++++++++
.../apache/iotdb/cluster/config/ClusterConfig.java | 51 ++++++++------
.../iotdb/cluster/config/ClusterDescriptor.java | 26 +++++--
.../org/apache/iotdb/cluster/entity/Server.java | 4 +-
.../iotdb/cluster/qp/callback/MultiQPTask.java | 2 +-
.../cluster/concurrent/pool/QPTaskManagerTest.java | 6 +-
iotdb/iotdb/conf/iotdb-cluster.properties | 27 +++++--
9 files changed, 177 insertions(+), 151 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
index 6dd406f..8ea8a5f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
@@ -18,23 +18,17 @@
*/
package org.apache.iotdb.cluster.concurrent.pool;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.iotdb.cluster.concurrent.ThreadName;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.exception.ProcessorException;
/**
* Manage all qp tasks in thread.
*/
-public class QPTaskManager {
+public class QPTaskManager extends ThreadPoolManager {
- private ExecutorService pool;
- private int threadCnt;
+ private static final String managerName = "qp task manager";
private QPTaskManager() {
init();
@@ -44,60 +38,20 @@ public class QPTaskManager {
return QPTaskManager.InstanceHolder.instance;
}
- private void checkInit(){
- if(pool == null){
- init();
- }
- }
-
- private void init(){
+ @Override
+ public void init() {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentQPTaskThread();
+ this.threadCnt = config.getConcurrentQPSubTaskThread();
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
}
/**
- * Block new qp task submits and exit when all RUNNING THREADS AND TASKS IN
THE QUEUE end.
- *
- * @param block if set to true, this method will wait for timeOut
milliseconds. false, return
- * directly. False, return directly.
- * @param timeOut block time out in milliseconds.
- * @throws ProcessorException if timeOut is reached or being interrupted
while waiting to exit.
+ * Name of Pool Manager
*/
- public void close(boolean block, long timeOut) throws ProcessorException {
- if(pool != null) {
- try {
- pool.shutdown();
- if (block) {
- try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException(
- "QPTask thread pool doesn't exit after " + timeOut + " ms");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException(
- "Interrupted while waiting QPTask thread pool to exit.", e);
- }
- }
- } finally {
- pool = null;
- }
- }
- }
-
- public Future<?> submit(Runnable task) {
- checkInit();
- return pool.submit(task);
- }
-
- public int getActiveCnt() {
- return ((ThreadPoolExecutor) pool).getActiveCount();
- }
-
- public int getThreadCnt() {
- return threadCnt;
+ @Override
+ public String getManagerName() {
+ return managerName;
}
private static class InstanceHolder {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
index 09665aa..ea60b76 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
@@ -18,85 +18,39 @@
*/
package org.apache.iotdb.cluster.concurrent.pool;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.iotdb.cluster.concurrent.ThreadName;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.exception.ProcessorException;
/**
* Manage all raft tasks which are applied to data state machine in thread.
*/
-public class RaftTaskManager {
+public class RaftTaskManager extends ThreadPoolManager{
-
- private ExecutorService pool;
- private int threadCnt;
+ private static final String managerName = "raft task manager";
private RaftTaskManager() {
- init();
+ init();
}
public static RaftTaskManager getInstance() {
return RaftTaskManager.InstanceHolder.instance;
}
- private void checkInit(){
- if(pool == null){
- init();
- }
- }
-
- private void init(){
+ @Override
+ public void init() {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentQPTaskThread();
+ // Assign the inherited field rather than a shadowing local, otherwise getThreadCnt() stays 0.
+ // The raft pool is sized from the raft-specific setting, not the QP sub-task one.
+ this.threadCnt = config.getConcurrentRaftTaskThread();
+ // NOTE(review): worker threads are still named after ThreadName.QP_TASK - confirm whether a
+ // raft-specific thread name should be used here.
pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
}
/**
- * Block new raft submits and exit when all RUNNING THREADS AND TASKS IN THE
QUEUE end.
- *
- * @param block if set to true, this method will wait for timeOut
milliseconds. false, return
- * directly. False, return directly.
- * @param timeOut block time out in milliseconds.
- * @throws ProcessorException if timeOut is reached or being interrupted
while waiting to exit.
+ * Name of Pool Manager
*/
- public void close(boolean block, long timeOut) throws ProcessorException {
- if(pool != null) {
- try {
- pool.shutdown();
- if (block) {
- try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException(
- "Raft task thread pool doesn't exit after " + timeOut + "
ms");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException(
- "Interrupted while waiting raft task thread pool to exit.", e);
- }
- }
- } finally {
- pool = null;
- }
- }
- }
-
- public void execute(Runnable task) {
- checkInit();
- pool.execute(task);
- }
-
- public int getActiveCnt() {
- return ((ThreadPoolExecutor) pool).getActiveCount();
- }
-
- public int getThreadCnt() {
- return threadCnt;
+ @Override
+ public String getManagerName() {
+ return managerName;
}
private static class InstanceHolder {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
new file mode 100644
index 0000000..a2993ea
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
@@ -0,0 +1,82 @@
+
+package org.apache.iotdb.cluster.concurrent.pool;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.exception.ProcessorException;
+
+/**
+ * Base class of the task managers that wrap an {@link ExecutorService}. It owns the pool,
+ * re-creates it through {@link #init()} when a task arrives after the pool has been closed, and
+ * implements the shutdown logic shared by all subclasses.
+ */
+public abstract class ThreadPoolManager {
+
+  ExecutorService pool;
+  int threadCnt;
+
+  /**
+   * Create the pool through {@link #init()} if there is none, e.g. after
+   * {@link #close(boolean, long)} has dropped it.
+   */
+  private void checkInit() {
+    if (pool == null) {
+      init();
+    }
+  }
+
+  /**
+   * Init pool manager. Implementations are expected to assign {@code pool} and {@code threadCnt}.
+   */
+  public abstract void init();
+
+  /**
+   * Block new submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end.
+   *
+   * @param block if set to true, this method will wait for timeOut milliseconds; if false, it
+   * returns directly.
+   * @param timeOut block time out in milliseconds.
+   * @throws ProcessorException if timeOut is reached or being interrupted while waiting to exit.
+   */
+  public void close(boolean block, long timeOut) throws ProcessorException {
+    if (pool != null) {
+      try {
+        pool.shutdown();
+        if (block) {
+          try {
+            if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+              throw new ProcessorException(
+                  String.format("%s thread pool doesn't exit after %d ms", getManagerName(),
+                      timeOut));
+            }
+          } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new ProcessorException(
+                String.format("Interrupted while waiting %s thread pool to exit.",
+                    getManagerName()), e);
+          }
+        }
+      } finally {
+        // Drop the reference even on failure so the next execute/submit re-creates the pool.
+        pool = null;
+      }
+    }
+  }
+
+  /**
+   * Name of Pool Manager, used in the messages of exceptions thrown by
+   * {@link #close(boolean, long)}.
+   */
+  public abstract String getManagerName();
+
+  /**
+   * Execute the task on the pool, creating the pool first if necessary.
+   *
+   * @param task task to run.
+   */
+  public void execute(Runnable task) {
+    checkInit();
+    pool.execute(task);
+  }
+
+  /**
+   * Submit the task to the pool, creating the pool first if necessary.
+   *
+   * @param task task to run.
+   * @return the future returned by the pool for this task.
+   */
+  public Future<?> submit(Runnable task) {
+    checkInit();
+    return pool.submit(task);
+  }
+
+  /**
+   * @return the active thread count reported by the pool, or 0 when there is no pool.
+   */
+  public int getActiveCnt() {
+    // close() sets the pool to null; report no active threads instead of throwing an NPE.
+    if (pool == null) {
+      return 0;
+    }
+    return ((ThreadPoolExecutor) pool).getActiveCount();
+  }
+
+  /**
+   * @return the value of {@code threadCnt} as assigned by the subclass.
+   */
+  public int getThreadCnt() {
+    return threadCnt;
+  }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index f1670b0..417a4c5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.config;
+import com.alipay.sofa.jraft.util.OnlyForTest;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
@@ -27,8 +28,6 @@ import org.apache.iotdb.db.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.alipay.sofa.jraft.util.OnlyForTest;
-
public class ClusterConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterConfig.class);
@@ -70,9 +69,8 @@ public class ClusterConfig {
private String raftMetadataPath;
/**
- * A follower would become a candidate if it doesn't receive any message
- * from the leader in {@code electionTimeoutMs} milliseconds
- * Default: 1000 (1s)
+ * A follower would become a candidate if it doesn't receive any message
from the leader in {@code
+ * electionTimeoutMs} milliseconds Default: 1000 (1s)
*/
private int electionTimeoutMs = 1000;
@@ -112,8 +110,8 @@ public class ClusterConfig {
private int maxNumOfInnerRpcClient = 500;
/**
- * Max number of queue length to use @NodeAsClient, the request which exceed
to this
- * number will be rejected.
+ * Max number of queue length to use @NodeAsClient, the request which exceed
to this number will
+ * be rejected.
*/
private int maxQueueNumOfInnerRpcClient = 500;
@@ -128,25 +126,32 @@ public class ClusterConfig {
private int readDataConsistencyLevel = 1;
/**
- * How many threads can concurrently execute qp task. When <= 0, use CPU
core number.
+ * Maximum number of threads that can concurrently execute QP sub-tasks of all client requests.
+ * When <= 0, use CPU core number * 10. Each client request corresponds to a QP task, and a QP
+ * task may be divided into several sub-tasks.
*/
- private int concurrentQPTaskThread =
Runtime.getRuntime().availableProcessors();
+ private int concurrentQPSubTaskThread =
Runtime.getRuntime().availableProcessors() * 10;
/**
- * How many threads can concurrently apply raft task. When <= 0, use CPU
core number.
+ * Maximum number of threads that can concurrently apply tasks in the data partition state
+ * machine. When <= 0, use CPU core number. A raft task is one that must run in the state machine.
*/
- private int concurrentRaftTaskThread =
Runtime.getRuntime().availableProcessors() * 10;
+ private int concurrentRaftTaskThread =
Runtime.getRuntime().availableProcessors();
/**
- * Max time of blocking main thread for waiting for all RUNNING RAFT TASK
THREADS AND TASKS IN THE QUEUE end.
- * For raft tasks, due to consistency is need to be guaranteed, it must
ensure that raft tasks finish.
+ * Maximum time the main thread blocks while waiting for all running task threads and queued
+ * tasks to finish. A raft task is a task that needs to be performed in the state machine. The
+ * unit is milliseconds. To guarantee data consistency, these tasks must be allowed to finish; if
+ * the user sets this parameter, consistency can no longer be guaranteed.
*/
private int closeRaftTaskBlockTimeout = Integer.MAX_VALUE;
/**
- * Max time of blocking main thread for waiting for all RUNNING QP TASK
THREADS AND TASKS IN THE QUEUE end.
+ * Maximum time the main thread blocks while waiting for all running task threads and queued
+ * tasks to finish. Each client request corresponds to a QP task, and a QP task may be divided
+ * into several sub-tasks. The unit is milliseconds.
*/
- private int closeQPTaskBlockTimeout = 1;
+ private int closeQPSubTaskBlockTimeout = 1000;
public ClusterConfig() {
// empty constructor
@@ -327,12 +332,12 @@ public class ClusterConfig {
this.readDataConsistencyLevel = readDataConsistencyLevel;
}
- public int getConcurrentQPTaskThread() {
- return concurrentQPTaskThread;
+ public int getConcurrentQPSubTaskThread() {
+ return concurrentQPSubTaskThread;
}
- public void setConcurrentQPTaskThread(int concurrentQPTaskThread) {
- this.concurrentQPTaskThread = concurrentQPTaskThread;
+ public void setConcurrentQPSubTaskThread(int concurrentQPSubTaskThread) {
+ this.concurrentQPSubTaskThread = concurrentQPSubTaskThread;
}
public int getConcurrentRaftTaskThread() {
@@ -351,11 +356,11 @@ public class ClusterConfig {
this.closeRaftTaskBlockTimeout = closeRaftTaskBlockTimeout;
}
- public int getCloseQPTaskBlockTimeout() {
- return closeQPTaskBlockTimeout;
+ public int getCloseQPSubTaskBlockTimeout() {
+ return closeQPSubTaskBlockTimeout;
}
- public void setCloseQPTaskBlockTimeout(int closeQPTaskBlockTimeout) {
- this.closeQPTaskBlockTimeout = closeQPTaskBlockTimeout;
+ public void setCloseQPSubTaskBlockTimeout(int closeQPSubTaskBlockTimeout) {
+ this.closeQPSubTaskBlockTimeout = closeQPSubTaskBlockTimeout;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index db84a34..98aa5de 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -152,20 +152,34 @@ public class ClusterDescriptor {
.parseInt(properties.getProperty("read_data_consistency_level",
Integer.toString(conf.getReadDataConsistencyLevel()))));
- conf.setConcurrentQPTaskThread(Integer
- .parseInt(properties.getProperty("concurrent_qp_task_thread",
- Integer.toString(conf.getConcurrentQPTaskThread()))));
- if (conf.getConcurrentQPTaskThread() <= 0) {
-
conf.setConcurrentQPTaskThread(Runtime.getRuntime().availableProcessors());
+ conf.setConcurrentQPSubTaskThread(Integer
+ .parseInt(properties.getProperty("concurrent_qp_sub_task_thread",
+ Integer.toString(conf.getConcurrentQPSubTaskThread()))));
+ if (conf.getConcurrentQPSubTaskThread() <= 0) {
+
conf.setConcurrentQPSubTaskThread(Runtime.getRuntime().availableProcessors());
}
conf.setConcurrentRaftTaskThread(Integer
.parseInt(properties.getProperty("concurrent_raft_task_thread",
- Integer.toString(conf.getConcurrentQPTaskThread()))));
+ Integer.toString(conf.getConcurrentQPSubTaskThread()))));
if (conf.getConcurrentRaftTaskThread() <= 0) {
conf.setConcurrentRaftTaskThread(Runtime.getRuntime().availableProcessors());
}
+ conf.setCloseRaftTaskBlockTimeout(Integer
+ .parseInt(properties.getProperty("close_raft_task_block_timeout",
+ Integer.toString(conf.getCloseRaftTaskBlockTimeout()))));
+ if (conf.getConcurrentRaftTaskThread() <= 0) {
+
conf.setCloseRaftTaskBlockTimeout(Runtime.getRuntime().availableProcessors());
+ }
+
+ conf.setCloseQPSubTaskBlockTimeout(Integer
+ .parseInt(properties.getProperty("close_qp_sub_task_block_timeout",
+ Integer.toString(conf.getCloseQPSubTaskBlockTimeout()))));
+ if (conf.getCloseQPSubTaskBlockTimeout() <= 0) {
+
conf.setCloseQPSubTaskBlockTimeout(Runtime.getRuntime().availableProcessors()*10);
+ }
+
} catch (IOException e) {
LOGGER.warn("Cannot load config file because, use default
configuration", e);
} catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 54b3e9b..fca52dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -122,8 +122,8 @@ public class Server {
}
public void stop() throws ProcessorException {
-// RaftTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
-// QPTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseQPTaskBlockTimeout());
+ RaftTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
+ QPTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseQPSubTaskBlockTimeout());
iotdb.deactivate();
CLIENT_MANAGER.shutdown();
metadataHolder.stop();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/callback/MultiQPTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/callback/MultiQPTask.java
index d15973b..f400eaf 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/callback/MultiQPTask.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/callback/MultiQPTask.java
@@ -41,7 +41,7 @@ public abstract class MultiQPTask extends QPTask {
@Override
public void shutdown() {
for (Future<?> task : taskThreadMap.values()) {
- if (task.isDone()) {
+ if (!task.isDone()) {
task.cancel(true);
}
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
index bc413e4..f9f1ac3 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
@@ -64,7 +64,7 @@ public class QPTaskManagerTest {
@Test
public void testSubmitAndClose() throws InterruptedException {
- assertEquals(clusterConfig.getConcurrentQPTaskThread(),
qpTaskManager.getThreadCnt());
+ assertEquals(clusterConfig.getConcurrentQPSubTaskThread(),
qpTaskManager.getThreadCnt());
int threadCnt = qpTaskManager.getThreadCnt();
// test thread num
@@ -77,9 +77,9 @@ public class QPTaskManagerTest {
// test close
try {
new Thread(changeMark).start();
- qpTaskManager.close(true, blockTimeOut * 3);
+ qpTaskManager.close(true, blockTimeOut);
} catch (ProcessorException e) {
- e.printStackTrace();
+ assertEquals("QPTask thread pool doesn't exit after 10 ms",
e.getMessage());
}
Thread.sleep(blockTimeOut * 2);
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties
b/iotdb/iotdb/conf/iotdb-cluster.properties
index 8d58a69..46934aa 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -72,8 +72,25 @@ read_metadata_consistency_level = 1
# ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
read_data_consistency_level = 1
-# How many thread can concurrently execute qp task. When <= 0, use CPU core
number.
-concurrent_qp_task_thread=0
-
-# How many thread can concurrently apply raft task. When <= 0, use CPU core
number.
-concurrent_raft_task_thread=0
\ No newline at end of file
+# Maximum number of threads that can concurrently execute QP sub-tasks of all client requests.
+# When <= 0, use CPU core number * 10.
+# Each client request corresponds to a QP task.
+# A QP task may be divided into several sub-tasks.
+concurrent_qp_sub_task_thread = 0
+
+# Maximum number of threads that can concurrently apply tasks in the raft state machine.
+# When <= 0, use CPU core number.
+# A raft task is a task that needs to be performed in the state machine.
+concurrent_raft_task_thread = 0
+
+# Maximum time the main thread blocks while waiting for all running task threads and queued
+# tasks to finish. The unit is milliseconds.
+# Each client request corresponds to a QP task. A QP task may be divided into several sub-tasks.
+close_qp_sub_task_block_timeout = 1000
+
+# Maximum time the main thread blocks while waiting for all running task threads and queued
+# tasks to finish. The unit is milliseconds.
+# A raft task is a task that needs to be performed in the state machine.
+# To guarantee data consistency, raft tasks must be allowed to finish.
+# If the user sets this parameter, consistency can no longer be guaranteed.
+close_raft_task_block_timeout = 0
\ No newline at end of file