This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ef53e32ac82 Fix active load cleanup lifecycle (#17947) (#17963)
ef53e32ac82 is described below
commit ef53e32ac82066b6ec00dbeecde5472f09d1f209
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:02:19 2026 +0800
Fix active load cleanup lifecycle (#17947) (#17963)
(cherry picked from commit c4574b6567a56bee8b245eeea96602c4cea2aad0)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++++++
.../iotdb/db/storageengine/StorageEngine.java | 3 +++
.../db/storageengine/load/LoadTsFileManager.java | 13 +++++++++++
.../storageengine/load/active/ActiveLoadAgent.java | 6 ++++++
.../load/active/ActiveLoadDirScanner.java | 18 ++++++++++++++--
.../active/ActiveLoadScheduledExecutorService.java | 21 +++++++++++++++---
.../load/active/ActiveLoadTsFileLoader.java | 25 +++++++++++++++++++++-
.../src/test/resources/iotdb-system.properties | 1 +
.../conf/iotdb-system.properties.template | 12 +++++++++++
10 files changed, 110 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 07c73b43d87..c92c9bba28a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -4259,7 +4259,17 @@ public class IoTDBConfig {
}
public String getLoadActiveListeningPipeDir() {
- return loadActiveListeningPipeDir;
+ return loadActiveListeningPipeDir == null ||
Objects.equals(loadActiveListeningPipeDir, "")
+ ? extDir
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.PIPE_FOLDER_NAME
+ : loadActiveListeningPipeDir;
+ }
+
+ public void setLoadActiveListeningPipeDir(String loadActiveListeningPipeDir)
{
+ this.loadActiveListeningPipeDir =
addDataHomeDir(loadActiveListeningPipeDir);
}
public String[] getLoadActiveListeningDirs() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b380a31eb64..f9e87375b78 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2484,6 +2484,9 @@ public class IoTDBDescriptor {
conf.setLoadActiveListeningFailDir(
properties.getProperty(
"load_active_listening_fail_dir",
conf.getLoadActiveListeningFailDir()));
+ conf.setLoadActiveListeningPipeDir(
+ properties.getProperty(
+ "load_active_listening_pipe_dir",
conf.getLoadActiveListeningPipeDir()));
final long loadActiveListeningCheckIntervalSeconds =
Long.parseLong(
@@ -2614,6 +2617,9 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_active_listening_fail_dir",
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+ conf.setLoadActiveListeningPipeDir(
+ properties.getProperty(
+ "load_active_listening_pipe_dir",
conf.getLoadActiveListeningPipeDir()));
conf.setLoadTsFileSpiltPartitionMaxSize(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 1c772cdb20b..ee214fd1358 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -310,6 +310,7 @@ public class StorageEngine implements IService {
}
asyncRecoverTsFileResource();
+ loadTsFileManager.start();
}
private void startTimedService() {
@@ -394,6 +395,7 @@ public class StorageEngine implements IService {
@Override
public void stop() {
+ loadTsFileManager.stop();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion);
@@ -412,6 +414,7 @@ public class StorageEngine implements IService {
@Override
public void shutdown(long milliseconds) throws ShutdownException {
+ loadTsFileManager.stop();
try {
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 0b85e3f0e64..671444352f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -125,9 +125,22 @@ public class LoadTsFileManager {
public LoadTsFileManager() {
registerCleanupTaskExecutor();
recover();
+ }
+
+ public void start() {
activeLoadAgent.start();
}
+ public void stop() {
+ activeLoadAgent.stop();
+ synchronized (uuid2CleanupTask) {
+ uuid2CleanupTask.values().forEach(CleanupTask::cancel);
+ uuid2CleanupTask.clear();
+ cleanupTaskQueue.clear();
+ }
+ new
HashSet<>(uuid2WriterManager.keySet()).forEach(this::forceCloseWriterManager);
+ }
+
private long getCleanupTaskDelayInMs() {
return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index 6065c349c8c..36b5761ca39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -62,6 +62,12 @@ public class ActiveLoadAgent {
activeLoadMetricsCollector.start();
}
+ public synchronized void stop() {
+ activeLoadDirScanner.stop();
+ activeLoadMetricsCollector.stop();
+ activeLoadTsFileLoader.stop();
+ }
+
/**
* Clean up all listening directories for active load on DataNode first
startup. This method will
* clean up all files and subdirectories in the listening directories,
including: 1. Pending
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index f4c705c17b3..4a476373fa7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -40,6 +40,7 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,6 +55,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
private static final String MODS = ".mods";
private final AtomicReference<String[]> listeningDirsConfig = new
AtomicReference<>();
+ private final AtomicReference<String> pipeListeningDirConfig = new
AtomicReference<>();
private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
private final Set<String> noPermissionDirs = new CopyOnWriteArraySet<>();
@@ -201,8 +203,20 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
} else {
listeningDirs.clear();
}
- // Hot reload active load listening dir for pipe data sync
- // Active load is always enabled for pipe data sync
+ if (!Objects.equals(
+ IOTDB_CONFIG.getLoadActiveListeningPipeDir(),
pipeListeningDirConfig.get())) {
+ synchronized (this) {
+ if (!Objects.equals(
+ IOTDB_CONFIG.getLoadActiveListeningPipeDir(),
pipeListeningDirConfig.get())) {
+ if (pipeListeningDirConfig.get() != null) {
+ listeningDirs.remove(pipeListeningDirConfig.get());
+ }
+
pipeListeningDirConfig.set(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+ }
+ }
+ }
+
+ // Active load is always enabled for pipe data sync.
listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
// Create directories if not exists
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index 610918e5a13..bd5d4e30ca7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -45,14 +45,19 @@ public abstract class ActiveLoadScheduledExecutorService {
private static final long MIN_EXECUTION_INTERVAL_SECONDS =
IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
- private final ScheduledExecutorService scheduledExecutorService;
+ private final ThreadName threadName;
+ private ScheduledExecutorService scheduledExecutorService;
private Future<?> future;
private final List<Pair<WrappedRunnable, Long>> jobs = new
CopyOnWriteArrayList<>();
protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
- scheduledExecutorService =
-
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+ this.threadName = threadName;
+ scheduledExecutorService = newScheduledExecutorService();
+ }
+
+ private ScheduledExecutorService newScheduledExecutorService() {
+ return
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
}
public void register(Runnable runnable) {
@@ -73,6 +78,9 @@ public abstract class ActiveLoadScheduledExecutorService {
public synchronized void start() {
if (future == null) {
+ if (scheduledExecutorService.isShutdown()) {
+ scheduledExecutorService = newScheduledExecutorService();
+ }
future =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
scheduledExecutorService,
@@ -96,5 +104,12 @@ public abstract class ActiveLoadScheduledExecutorService {
future = null;
LOGGER.info("Active load periodical jobs executor is stopped
successfully.");
}
+ scheduledExecutorService.shutdownNow();
+ try {
+ scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("{} still doesn't exit after 30s", threadName.getName());
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index a228ef6c651..ffb9e23bbfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -154,6 +154,25 @@ public class ActiveLoadTsFileLoader {
}
}
+ public void stop() {
+ final WrappedThreadPoolExecutor executor =
activeLoadExecutor.getAndSet(null);
+ if (executor == null) {
+ return;
+ }
+
+ executor.shutdownNow();
+ try {
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn(
+ "{} still doesn't exit after 30s",
ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+ }
+ } catch (final InterruptedException e) {
+ LOGGER.warn(
+ "{} still doesn't exit after 30s",
ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void tryLoadPendingTsFiles() {
final IClientSession session =
new InternalClientSession(
@@ -200,18 +219,22 @@ public class ActiveLoadTsFileLoader {
Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds()
<< 1);
long currentRetryTimes = 0;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
final ActiveLoadPendingQueue.ActiveLoadEntry entry =
pendingQueue.dequeueFromPending();
if (Objects.nonNull(entry)) {
return Optional.of(entry);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+ if (Thread.currentThread().isInterrupted()) {
+ return Optional.empty();
+ }
if (currentRetryTimes++ >= maxRetryTimes) {
return Optional.empty();
}
}
+ return Optional.empty();
}
private TSStatus loadTsFile(
diff --git a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
index ce0ecff34f3..4732caa9fee 100644
--- a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
+++ b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
@@ -32,6 +32,7 @@ udf_lib_dir=target/ext/udf
trigger_lib_dir=target/ext/trigger
pipe_lib_dir=target/ext/pipe
load_active_listening_dirs=target/ext/load/pending
+load_active_listening_pipe_dir=target/ext/load/pipe
load_active_listening_fail_dir=target/ext/load/failed
####################
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 741a4c846cd..9b5b69b304d 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2062,6 +2062,18 @@ load_active_listening_enable=true
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
load_active_listening_dirs=ext/load/pending
+# The directory to be actively listened for tsfile loading from Pipe.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_pipe_dir=ext\\load\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_pipe_dir=ext/load/pipe
+
# The directory where tsfile are moved if the active listening mode fails to
load them.
# Only one directory can be configured.
# effectiveMode: hot_reload