This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_concurrent
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_concurrent by this
push:
new 03f0f9b reorganize thread pool manager
03f0f9b is described below
commit 03f0f9bd25fcc492527efe5b3b5c818b705c7db2
Author: lta <[email protected]>
AuthorDate: Thu Apr 11 23:57:53 2019 +0800
reorganize thread pool manager
---
.../cluster/concurrent/pool/QPTaskManager.java | 18 ++++++++++--------
.../cluster/concurrent/pool/RaftTaskManager.java | 17 ++++++++++-------
.../cluster/concurrent/pool/ThreadPoolManager.java | 21 ++++++++++++---------
.../cluster/concurrent/pool/QPTaskManagerTest.java | 8 ++++----
.../apache/iotdb/db/engine/pool/FlushManager.java | 6 +++---
.../apache/iotdb/db/engine/pool/MergeManager.java | 8 ++++----
6 files changed, 43 insertions(+), 35 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
index 198af19..cc26913 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
@@ -38,14 +38,6 @@ public class QPTaskManager extends ThreadPoolManager {
return QPTaskManager.InstanceHolder.instance;
}
- @Override
- public void init() {
-
- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentQPSubTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
- }
-
/**
* Name of Pool Manager
*/
@@ -54,6 +46,16 @@ public class QPTaskManager extends ThreadPoolManager {
return MANAGER_NAME;
}
+ @Override
+ public String getThreadName() {
+ return ThreadName.QP_TASK.getName();
+ }
+
+ @Override
+ public int getThreadPoolSize() {
+ return
ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
+ }
+
private static class InstanceHolder {
private InstanceHolder() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
index 492c11d..62259cd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
@@ -38,13 +38,6 @@ public class RaftTaskManager extends ThreadPoolManager{
return RaftTaskManager.InstanceHolder.instance;
}
- @Override
- public void init() {
- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- int threadCnt = config.getConcurrentQPSubTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.RAFT_TASK.getName());
- }
-
/**
* Name of Pool Manager
*/
@@ -53,6 +46,16 @@ public class RaftTaskManager extends ThreadPoolManager{
return MANAGER_NAME;
}
+ @Override
+ public String getThreadName() {
+ return ThreadName.RAFT_TASK.getName();
+ }
+
+ @Override
+ public int getThreadPoolSize() {
+ return
ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
+ }
+
private static class InstanceHolder {
private InstanceHolder() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
index 025e0d5..a6dfa42 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
@@ -22,12 +22,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.exception.ProcessorException;
public abstract class ThreadPoolManager {
ExecutorService pool;
- int threadCnt;
private void checkInit() {
if (pool == null) {
@@ -38,27 +38,29 @@ public abstract class ThreadPoolManager {
/**
* Init pool manager
*/
- public abstract void init();
+ public void init(){
+ pool = IoTDBThreadPoolFactory.newFixedThreadPool(getThreadPoolSize(),
getThreadName());
+ }
/**
* Block new submits and exit when all RUNNING THREADS AND TASKS IN THE
QUEUE end.
*
* @param block if set to true, this method will wait for timeOut
milliseconds. false, return
* directly.
- * @param timeOut block time out in milliseconds.
+ * @param timeout block time out in milliseconds.
* @throws ProcessorException if timeOut is reached or being interrupted
while waiting to exit.
*/
- public void close(boolean block, long timeOut) throws ProcessorException {
+ public void close(boolean block, long timeout) throws ProcessorException {
if (pool != null) {
try {
pool.shutdown();
if (block) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
throw new ProcessorException(
String
.format("%s thread pool doesn't exit after %d ms",
getManagerName(),
- timeOut));
+ timeout));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -79,6 +81,10 @@ public abstract class ThreadPoolManager {
*/
public abstract String getManagerName();
+ public abstract String getThreadName();
+
+ public abstract int getThreadPoolSize();
+
public void execute(Runnable task) {
checkInit();
pool.execute(task);
@@ -93,7 +99,4 @@ public abstract class ThreadPoolManager {
return ((ThreadPoolExecutor) pool).getActiveCount();
}
- public int getThreadCnt() {
- return threadCnt;
- }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
index 6996c91..148d25d 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
@@ -64,14 +64,14 @@ public class QPTaskManagerTest {
@Test
public void testSubmitAndClose() throws InterruptedException {
- assertEquals(clusterConfig.getConcurrentQPSubTaskThread(),
qpTaskManager.getThreadCnt());
+ assertEquals(clusterConfig.getConcurrentQPSubTaskThread(),
qpTaskManager.getThreadPoolSize());
- int threadCnt = qpTaskManager.getThreadCnt();
+ int threadPoolSize = qpTaskManager.getThreadPoolSize();
// test thread num
- for (int i = 1; i <= threadCnt + 2; i++) {
+ for (int i = 1; i <= threadPoolSize + 2; i++) {
qpTaskManager.submit(testRunnable);
Thread.sleep(10);
- assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
+ assertEquals(Math.min(i, threadPoolSize), qpTaskManager.getActiveCnt());
}
// test close
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 7c996aa..86feaf0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -94,16 +94,16 @@ public class FlushManager {
*
* @param block
* if set to true, this method will wait for timeOut milliseconds.
- * @param timeOut
+ * @param timeout
* block time out in milliseconds.
* @throws ProcessorException
* if timeOut is reached or being interrupted while waiting to
exit.
*/
- public void close(boolean block, long timeOut) throws ProcessorException {
+ public void close(boolean block, long timeout) throws ProcessorException {
pool.shutdown();
if (block) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
throw new ProcessorException("Flush thread pool doesn't exit after "
+ EXIT_WAIT_TIME + " ms");
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
index 75190f2..44874bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
@@ -62,16 +62,16 @@ public class MergeManager {
*
* @param block if set block to true, this method will wait for timeOut
milliseconds to close the
* merge pool. false, return directly.
- * @param timeOut block time out in milliseconds.
+ * @param timeout block time out in milliseconds.
* @throws ProcessorException if timeOut reach or interrupted while waiting
to exit.
*/
- public void forceClose(boolean block, long timeOut) throws
ProcessorException {
+ public void forceClose(boolean block, long timeout) throws
ProcessorException {
pool.shutdownNow();
if (block) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
throw new ProcessorException(
- "Merge thread pool doesn't exit after " + timeOut + " ms");
+ "Merge thread pool doesn't exit after " + timeout + " ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();