This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_concurrent
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_concurrent by this 
push:
     new 03f0f9b  reorganize thread pool manager
03f0f9b is described below

commit 03f0f9bd25fcc492527efe5b3b5c818b705c7db2
Author: lta <[email protected]>
AuthorDate: Thu Apr 11 23:57:53 2019 +0800

    reorganize thread pool manager
---
 .../cluster/concurrent/pool/QPTaskManager.java      | 18 ++++++++++--------
 .../cluster/concurrent/pool/RaftTaskManager.java    | 17 ++++++++++-------
 .../cluster/concurrent/pool/ThreadPoolManager.java  | 21 ++++++++++++---------
 .../cluster/concurrent/pool/QPTaskManagerTest.java  |  8 ++++----
 .../apache/iotdb/db/engine/pool/FlushManager.java   |  6 +++---
 .../apache/iotdb/db/engine/pool/MergeManager.java   |  8 ++++----
 6 files changed, 43 insertions(+), 35 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
index 198af19..cc26913 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
@@ -38,14 +38,6 @@ public class QPTaskManager extends ThreadPoolManager {
     return QPTaskManager.InstanceHolder.instance;
   }
 
-  @Override
-  public void init() {
-
-    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-    this.threadCnt = config.getConcurrentQPSubTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.QP_TASK.getName());
-  }
-
   /**
    * Name of Pool Manager
    */
@@ -54,6 +46,16 @@ public class QPTaskManager extends ThreadPoolManager {
     return MANAGER_NAME;
   }
 
+  @Override
+  public String getThreadName() {
+    return ThreadName.QP_TASK.getName();
+  }
+
+  @Override
+  public int getThreadPoolSize() {
+    return 
ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
index 492c11d..62259cd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
@@ -38,13 +38,6 @@ public class RaftTaskManager extends ThreadPoolManager{
     return RaftTaskManager.InstanceHolder.instance;
   }
 
-  @Override
-  public void init() {
-    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-    int threadCnt = config.getConcurrentQPSubTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.RAFT_TASK.getName());
-  }
-
   /**
    * Name of Pool Manager
    */
@@ -53,6 +46,16 @@ public class RaftTaskManager extends ThreadPoolManager{
     return MANAGER_NAME;
   }
 
+  @Override
+  public String getThreadName() {
+    return ThreadName.RAFT_TASK.getName();
+  }
+
+  @Override
+  public int getThreadPoolSize() {
+    return 
ClusterDescriptor.getInstance().getConfig().getConcurrentQPSubTaskThread();
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
index 025e0d5..a6dfa42 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
@@ -22,12 +22,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.exception.ProcessorException;
 
 public abstract class ThreadPoolManager {
 
   ExecutorService pool;
-  int threadCnt;
 
   private void checkInit() {
     if (pool == null) {
@@ -38,27 +38,29 @@ public abstract class ThreadPoolManager {
   /**
    * Init pool manager
    */
-  public abstract void init();
+  public void init(){
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(getThreadPoolSize(), 
getThreadName());
+  }
 
   /**
    * Block new submits and exit when all RUNNING THREADS AND TASKS IN THE 
QUEUE end.
    *
    * @param block if set to true, this method will wait for timeOut 
milliseconds. false, return
    * directly.
-   * @param timeOut block time out in milliseconds.
+   * @param timeout block time out in milliseconds.
    * @throws ProcessorException if timeOut is reached or being interrupted 
while waiting to exit.
    */
-  public void close(boolean block, long timeOut) throws ProcessorException {
+  public void close(boolean block, long timeout) throws ProcessorException {
     if (pool != null) {
       try {
         pool.shutdown();
         if (block) {
           try {
-            if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+            if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
               throw new ProcessorException(
                   String
                       .format("%s thread pool doesn't exit after %d ms", 
getManagerName(),
-                          timeOut));
+                          timeout));
             }
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -79,6 +81,10 @@ public abstract class ThreadPoolManager {
    */
   public abstract String getManagerName();
 
+  public abstract String getThreadName();
+
+  public abstract int getThreadPoolSize();
+
   public void execute(Runnable task) {
     checkInit();
     pool.execute(task);
@@ -93,7 +99,4 @@ public abstract class ThreadPoolManager {
     return ((ThreadPoolExecutor) pool).getActiveCount();
   }
 
-  public int getThreadCnt() {
-    return threadCnt;
-  }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
index 6996c91..148d25d 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
@@ -64,14 +64,14 @@ public class QPTaskManagerTest {
   @Test
   public void testSubmitAndClose() throws InterruptedException {
 
-    assertEquals(clusterConfig.getConcurrentQPSubTaskThread(), 
qpTaskManager.getThreadCnt());
+    assertEquals(clusterConfig.getConcurrentQPSubTaskThread(), 
qpTaskManager.getThreadPoolSize());
 
-    int threadCnt = qpTaskManager.getThreadCnt();
+    int threadPoolSize = qpTaskManager.getThreadPoolSize();
     // test thread num
-    for (int i = 1; i <= threadCnt + 2; i++) {
+    for (int i = 1; i <= threadPoolSize + 2; i++) {
       qpTaskManager.submit(testRunnable);
       Thread.sleep(10);
-      assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
+      assertEquals(Math.min(i, threadPoolSize), qpTaskManager.getActiveCnt());
     }
 
     // test close
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 7c996aa..86feaf0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -94,16 +94,16 @@ public class FlushManager {
    *
    * @param block
    *            if set to true, this method will wait for timeOut milliseconds.
-   * @param timeOut
+   * @param timeout
    *            block time out in milliseconds.
    * @throws ProcessorException
    *             if timeOut is reached or being interrupted while waiting to 
exit.
    */
-  public void close(boolean block, long timeOut) throws ProcessorException {
+  public void close(boolean block, long timeout) throws ProcessorException {
     pool.shutdown();
     if (block) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
           throw new ProcessorException("Flush thread pool doesn't exit after "
               + EXIT_WAIT_TIME + " ms");
         }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
index 75190f2..44874bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
@@ -62,16 +62,16 @@ public class MergeManager {
    *
    * @param block if set block to true, this method will wait for timeOut 
milliseconds to close the
    * merge pool. false, return directly.
-   * @param timeOut block time out in milliseconds.
+   * @param timeout block time out in milliseconds.
    * @throws ProcessorException if timeOut reach or interrupted while waiting 
to exit.
    */
-  public void forceClose(boolean block, long timeOut) throws 
ProcessorException {
+  public void forceClose(boolean block, long timeout) throws 
ProcessorException {
     pool.shutdownNow();
     if (block) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
           throw new ProcessorException(
-              "Merge thread pool doesn't exit after " + timeOut + " ms");
+              "Merge thread pool doesn't exit after " + timeout + " ms");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();

Reply via email to