This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c8b50b8938 use cached thread pool to accelerate short-lived
asynchronous tasks (#7870)
c8b50b8938 is described below
commit c8b50b8938ef8f613e29ea3065cf73fe7661e1c2
Author: Alan Choo <[email protected]>
AuthorDate: Mon Nov 7 18:01:12 2022 +0800
use cached thread pool to accelerate short-lived asynchronous tasks (#7870)
---
.../iotdb/commons/concurrent/ThreadName.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 2 +-
.../apache/iotdb/db/engine/StorageEngineV2.java | 90 +++++++++++++---------
3 files changed, 55 insertions(+), 39 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 03610f68a9..a6a1119e4b 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -31,7 +31,7 @@ public enum ThreadName {
DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
- DATA_REGION_RECOVER_SERVICE("Data-Region-Recover"),
+ STORAGE_ENGINE_CACHED_SERVICE("StorageEngine"),
FLUSH_SERVICE("Flush"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
FLUSH_TASK_SUBMIT("FlushTask-Submit-Pool"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 31d781d334..6e0a7cf395 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -210,7 +210,7 @@ public class StorageEngine implements IService {
setAllSgReady(false);
recoveryThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(
- ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+ ThreadName.STORAGE_ENGINE_CACHED_SERVICE.getName());
List<IStorageGroupMNode> sgNodes =
IoTDB.schemaProcessor.getAllStorageGroupNodes();
// init wal recover manager
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 76468f5fe2..4bf7e2c453 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -129,7 +129,8 @@ public class StorageEngineV2 implements IService {
private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
- private ExecutorService recoveryThreadPool;
+ /** used to do short-lived asynchronous tasks */
+ private ExecutorService cachedThreadPool;
// add customized listeners here for flush and close events
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
@@ -201,12 +202,12 @@ public class StorageEngineV2 implements IService {
public void recover() {
setAllSgReady(false);
- recoveryThreadPool =
+ cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(
- ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+ ThreadName.STORAGE_ENGINE_CACHED_SERVICE.getName());
List<Future<Void>> futures = new LinkedList<>();
- asyncRecover(recoveryThreadPool, futures);
+ asyncRecover(futures);
// wait until wal is recovered
if (!config.isClusterMode()
@@ -222,24 +223,13 @@ public class StorageEngineV2 implements IService {
Thread recoverEndTrigger =
new Thread(
() -> {
- for (Future<Void> future : futures) {
- try {
- future.get();
- } catch (ExecutionException e) {
- throw new StorageEngineFailureException("StorageEngine
failed to recover.", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new StorageEngineFailureException("StorageEngine
failed to recover.", e);
- }
- }
- recoveryThreadPool.shutdown();
+ checkResults(futures, "StorageEngine failed to recover.");
setAllSgReady(true);
});
recoverEndTrigger.start();
}
- private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
-
+ private void asyncRecover(List<Future<Void>> futures) {
Map<String, List<DataRegionId>> localDataRegionInfo =
getLocalDataRegionInfo();
localDataRegionInfo.values().forEach(list -> recoverDataRegionNum +=
list.size());
readyDataRegionNum = new AtomicInteger(0);
@@ -265,7 +255,7 @@ public class StorageEngineV2 implements IService {
recoverDataRegionNum);
return null;
};
- futures.add(pool.submit(recoverDataRegionTask));
+ futures.add(cachedThreadPool.submit(recoverDataRegionTask));
}
}
}
@@ -401,7 +391,7 @@ public class StorageEngineV2 implements IService {
seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_SEQ_MEMTABLE);
ThreadUtils.stopThreadPool(
unseqMemtableTimedFlushCheckThread,
ThreadName.TIMED_FlUSH_UNSEQ_MEMTABLE);
- recoveryThreadPool.shutdownNow();
+ cachedThreadPool.shutdownNow();
dataRegionMap.clear();
}
@@ -419,7 +409,7 @@ public class StorageEngineV2 implements IService {
shutdownTimedService(ttlCheckThread, "TTlCheckThread");
shutdownTimedService(seqMemtableTimedFlushCheckThread,
"SeqMemtableTimedFlushCheckThread");
shutdownTimedService(unseqMemtableTimedFlushCheckThread,
"UnseqMemtableTimedFlushCheckThread");
- recoveryThreadPool.shutdownNow();
+ cachedThreadPool.shutdownNow();
dataRegionMap.clear();
}
@@ -435,19 +425,6 @@ public class StorageEngineV2 implements IService {
}
}
- private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String
poolName)
- throws ShutdownException {
- if (pool != null) {
- pool.shutdownNow();
- try {
- pool.awaitTermination(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.warn("{} still doesn't exit after 30s", poolName);
- throw new ShutdownException(e);
- }
- }
- }
-
@Override
public ServiceType getID() {
return ServiceType.STORAGE_ENGINE_SERVICE;
@@ -493,36 +470,75 @@ public class StorageEngineV2 implements IService {
/** flush command Sync asyncCloseOneProcessor all file node processors. */
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
+ List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
- dataRegion.syncCloseAllWorkingTsFileProcessors();
+ tasks.add(
+ cachedThreadPool.submit(
+ () -> {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ return null;
+ }));
}
}
+ checkResults(tasks, "Failed to sync close processor.");
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
logger.info("Start force closing all storage group processor");
+ List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
- dataRegion.forceCloseAllWorkingTsFileProcessors();
+ tasks.add(
+ cachedThreadPool.submit(
+ () -> {
+ dataRegion.forceCloseAllWorkingTsFileProcessors();
+ return null;
+ }));
}
}
+ checkResults(tasks, "Failed to force close processor.");
}
public void closeStorageGroupProcessor(String storageGroupPath, boolean
isSeq) {
+ List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion.getStorageGroupName().equals(storageGroupPath)) {
if (isSeq) {
for (TsFileProcessor tsFileProcessor :
dataRegion.getWorkSequenceTsFileProcessors()) {
- dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+ tasks.add(
+ cachedThreadPool.submit(
+ () -> {
+ dataRegion.syncCloseOneTsFileProcessor(isSeq,
tsFileProcessor);
+ return null;
+ }));
}
} else {
for (TsFileProcessor tsFileProcessor :
dataRegion.getWorkUnsequenceTsFileProcessors()) {
- dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+ tasks.add(
+ cachedThreadPool.submit(
+ () -> {
+ dataRegion.syncCloseOneTsFileProcessor(isSeq,
tsFileProcessor);
+ return null;
+ }));
}
}
}
}
+ checkResults(tasks, "Failed to close storage group processor.");
+ }
+
+ private <V> void checkResults(List<Future<V>> tasks, String errorMsg) {
+ for (Future<V> task : tasks) {
+ try {
+ task.get();
+ } catch (ExecutionException e) {
+ throw new StorageEngineFailureException(errorMsg, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new StorageEngineFailureException(errorMsg, e);
+ }
+ }
}
/**