This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_concurrent
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_concurrent by this 
push:
     new 5837cb3  fix ut test
5837cb3 is described below

commit 5837cb35a639f959bb94ee9fd603cf4b51f327e6
Author: lta <[email protected]>
AuthorDate: Thu Apr 11 14:18:47 2019 +0800

    fix ut test
---
 .../cluster/concurrent/pool/QPTaskManager.java     | 44 ++++++++++++---------
 .../cluster/concurrent/pool/RaftTaskManager.java   | 45 ++++++++++++----------
 .../apache/iotdb/cluster/config/ClusterConfig.java |  7 ++++
 .../org/apache/iotdb/cluster/entity/Server.java    |  4 +-
 .../cluster/concurrent/pool/QPTaskManagerTest.java | 36 ++++++-----------
 5 files changed, 71 insertions(+), 65 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
index 1bef94d..6dd406f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
@@ -37,25 +37,24 @@ public class QPTaskManager {
   private int threadCnt;
 
   private QPTaskManager() {
-    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-    this.threadCnt = config.getConcurrentQPTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.QP_TASK.getName());
+    init();
   }
 
   public static QPTaskManager getInstance() {
     return QPTaskManager.InstanceHolder.instance;
   }
 
-  /**
-   * @throws ProcessorException if the pool is not terminated.
-   */
-  public void reopen() throws ProcessorException {
-    if (!pool.isTerminated()) {
-      throw new ProcessorException("QP task Pool is not terminated!");
+  private void checkInit(){
+    if(pool == null){
+      init();
     }
+  }
+
+  private void init(){
+
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
     this.threadCnt = config.getConcurrentQPTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.RAFT_TASK.getName());
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.QP_TASK.getName());
   }
 
   /**
@@ -67,22 +66,29 @@ public class QPTaskManager {
    * @throws ProcessorException if timeOut is reached or being interrupted 
while waiting to exit.
    */
   public void close(boolean block, long timeOut) throws ProcessorException {
-    pool.shutdown();
-    if (block) {
+    if(pool != null) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
-          throw new ProcessorException(
-              "QPTask thread pool doesn't exit after " + timeOut + " ms");
+        pool.shutdown();
+        if (block) {
+          try {
+            if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+              throw new ProcessorException(
+                  "QPTask thread pool doesn't exit after " + timeOut + " ms");
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ProcessorException(
+                "Interrupted while waiting QPTask thread pool to exit.", e);
+          }
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ProcessorException(
-            "Interrupted while waiting QPTask thread pool to exit.", e);
+      } finally {
+        pool = null;
       }
     }
   }
 
   public Future<?> submit(Runnable task) {
+    checkInit();
     return pool.submit(task);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
index 2d4dbd6..09665aa 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
@@ -37,25 +37,23 @@ public class RaftTaskManager {
   private int threadCnt;
 
   private RaftTaskManager() {
-    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-    this.threadCnt = config.getConcurrentRaftTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.RAFT_TASK.getName());
+   init();
   }
 
   public static RaftTaskManager getInstance() {
     return RaftTaskManager.InstanceHolder.instance;
   }
 
-  /**
-   * @throws ProcessorException if the pool is not terminated.
-   */
-  public void reopen() throws ProcessorException {
-    if (!pool.isTerminated()) {
-      throw new ProcessorException("Raft task Pool is not terminated!");
+  private void checkInit(){
+    if(pool == null){
+      init();
     }
+  }
+
+  private void init(){
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-    this.threadCnt = config.getConcurrentRaftTaskThread();
-    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.RAFT_TASK.getName());
+    this.threadCnt = config.getConcurrentQPTaskThread();
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, 
ThreadName.QP_TASK.getName());
   }
 
   /**
@@ -67,22 +65,29 @@ public class RaftTaskManager {
    * @throws ProcessorException if timeOut is reached or being interrupted 
while waiting to exit.
    */
   public void close(boolean block, long timeOut) throws ProcessorException {
-    pool.shutdown();
-    if (block) {
+    if(pool != null) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
-          throw new ProcessorException(
-              "Raft task thread pool doesn't exit after " + timeOut + " ms");
+        pool.shutdown();
+        if (block) {
+          try {
+            if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+              throw new ProcessorException(
+                  "Raft task thread pool doesn't exit after " + timeOut + " 
ms");
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ProcessorException(
+                "Interrupted while waiting raft task thread pool to exit.", e);
+          }
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ProcessorException(
-            "Interrupted while waiting raft task thread pool to exit.", e);
+      } finally {
+        pool = null;
       }
     }
   }
 
   public void execute(Runnable task) {
+    checkInit();
     pool.execute(task);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a5069c3..f1670b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -347,8 +347,15 @@ public class ClusterConfig {
     return closeRaftTaskBlockTimeout;
   }
 
+  public void setCloseRaftTaskBlockTimeout(int closeRaftTaskBlockTimeout) {
+    this.closeRaftTaskBlockTimeout = closeRaftTaskBlockTimeout;
+  }
+
   public int getCloseQPTaskBlockTimeout() {
     return closeQPTaskBlockTimeout;
   }
 
+  public void setCloseQPTaskBlockTimeout(int closeQPTaskBlockTimeout) {
+    this.closeQPTaskBlockTimeout = closeQPTaskBlockTimeout;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be98138..54b3e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -122,8 +122,8 @@ public class Server {
   }
 
   public void stop() throws ProcessorException {
-    RaftTaskManager.getInstance().close(true, 
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
-    QPTaskManager.getInstance().close(true, 
CLUSTER_CONF.getCloseQPTaskBlockTimeout());
+//    RaftTaskManager.getInstance().close(true, 
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
+//    QPTaskManager.getInstance().close(true, 
CLUSTER_CONF.getCloseQPTaskBlockTimeout());
     iotdb.deactivate();
     CLIENT_MANAGER.shutdown();
     metadataHolder.stop();
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
index c8b52fc..dd5e717 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.cluster.concurrent.pool;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -35,7 +34,7 @@ public class QPTaskManagerTest {
 
   private ClusterConfig clusterConfig = 
ClusterDescriptor.getInstance().getConfig();
 
-  private int blockTimeOut = 500;
+  private int blockTimeOut = 10;
 
   private volatile boolean mark = true;
 
@@ -63,23 +62,16 @@ public class QPTaskManagerTest {
   }
 
   @Test
-  public void testSubmitAndClose() {
+  public void testSubmitAndClose() throws InterruptedException {
 
     assertEquals(clusterConfig.getConcurrentQPTaskThread(), 
qpTaskManager.getThreadCnt());
 
-    int ThradCnt = qpTaskManager.getThreadCnt();
-
-    // test reopen
-    try {
-      qpTaskManager.reopen();
-    } catch (ProcessorException e) {
-      assertEquals("QP task Pool is not terminated!", e.getMessage());
-    }
-
+    int threadCnt = qpTaskManager.getThreadCnt();
     // test thread num
-    for (int i = 1; i <= ThradCnt + 2; i++) {
+    for (int i = 1; i <= threadCnt + 2; i++) {
       qpTaskManager.submit(testRunnable);
-      assertEquals(Math.min(i, ThradCnt), qpTaskManager.getActiveCnt());
+      Thread.sleep(10);
+      assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
     }
 
     // test close
@@ -90,23 +82,19 @@ public class QPTaskManagerTest {
       assert false;
     }
 
-    try {
-      qpTaskManager.reopen();
-    } catch (ProcessorException e) {
-      assert false;
-    }
-
     mark = true;
 
-    for (int i = 1; i <= ThradCnt + 10; i++) {
+    for (int i = 1; i <= threadCnt + 10; i++) {
       qpTaskManager.submit(testRunnable);
-      assertEquals(Math.min(i, ThradCnt), qpTaskManager.getActiveCnt());
+      Thread.sleep(10);
+      assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
     }
 
     try {
-      qpTaskManager.close(true, blockTimeOut);
+      new Thread(changeMark).start();
+      qpTaskManager.close(true, blockTimeOut / 10);
     } catch (ProcessorException e) {
-      assertEquals("QPTask thread pool doesn't exit after 500 ms", 
e.getMessage());
+      assertEquals("QPTask thread pool doesn't exit after 1 ms", 
e.getMessage());
     }
   }
 }
\ No newline at end of file

Reply via email to