This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster- by this push:
new 81867e2 extract closing thread pool
81867e2 is described below
commit 81867e2bd2fac02c6fd49668e88d49ee169211df
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Aug 19 16:22:54 2021 +0800
extract closing thread pool
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 26 ++++++++++------------
.../apache/iotdb/cluster/log/LogDispatcher.java | 5 +++++
2 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index e9fd8fe..78a2981 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.cluster;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.iotdb.cluster.client.DataClientProvider;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -27,6 +29,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
@@ -520,23 +523,18 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
}
private void stopThreadPools() {
- if (reportThread != null) {
- reportThread.shutdownNow();
- try {
- reportThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S,
TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when waiting for reportThread to
end", e);
- }
- }
- if (hardLinkCleanerThread != null) {
- hardLinkCleanerThread.shutdownNow();
+ stopThreadPool(reportThread, "reportThread");
+ stopThreadPool(hardLinkCleanerThread, "hardLinkCleanerThread");
+ }
+
+ private void stopThreadPool(ExecutorService pool, String name) {
+ if (pool != null) {
+ pool.shutdownNow();
try {
- hardLinkCleanerThread.awaitTermination(
- THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
+ pool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when waiting for hardlinkCleaner
to end", e);
+ logger.error("Unexpected interruption when waiting for {} to end",
name, e);
}
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2ce4d76..691da8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -65,8 +65,11 @@ public class LogDispatcher {
private RaftMember member;
private boolean useBatchInLogCatchUp =
ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
+  // Each follower has a queue, and a dispatch thread for it is attached to the
executorService.
private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new
ArrayList<>();
private ExecutorService executorService;
+
+  // TODO: we currently have no way to close this pool; this should be changed later.
private static ExecutorService serializationService =
IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
@@ -89,6 +92,8 @@ public class LogDispatcher {
}
public void offer(SendLogRequest log) {
+    // If nodeLogQueues.isEmpty(), there is nothing to do.
+
// do serialization here to avoid taking LogManager for too long
if (!nodeLogQueues.isEmpty()) {
log.serializedLogFuture =