This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_concurrent
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_concurrent by this
push:
new 5837cb3 fix ut test
5837cb3 is described below
commit 5837cb35a639f959bb94ee9fd603cf4b51f327e6
Author: lta <[email protected]>
AuthorDate: Thu Apr 11 14:18:47 2019 +0800
fix ut test
---
.../cluster/concurrent/pool/QPTaskManager.java | 44 ++++++++++++---------
.../cluster/concurrent/pool/RaftTaskManager.java | 45 ++++++++++++----------
.../apache/iotdb/cluster/config/ClusterConfig.java | 7 ++++
.../org/apache/iotdb/cluster/entity/Server.java | 4 +-
.../cluster/concurrent/pool/QPTaskManagerTest.java | 36 ++++++-----------
5 files changed, 71 insertions(+), 65 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
index 1bef94d..6dd406f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManager.java
@@ -37,25 +37,24 @@ public class QPTaskManager {
private int threadCnt;
private QPTaskManager() {
- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentQPTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
+ init();
}
public static QPTaskManager getInstance() {
return QPTaskManager.InstanceHolder.instance;
}
- /**
- * @throws ProcessorException if the pool is not terminated.
- */
- public void reopen() throws ProcessorException {
- if (!pool.isTerminated()) {
- throw new ProcessorException("QP task Pool is not terminated!");
+ private void checkInit(){
+ if(pool == null){
+ init();
}
+ }
+
+ private void init(){
+
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
this.threadCnt = config.getConcurrentQPTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.RAFT_TASK.getName());
+ pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
}
/**
@@ -67,22 +66,29 @@ public class QPTaskManager {
* @throws ProcessorException if timeOut is reached or being interrupted
while waiting to exit.
*/
public void close(boolean block, long timeOut) throws ProcessorException {
- pool.shutdown();
- if (block) {
+ if(pool != null) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException(
- "QPTask thread pool doesn't exit after " + timeOut + " ms");
+ pool.shutdown();
+ if (block) {
+ try {
+ if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ throw new ProcessorException(
+ "QPTask thread pool doesn't exit after " + timeOut + " ms");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProcessorException(
+ "Interrupted while waiting QPTask thread pool to exit.", e);
+ }
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException(
- "Interrupted while waiting QPTask thread pool to exit.", e);
+ } finally {
+ pool = null;
}
}
}
public Future<?> submit(Runnable task) {
+ checkInit();
return pool.submit(task);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
index 2d4dbd6..09665aa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/RaftTaskManager.java
@@ -37,25 +37,23 @@ public class RaftTaskManager {
private int threadCnt;
private RaftTaskManager() {
- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentRaftTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.RAFT_TASK.getName());
+ init();
}
public static RaftTaskManager getInstance() {
return RaftTaskManager.InstanceHolder.instance;
}
- /**
- * @throws ProcessorException if the pool is not terminated.
- */
- public void reopen() throws ProcessorException {
- if (!pool.isTerminated()) {
- throw new ProcessorException("Raft task Pool is not terminated!");
+ private void checkInit(){
+ if(pool == null){
+ init();
}
+ }
+
+ private void init(){
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- this.threadCnt = config.getConcurrentRaftTaskThread();
- pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.RAFT_TASK.getName());
+ this.threadCnt = config.getConcurrentQPTaskThread();
+ pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt,
ThreadName.QP_TASK.getName());
}
/**
@@ -67,22 +65,29 @@ public class RaftTaskManager {
* @throws ProcessorException if timeOut is reached or being interrupted
while waiting to exit.
*/
public void close(boolean block, long timeOut) throws ProcessorException {
- pool.shutdown();
- if (block) {
+ if(pool != null) {
try {
- if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
- throw new ProcessorException(
- "Raft task thread pool doesn't exit after " + timeOut + " ms");
+ pool.shutdown();
+ if (block) {
+ try {
+ if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+ throw new ProcessorException(
+ "Raft task thread pool doesn't exit after " + timeOut + " ms");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProcessorException(
+ "Interrupted while waiting raft task thread pool to exit.", e);
+ }
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ProcessorException(
- "Interrupted while waiting raft task thread pool to exit.", e);
+ } finally {
+ pool = null;
}
}
}
public void execute(Runnable task) {
+ checkInit();
pool.execute(task);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a5069c3..f1670b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -347,8 +347,15 @@ public class ClusterConfig {
return closeRaftTaskBlockTimeout;
}
+ public void setCloseRaftTaskBlockTimeout(int closeRaftTaskBlockTimeout) {
+ this.closeRaftTaskBlockTimeout = closeRaftTaskBlockTimeout;
+ }
+
public int getCloseQPTaskBlockTimeout() {
return closeQPTaskBlockTimeout;
}
+ public void setCloseQPTaskBlockTimeout(int closeQPTaskBlockTimeout) {
+ this.closeQPTaskBlockTimeout = closeQPTaskBlockTimeout;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be98138..54b3e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -122,8 +122,8 @@ public class Server {
}
public void stop() throws ProcessorException {
- RaftTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
- QPTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseQPTaskBlockTimeout());
+// RaftTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseRaftTaskBlockTimeout());
+// QPTaskManager.getInstance().close(true,
CLUSTER_CONF.getCloseQPTaskBlockTimeout());
iotdb.deactivate();
CLIENT_MANAGER.shutdown();
metadataHolder.stop();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
index c8b52fc..dd5e717 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/concurrent/pool/QPTaskManagerTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.concurrent.pool;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -35,7 +34,7 @@ public class QPTaskManagerTest {
private ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
- private int blockTimeOut = 500;
+ private int blockTimeOut = 10;
private volatile boolean mark = true;
@@ -63,23 +62,16 @@ public class QPTaskManagerTest {
}
@Test
- public void testSubmitAndClose() {
+ public void testSubmitAndClose() throws InterruptedException {
assertEquals(clusterConfig.getConcurrentQPTaskThread(),
qpTaskManager.getThreadCnt());
- int ThradCnt = qpTaskManager.getThreadCnt();
-
- // test reopen
- try {
- qpTaskManager.reopen();
- } catch (ProcessorException e) {
- assertEquals("QP task Pool is not terminated!", e.getMessage());
- }
-
+ int threadCnt = qpTaskManager.getThreadCnt();
// test thread num
- for (int i = 1; i <= ThradCnt + 2; i++) {
+ for (int i = 1; i <= threadCnt + 2; i++) {
qpTaskManager.submit(testRunnable);
- assertEquals(Math.min(i, ThradCnt), qpTaskManager.getActiveCnt());
+ Thread.sleep(10);
+ assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
}
// test close
@@ -90,23 +82,19 @@ public class QPTaskManagerTest {
assert false;
}
- try {
- qpTaskManager.reopen();
- } catch (ProcessorException e) {
- assert false;
- }
-
mark = true;
- for (int i = 1; i <= ThradCnt + 10; i++) {
+ for (int i = 1; i <= threadCnt + 10; i++) {
qpTaskManager.submit(testRunnable);
- assertEquals(Math.min(i, ThradCnt), qpTaskManager.getActiveCnt());
+ Thread.sleep(10);
+ assertEquals(Math.min(i, threadCnt), qpTaskManager.getActiveCnt());
}
try {
- qpTaskManager.close(true, blockTimeOut);
+ new Thread(changeMark).start();
+ qpTaskManager.close(true, blockTimeOut / 10);
} catch (ProcessorException e) {
- assertEquals("QPTask thread pool doesn't exit after 500 ms",
e.getMessage());
+ assertEquals("QPTask thread pool doesn't exit after 1 ms",
e.getMessage());
}
}
}
\ No newline at end of file