This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster- by this push:
     new 81867e2  extract closing thread pool
81867e2 is described below

commit 81867e2bd2fac02c6fd49668e88d49ee169211df
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Aug 19 16:22:54 2021 +0800

    extract closing thread pool
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 26 ++++++++++------------
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  5 +++++
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index e9fd8fe..78a2981 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.cluster;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -27,6 +29,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
@@ -520,23 +523,18 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
   }
 
   private void stopThreadPools() {
-    if (reportThread != null) {
-      reportThread.shutdownNow();
-      try {
-        reportThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, 
TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for reportThread to 
end", e);
-      }
-    }
-    if (hardLinkCleanerThread != null) {
-      hardLinkCleanerThread.shutdownNow();
+    stopThreadPool(reportThread, "reportThread");
+    stopThreadPool(hardLinkCleanerThread, "hardLinkCleanerThread");
+  }
+
+  private void stopThreadPool(ExecutorService pool, String name) {
+    if (pool != null) {
+      pool.shutdownNow();
       try {
-        hardLinkCleanerThread.awaitTermination(
-            THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
+        pool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, 
TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for hardlinkCleaner 
to end", e);
+        logger.error("Unexpected interruption when waiting for {} to end", 
name, e);
       }
     }
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2ce4d76..691da8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -65,8 +65,11 @@ public class LogDispatcher {
   private RaftMember member;
   private boolean useBatchInLogCatchUp =
       ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
+  // each follower has a queue and a dispatch thread is attached in 
executorService.
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new 
ArrayList<>();
   private ExecutorService executorService;
+
+  // TODO we have no way to close this pool. should change later.
   private static ExecutorService serializationService =
       IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
           Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
@@ -89,6 +92,8 @@ public class LogDispatcher {
   }
 
   public void offer(SendLogRequest log) {
+    // if nodeLogQueues.isEmpty(), then nothing to do.
+
     // do serialization here to avoid taking LogManager for too long
     if (!nodeLogQueues.isEmpty()) {
       log.serializedLogFuture =

Reply via email to