This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ef53e32ac82 Fix active load cleanup lifecycle (#17947) (#17963)
ef53e32ac82 is described below

commit ef53e32ac82066b6ec00dbeecde5472f09d1f209
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:02:19 2026 +0800

    Fix active load cleanup lifecycle (#17947) (#17963)
    
    (cherry picked from commit c4574b6567a56bee8b245eeea96602c4cea2aad0)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++++++
 .../iotdb/db/storageengine/StorageEngine.java      |  3 +++
 .../db/storageengine/load/LoadTsFileManager.java   | 13 +++++++++++
 .../storageengine/load/active/ActiveLoadAgent.java |  6 ++++++
 .../load/active/ActiveLoadDirScanner.java          | 18 ++++++++++++++--
 .../active/ActiveLoadScheduledExecutorService.java | 21 +++++++++++++++---
 .../load/active/ActiveLoadTsFileLoader.java        | 25 +++++++++++++++++++++-
 .../src/test/resources/iotdb-system.properties     |  1 +
 .../conf/iotdb-system.properties.template          | 12 +++++++++++
 10 files changed, 110 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 07c73b43d87..c92c9bba28a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -4259,7 +4259,17 @@ public class IoTDBConfig {
   }
 
   public String getLoadActiveListeningPipeDir() {
-    return loadActiveListeningPipeDir;
+    return loadActiveListeningPipeDir == null || 
Objects.equals(loadActiveListeningPipeDir, "")
+        ? extDir
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.PIPE_FOLDER_NAME
+        : loadActiveListeningPipeDir;
+  }
+
+  public void setLoadActiveListeningPipeDir(String loadActiveListeningPipeDir) 
{
+    this.loadActiveListeningPipeDir = 
addDataHomeDir(loadActiveListeningPipeDir);
   }
 
   public String[] getLoadActiveListeningDirs() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b380a31eb64..f9e87375b78 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2484,6 +2484,9 @@ public class IoTDBDescriptor {
     conf.setLoadActiveListeningFailDir(
         properties.getProperty(
             "load_active_listening_fail_dir", 
conf.getLoadActiveListeningFailDir()));
+    conf.setLoadActiveListeningPipeDir(
+        properties.getProperty(
+            "load_active_listening_pipe_dir", 
conf.getLoadActiveListeningPipeDir()));
 
     final long loadActiveListeningCheckIntervalSeconds =
         Long.parseLong(
@@ -2614,6 +2617,9 @@ public class IoTDBDescriptor {
         properties.getProperty(
             "load_active_listening_fail_dir",
             
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+    conf.setLoadActiveListeningPipeDir(
+        properties.getProperty(
+            "load_active_listening_pipe_dir", 
conf.getLoadActiveListeningPipeDir()));
 
     conf.setLoadTsFileSpiltPartitionMaxSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 1c772cdb20b..ee214fd1358 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -310,6 +310,7 @@ public class StorageEngine implements IService {
     }
 
     asyncRecoverTsFileResource();
+    loadTsFileManager.start();
   }
 
   private void startTimedService() {
@@ -394,6 +395,7 @@ public class StorageEngine implements IService {
 
   @Override
   public void stop() {
+    loadTsFileManager.stop();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion != null) {
         
CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion);
@@ -412,6 +414,7 @@ public class StorageEngine implements IService {
 
   @Override
   public void shutdown(long milliseconds) throws ShutdownException {
+    loadTsFileManager.stop();
     try {
       for (DataRegion dataRegion : dataRegionMap.values()) {
         if (dataRegion != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 0b85e3f0e64..671444352f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -125,9 +125,22 @@ public class LoadTsFileManager {
   public LoadTsFileManager() {
     registerCleanupTaskExecutor();
     recover();
+  }
+
+  public void start() {
     activeLoadAgent.start();
   }
 
+  public void stop() {
+    activeLoadAgent.stop();
+    synchronized (uuid2CleanupTask) {
+      uuid2CleanupTask.values().forEach(CleanupTask::cancel);
+      uuid2CleanupTask.clear();
+      cleanupTaskQueue.clear();
+    }
+    new 
HashSet<>(uuid2WriterManager.keySet()).forEach(this::forceCloseWriterManager);
+  }
+
   private long getCleanupTaskDelayInMs() {
     return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index 6065c349c8c..36b5761ca39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -62,6 +62,12 @@ public class ActiveLoadAgent {
     activeLoadMetricsCollector.start();
   }
 
+  public synchronized void stop() {
+    activeLoadDirScanner.stop();
+    activeLoadMetricsCollector.stop();
+    activeLoadTsFileLoader.stop();
+  }
+
   /**
    * Clean up all listening directories for active load on DataNode first 
startup. This method will
    * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index f4c705c17b3..4a476373fa7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -40,6 +40,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,6 +55,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
   private static final String MODS = ".mods";
 
   private final AtomicReference<String[]> listeningDirsConfig = new 
AtomicReference<>();
+  private final AtomicReference<String> pipeListeningDirConfig = new 
AtomicReference<>();
   private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
 
   private final Set<String> noPermissionDirs = new CopyOnWriteArraySet<>();
@@ -201,8 +203,20 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
       } else {
         listeningDirs.clear();
       }
-      // Hot reload active load listening dir for pipe data sync
-      // Active load is always enabled for pipe data sync
+      if (!Objects.equals(
+          IOTDB_CONFIG.getLoadActiveListeningPipeDir(), 
pipeListeningDirConfig.get())) {
+        synchronized (this) {
+          if (!Objects.equals(
+              IOTDB_CONFIG.getLoadActiveListeningPipeDir(), 
pipeListeningDirConfig.get())) {
+            if (pipeListeningDirConfig.get() != null) {
+              listeningDirs.remove(pipeListeningDirConfig.get());
+            }
+            
pipeListeningDirConfig.set(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+          }
+        }
+      }
+
+      // Active load is always enabled for pipe data sync.
       listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
 
       // Create directories if not exists
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index 610918e5a13..bd5d4e30ca7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -45,14 +45,19 @@ public abstract class ActiveLoadScheduledExecutorService {
 
   private static final long MIN_EXECUTION_INTERVAL_SECONDS =
       IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
-  private final ScheduledExecutorService scheduledExecutorService;
+  private final ThreadName threadName;
+  private ScheduledExecutorService scheduledExecutorService;
   private Future<?> future;
 
   private final List<Pair<WrappedRunnable, Long>> jobs = new 
CopyOnWriteArrayList<>();
 
   protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
-    scheduledExecutorService =
-        
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+    this.threadName = threadName;
+    scheduledExecutorService = newScheduledExecutorService();
+  }
+
+  private ScheduledExecutorService newScheduledExecutorService() {
+    return 
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
   }
 
   public void register(Runnable runnable) {
@@ -73,6 +78,9 @@ public abstract class ActiveLoadScheduledExecutorService {
 
   public synchronized void start() {
     if (future == null) {
+      if (scheduledExecutorService.isShutdown()) {
+        scheduledExecutorService = newScheduledExecutorService();
+      }
       future =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
               scheduledExecutorService,
@@ -96,5 +104,12 @@ public abstract class ActiveLoadScheduledExecutorService {
       future = null;
       LOGGER.info("Active load periodical jobs executor is stopped 
successfully.");
     }
+    scheduledExecutorService.shutdownNow();
+    try {
+      scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn("{} still doesn't exit after 30s", threadName.getName());
+      Thread.currentThread().interrupt();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index a228ef6c651..ffb9e23bbfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -154,6 +154,25 @@ public class ActiveLoadTsFileLoader {
     }
   }
 
+  public void stop() {
+    final WrappedThreadPoolExecutor executor = 
activeLoadExecutor.getAndSet(null);
+    if (executor == null) {
+      return;
+    }
+
+    executor.shutdownNow();
+    try {
+      if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn(
+            "{} still doesn't exit after 30s", 
ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+      }
+    } catch (final InterruptedException e) {
+      LOGGER.warn(
+          "{} still doesn't exit after 30s", 
ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private void tryLoadPendingTsFiles() {
     final IClientSession session =
         new InternalClientSession(
@@ -200,18 +219,22 @@ public class ActiveLoadTsFileLoader {
         Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() 
<< 1);
     long currentRetryTimes = 0;
 
-    while (true) {
+    while (!Thread.currentThread().isInterrupted()) {
       final ActiveLoadPendingQueue.ActiveLoadEntry entry = 
pendingQueue.dequeueFromPending();
       if (Objects.nonNull(entry)) {
         return Optional.of(entry);
       }
 
       LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+      if (Thread.currentThread().isInterrupted()) {
+        return Optional.empty();
+      }
 
       if (currentRetryTimes++ >= maxRetryTimes) {
         return Optional.empty();
       }
     }
+    return Optional.empty();
   }
 
   private TSStatus loadTsFile(
diff --git a/iotdb-core/datanode/src/test/resources/iotdb-system.properties 
b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
index ce0ecff34f3..4732caa9fee 100644
--- a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
+++ b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
@@ -32,6 +32,7 @@ udf_lib_dir=target/ext/udf
 trigger_lib_dir=target/ext/trigger
 pipe_lib_dir=target/ext/pipe
 load_active_listening_dirs=target/ext/load/pending
+load_active_listening_pipe_dir=target/ext/load/pipe
 load_active_listening_fail_dir=target/ext/load/failed
 
 ####################
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 741a4c846cd..9b5b69b304d 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2062,6 +2062,18 @@ load_active_listening_enable=true
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 load_active_listening_dirs=ext/load/pending
 
+# The directory to be actively listened for tsfile loading from Pipe.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is 
"\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_pipe_dir=ext\\load\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_pipe_dir=ext/load/pipe
+
 # The directory where tsfile are moved if the active listening mode fails to 
load them.
 # Only one directory can be configured.
 # effectiveMode: hot_reload

Reply via email to