This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c8b50b8938 use cached thread pool to accelerate short-lived asynchronous tasks (#7870)
c8b50b8938 is described below

commit c8b50b8938ef8f613e29ea3065cf73fe7661e1c2
Author: Alan Choo <[email protected]>
AuthorDate: Mon Nov 7 18:01:12 2022 +0800

    use cached thread pool to accelerate short-lived asynchronous tasks (#7870)
---
 .../iotdb/commons/concurrent/ThreadName.java       |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  2 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 90 +++++++++++++---------
 3 files changed, 55 insertions(+), 39 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 03610f68a9..a6a1119e4b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -31,7 +31,7 @@ public enum ThreadName {
   DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
   INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
-  DATA_REGION_RECOVER_SERVICE("Data-Region-Recover"),
+  STORAGE_ENGINE_CACHED_SERVICE("StorageEngine"),
   FLUSH_SERVICE("Flush"),
   FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
   FLUSH_TASK_SUBMIT("FlushTask-Submit-Pool"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 31d781d334..6e0a7cf395 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -210,7 +210,7 @@ public class StorageEngine implements IService {
     setAllSgReady(false);
     recoveryThreadPool =
         IoTDBThreadPoolFactory.newCachedThreadPool(
-            ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+            ThreadName.STORAGE_ENGINE_CACHED_SERVICE.getName());
 
     List<IStorageGroupMNode> sgNodes = 
IoTDB.schemaProcessor.getAllStorageGroupNodes();
     // init wal recover manager
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 76468f5fe2..4bf7e2c453 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -129,7 +129,8 @@ public class StorageEngineV2 implements IService {
   private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
 
   private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
-  private ExecutorService recoveryThreadPool;
+  /** used to do short-lived asynchronous tasks */
+  private ExecutorService cachedThreadPool;
   // add customized listeners here for flush and close events
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
@@ -201,12 +202,12 @@ public class StorageEngineV2 implements IService {
 
   public void recover() {
     setAllSgReady(false);
-    recoveryThreadPool =
+    cachedThreadPool =
         IoTDBThreadPoolFactory.newCachedThreadPool(
-            ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+            ThreadName.STORAGE_ENGINE_CACHED_SERVICE.getName());
 
     List<Future<Void>> futures = new LinkedList<>();
-    asyncRecover(recoveryThreadPool, futures);
+    asyncRecover(futures);
 
     // wait until wal is recovered
     if (!config.isClusterMode()
@@ -222,24 +223,13 @@ public class StorageEngineV2 implements IService {
     Thread recoverEndTrigger =
         new Thread(
             () -> {
-              for (Future<Void> future : futures) {
-                try {
-                  future.get();
-                } catch (ExecutionException e) {
-                  throw new StorageEngineFailureException("StorageEngine 
failed to recover.", e);
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  throw new StorageEngineFailureException("StorageEngine 
failed to recover.", e);
-                }
-              }
-              recoveryThreadPool.shutdown();
+              checkResults(futures, "StorageEngine failed to recover.");
               setAllSgReady(true);
             });
     recoverEndTrigger.start();
   }
 
-  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
-
+  private void asyncRecover(List<Future<Void>> futures) {
     Map<String, List<DataRegionId>> localDataRegionInfo = 
getLocalDataRegionInfo();
     localDataRegionInfo.values().forEach(list -> recoverDataRegionNum += 
list.size());
     readyDataRegionNum = new AtomicInteger(0);
@@ -265,7 +255,7 @@ public class StorageEngineV2 implements IService {
                   recoverDataRegionNum);
               return null;
             };
-        futures.add(pool.submit(recoverDataRegionTask));
+        futures.add(cachedThreadPool.submit(recoverDataRegionTask));
       }
     }
   }
@@ -401,7 +391,7 @@ public class StorageEngineV2 implements IService {
         seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_SEQ_MEMTABLE);
     ThreadUtils.stopThreadPool(
         unseqMemtableTimedFlushCheckThread, 
ThreadName.TIMED_FlUSH_UNSEQ_MEMTABLE);
-    recoveryThreadPool.shutdownNow();
+    cachedThreadPool.shutdownNow();
     dataRegionMap.clear();
   }
 
@@ -419,7 +409,7 @@ public class StorageEngineV2 implements IService {
     shutdownTimedService(ttlCheckThread, "TTlCheckThread");
     shutdownTimedService(seqMemtableTimedFlushCheckThread, 
"SeqMemtableTimedFlushCheckThread");
     shutdownTimedService(unseqMemtableTimedFlushCheckThread, 
"UnseqMemtableTimedFlushCheckThread");
-    recoveryThreadPool.shutdownNow();
+    cachedThreadPool.shutdownNow();
     dataRegionMap.clear();
   }
 
@@ -435,19 +425,6 @@ public class StorageEngineV2 implements IService {
     }
   }
 
-  private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String 
poolName)
-      throws ShutdownException {
-    if (pool != null) {
-      pool.shutdownNow();
-      try {
-        pool.awaitTermination(30, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        logger.warn("{} still doesn't exit after 30s", poolName);
-        throw new ShutdownException(e);
-      }
-    }
-  }
-
   @Override
   public ServiceType getID() {
     return ServiceType.STORAGE_ENGINE_SERVICE;
@@ -493,36 +470,75 @@ public class StorageEngineV2 implements IService {
   /** flush command Sync asyncCloseOneProcessor all file node processors. */
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
+    List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion != null) {
-        dataRegion.syncCloseAllWorkingTsFileProcessors();
+        tasks.add(
+            cachedThreadPool.submit(
+                () -> {
+                  dataRegion.syncCloseAllWorkingTsFileProcessors();
+                  return null;
+                }));
       }
     }
+    checkResults(tasks, "Failed to sync close processor.");
   }
 
   public void forceCloseAllProcessor() throws TsFileProcessorException {
     logger.info("Start force closing all storage group processor");
+    List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion != null) {
-        dataRegion.forceCloseAllWorkingTsFileProcessors();
+        tasks.add(
+            cachedThreadPool.submit(
+                () -> {
+                  dataRegion.forceCloseAllWorkingTsFileProcessors();
+                  return null;
+                }));
       }
     }
+    checkResults(tasks, "Failed to force close processor.");
   }
 
   public void closeStorageGroupProcessor(String storageGroupPath, boolean 
isSeq) {
+    List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion.getStorageGroupName().equals(storageGroupPath)) {
         if (isSeq) {
           for (TsFileProcessor tsFileProcessor : 
dataRegion.getWorkSequenceTsFileProcessors()) {
-            dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+            tasks.add(
+                cachedThreadPool.submit(
+                    () -> {
+                      dataRegion.syncCloseOneTsFileProcessor(isSeq, 
tsFileProcessor);
+                      return null;
+                    }));
           }
         } else {
           for (TsFileProcessor tsFileProcessor : 
dataRegion.getWorkUnsequenceTsFileProcessors()) {
-            dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+            tasks.add(
+                cachedThreadPool.submit(
+                    () -> {
+                      dataRegion.syncCloseOneTsFileProcessor(isSeq, 
tsFileProcessor);
+                      return null;
+                    }));
           }
         }
       }
     }
+    checkResults(tasks, "Failed to close storage group processor.");
+  }
+
+  private <V> void checkResults(List<Future<V>> tasks, String errorMsg) {
+    for (Future<V> task : tasks) {
+      try {
+        task.get();
+      } catch (ExecutionException e) {
+        throw new StorageEngineFailureException(errorMsg, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new StorageEngineFailureException(errorMsg, e);
+      }
+    }
   }
 
   /**

Reply via email to