This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c4574b6567a Fix active load cleanup lifecycle (#17947)
c4574b6567a is described below
commit c4574b6567a56bee8b245eeea96602c4cea2aad0
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:28:16 2026 +0800
Fix active load cleanup lifecycle (#17947)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +++++
.../iotdb/db/storageengine/StorageEngine.java | 3 +++
.../db/storageengine/load/LoadTsFileManager.java | 13 +++++++++++
.../storageengine/load/active/ActiveLoadAgent.java | 6 +++++
.../load/active/ActiveLoadDirScanner.java | 17 ++++++++++++--
.../active/ActiveLoadScheduledExecutorService.java | 21 ++++++++++++++---
.../load/active/ActiveLoadTsFileLoader.java | 27 +++++++++++++++++++++-
.../src/test/resources/iotdb-system.properties | 1 +
.../conf/iotdb-system.properties.template | 12 ++++++++++
10 files changed, 108 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e45085bd4be..7aadfebc917 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -4163,11 +4163,17 @@ public class IoTDBConfig {
}
public String getLoadActiveListeningPipeDir() {
- return loadActiveListeningPipeDir;
+ return loadActiveListeningPipeDir == null ||
Objects.equals(loadActiveListeningPipeDir, "")
+ ? extDir
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.PIPE_FOLDER_NAME
+ : loadActiveListeningPipeDir;
}
public void setLoadActiveListeningPipeDir(String loadActiveListeningPipeDir)
{
- this.loadActiveListeningPipeDir = loadActiveListeningPipeDir;
+ this.loadActiveListeningPipeDir =
addDataHomeDir(loadActiveListeningPipeDir);
}
public String[] getLoadActiveListeningDirs() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 068d44d0540..13352020a15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2503,6 +2503,9 @@ public class IoTDBDescriptor {
conf.setLoadActiveListeningFailDir(
properties.getProperty(
"load_active_listening_fail_dir",
conf.getLoadActiveListeningFailDir()));
+ conf.setLoadActiveListeningPipeDir(
+ properties.getProperty(
+ "load_active_listening_pipe_dir",
conf.getLoadActiveListeningPipeDir()));
final long loadActiveListeningCheckIntervalSeconds =
Long.parseLong(
@@ -2637,6 +2640,9 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_active_listening_fail_dir",
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+ conf.setLoadActiveListeningPipeDir(
+ properties.getProperty(
+ "load_active_listening_pipe_dir",
conf.getLoadActiveListeningPipeDir()));
conf.setLoadTsFileSpiltPartitionMaxSize(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index db51775281d..532b6878577 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -327,6 +327,7 @@ public class StorageEngine implements IService {
}
asyncRecoverTsFileResource();
+ loadTsFileManager.start();
}
private void startTimedService() {
@@ -411,6 +412,7 @@ public class StorageEngine implements IService {
@Override
public void stop() {
+ loadTsFileManager.stop();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion);
@@ -429,6 +431,7 @@ public class StorageEngine implements IService {
@Override
public void shutdown(long milliseconds) throws ShutdownException {
+ loadTsFileManager.stop();
try {
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index e3c6948563a..05e53a90763 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -130,9 +130,22 @@ public class LoadTsFileManager {
public LoadTsFileManager() {
registerCleanupTaskExecutor();
recover();
+ }
+
+ public void start() {
activeLoadAgent.start();
}
+ public void stop() {
+ activeLoadAgent.stop();
+ synchronized (uuid2CleanupTask) {
+ uuid2CleanupTask.values().forEach(CleanupTask::cancel);
+ uuid2CleanupTask.clear();
+ cleanupTaskQueue.clear();
+ }
+ new
HashSet<>(uuid2WriterManager.keySet()).forEach(this::forceCloseWriterManager);
+ }
+
private long getCleanupTaskDelayInMs() {
return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index bec3bbe072b..f3532536111 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -63,6 +63,12 @@ public class ActiveLoadAgent {
activeLoadMetricsCollector.start();
}
+ public synchronized void stop() {
+ activeLoadDirScanner.stop();
+ activeLoadMetricsCollector.stop();
+ activeLoadTsFileLoader.stop();
+ }
+
/**
* Clean up all listening directories for active load on DataNode first
startup. This method will
* clean up all files and subdirectories in the listening directories,
including: 1. Pending
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 2a3ddee11f7..81de3f054cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -54,6 +54,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
private final AtomicReference<String[]> listeningDirsConfig = new
AtomicReference<>();
+ private final AtomicReference<String> pipeListeningDirConfig = new
AtomicReference<>();
private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
private final Set<String> noPermissionDirs = new CopyOnWriteArraySet<>();
@@ -204,8 +205,20 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
} else {
listeningDirs.clear();
}
- // Hot reload active load listening dir for pipe data sync
- // Active load is always enabled for pipe data sync
+ if (!Objects.equals(
+ IOTDB_CONFIG.getLoadActiveListeningPipeDir(),
pipeListeningDirConfig.get())) {
+ synchronized (this) {
+ if (!Objects.equals(
+ IOTDB_CONFIG.getLoadActiveListeningPipeDir(),
pipeListeningDirConfig.get())) {
+ if (pipeListeningDirConfig.get() != null) {
+ listeningDirs.remove(pipeListeningDirConfig.get());
+ }
+
pipeListeningDirConfig.set(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+ }
+ }
+ }
+
+ // Active load is always enabled for pipe data sync.
listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
// Create directories if not exists
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index 6bd26645840..fb989e7923e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -46,14 +46,19 @@ public abstract class ActiveLoadScheduledExecutorService {
private static final long MIN_EXECUTION_INTERVAL_SECONDS =
IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
- private final ScheduledExecutorService scheduledExecutorService;
+ private final ThreadName threadName;
+ private ScheduledExecutorService scheduledExecutorService;
private Future<?> future;
private final List<Pair<WrappedRunnable, Long>> jobs = new
CopyOnWriteArrayList<>();
protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
- scheduledExecutorService =
-
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+ this.threadName = threadName;
+ scheduledExecutorService = newScheduledExecutorService();
+ }
+
+ private ScheduledExecutorService newScheduledExecutorService() {
+ return
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
}
public void register(Runnable runnable) {
@@ -74,6 +79,9 @@ public abstract class ActiveLoadScheduledExecutorService {
public synchronized void start() {
if (future == null) {
+ if (scheduledExecutorService.isShutdown()) {
+ scheduledExecutorService = newScheduledExecutorService();
+ }
future =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
scheduledExecutorService,
@@ -97,5 +105,12 @@ public abstract class ActiveLoadScheduledExecutorService {
future = null;
LOGGER.info(StorageEngineMessages.ACTIVE_LOAD_EXECUTOR_STOPPED);
}
+ scheduledExecutorService.shutdownNow();
+ try {
+ scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn(StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S,
threadName.getName());
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 13ec94186b7..61e297a9e86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -156,6 +156,27 @@ public class ActiveLoadTsFileLoader {
}
}
+ public void stop() {
+ final WrappedThreadPoolExecutor executor =
activeLoadExecutor.getAndSet(null);
+ if (executor == null) {
+ return;
+ }
+
+ executor.shutdownNow();
+ try {
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn(
+ StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S,
+ ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+ }
+ } catch (final InterruptedException e) {
+ LOGGER.warn(
+ StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S,
+ ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void tryLoadPendingTsFiles() {
final IClientSession session =
new InternalClientSession(
@@ -202,18 +223,22 @@ public class ActiveLoadTsFileLoader {
Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds()
<< 1);
long currentRetryTimes = 0;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
final ActiveLoadPendingQueue.ActiveLoadEntry entry =
pendingQueue.dequeueFromPending();
if (Objects.nonNull(entry)) {
return Optional.of(entry);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+ if (Thread.currentThread().isInterrupted()) {
+ return Optional.empty();
+ }
if (currentRetryTimes++ >= maxRetryTimes) {
return Optional.empty();
}
}
+ return Optional.empty();
}
private TSStatus loadTsFile(
diff --git a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
index af1d5c3914a..a7a2a08bd1f 100644
--- a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
+++ b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
@@ -32,6 +32,7 @@ udf_lib_dir=target/ext/udf
trigger_lib_dir=target/ext/trigger
pipe_lib_dir=target/ext/pipe
load_active_listening_dirs=target/ext/load/pending
+load_active_listening_pipe_dir=target/ext/load/pipe
load_active_listening_fail_dir=target/ext/load/failed
####################
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 1e855a9704c..8e762c0840c 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2257,6 +2257,18 @@ load_active_listening_enable=true
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
load_active_listening_dirs=ext/load/pending
+# The directory to be actively listened for tsfile loading from Pipe.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_pipe_dir=ext\\load\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_pipe_dir=ext/load/pipe
+
# The directory where tsfile are moved if the active listening mode fails to
load them.
# Only one directory can be configured.
# effectiveMode: hot_reload