This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 34c63cf1219 Load: Support Datanode actively listening dirs and auto 
load tsfile (#13085)
34c63cf1219 is described below

commit 34c63cf1219eac631015cb1a59f868ddc1ef12f0
Author: YC27 <[email protected]>
AuthorDate: Wed Aug 14 15:40:10 2024 +0800

    Load: Support Datanode actively listening dirs and auto load tsfile (#13085)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  97 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  86 +++++-
 .../iotdb/db/exception/LoadEmptyFileException.java |   4 +-
 .../execution/load/LoadTsFileManager.java          |  10 +
 .../load/active/ActiveLoadDirScanner.java          | 175 ++++++++++++
 .../load/active/ActiveLoadPendingQueue.java        |  64 +++++
 .../load/active/ActiveLoadTsFileLoader.java        | 310 +++++++++++++++++++++
 .../load/{ => limiter}/LoadTsFileRateLimiter.java  |   2 +-
 .../load/{ => splitter}/AlignedChunkData.java      |   2 +-
 .../execution/load/{ => splitter}/ChunkData.java   |   2 +-
 .../load/{ => splitter}/DeletionData.java          |   2 +-
 .../load/{ => splitter}/NonAlignedChunkData.java   |   2 +-
 .../execution/load/{ => splitter}/TsFileData.java  |   2 +-
 .../load/{ => splitter}/TsFileSplitter.java        |   2 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   6 +-
 ...TsfileAnalyzer.java => LoadTsFileAnalyzer.java} |  23 +-
 .../plan/node/load/LoadTsFilePieceNode.java        |   2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   6 +-
 .../iotdb/db/storageengine/StorageEngine.java      |   2 +-
 .../db/storageengine/dataregion/DataRegion.java    |   2 +-
 .../storageengine/dataregion/tsfile/TsFileID.java  |   6 +-
 .../conf/iotdb-system.properties.template          |  44 ++-
 .../iotdb/commons/concurrent/ThreadName.java       |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   2 +
 24 files changed, 814 insertions(+), 45 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 80bffe7074b..bbe142d8cea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1122,6 +1122,35 @@ public class IoTDBConfig {
 
   private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
 
+  private boolean loadActiveListeningEnable = true;
+
+  private String[] loadActiveListeningDirs =
+      new String[] {
+        IoTDBConstant.EXT_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME
+      };
+
+  private String loadActiveListeningPipeDir =
+      IoTDBConstant.EXT_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.PIPE_FOLDER_NAME;
+
+  private String loadActiveListeningFailDir =
+      IoTDBConstant.EXT_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME;
+
+  private long loadActiveListeningCheckIntervalSeconds = 5L;
+
+  private int loadActiveListeningMaxThreadNum = 8;
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during 
querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -1287,6 +1316,11 @@ public class IoTDBConfig {
     schemaRegionConsensusDir = addDataHomeDir(schemaRegionConsensusDir);
     indexRootFolder = addDataHomeDir(indexRootFolder);
     extDir = addDataHomeDir(extDir);
+    for (int i = 0; i < loadActiveListeningDirs.length; i++) {
+      loadActiveListeningDirs[i] = addDataHomeDir(loadActiveListeningDirs[i]);
+    }
+    loadActiveListeningPipeDir = addDataHomeDir(loadActiveListeningPipeDir);
+    loadActiveListeningFailDir = addDataHomeDir(loadActiveListeningFailDir);
     udfDir = addDataHomeDir(udfDir);
     udfTemporaryLibDir = addDataHomeDir(udfTemporaryLibDir);
     triggerDir = addDataHomeDir(triggerDir);
@@ -3886,6 +3920,69 @@ public class IoTDBConfig {
     this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
   }
 
+  public int getLoadActiveListeningMaxThreadNum() {
+    return loadActiveListeningMaxThreadNum;
+  }
+
+  public void setLoadActiveListeningMaxThreadNum(int 
loadActiveListeningMaxThreadNum) {
+    this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum;
+  }
+
+  public long getLoadActiveListeningCheckIntervalSeconds() {
+    return loadActiveListeningCheckIntervalSeconds;
+  }
+
+  public void setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    this.loadActiveListeningCheckIntervalSeconds = 
loadActiveListeningCheckIntervalSeconds;
+  }
+
+  public String getLoadActiveListeningFailDir() {
+    return loadActiveListeningFailDir == null || 
Objects.equals(loadActiveListeningFailDir, "")
+        ? extDir
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME
+        : loadActiveListeningFailDir;
+  }
+
+  public void setLoadActiveListeningFailDir(String loadActiveListeningFailDir) 
{
+    this.loadActiveListeningFailDir = 
addDataHomeDir(loadActiveListeningFailDir);
+  }
+
+  public String getLoadActiveListeningPipeDir() {
+    return loadActiveListeningPipeDir;
+  }
+
+  public String[] getLoadActiveListeningDirs() {
+    return (Objects.isNull(this.loadActiveListeningDirs)
+            || this.loadActiveListeningDirs.length == 0)
+        ? new String[] {
+          extDir
+              + File.separator
+              + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+              + File.separator
+              + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME
+        }
+        : this.loadActiveListeningDirs;
+  }
+
+  public void setLoadActiveListeningDirs(String[] loadActiveListeningDirs) {
+    for (int i = 0; i < loadActiveListeningDirs.length; i++) {
+      loadActiveListeningDirs[i] = addDataHomeDir(loadActiveListeningDirs[i]);
+    }
+    this.loadActiveListeningDirs = loadActiveListeningDirs;
+  }
+
+  public boolean getLoadActiveListeningEnable() {
+    return loadActiveListeningEnable;
+  }
+
+  public void setLoadActiveListeningEnable(boolean loadActiveListeningEnable) {
+    this.loadActiveListeningEnable = loadActiveListeningEnable;
+  }
+
   public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 564c146bc94..77168a1794e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1818,19 +1818,7 @@ public class IoTDBDescriptor {
       loadCompactionHotModifiedProps(properties);
 
       // update load config
-      conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
-          Long.parseLong(
-              properties.getProperty(
-                  "load_clean_up_task_execution_delay_time_seconds",
-                  ConfigurationFileUtils.getConfigurationDefaultValue(
-                      "load_clean_up_task_execution_delay_time_seconds"))));
-
-      conf.setLoadWriteThroughputBytesPerSecond(
-          Double.parseDouble(
-              properties.getProperty(
-                  "load_write_throughput_bytes_per_second",
-                  ConfigurationFileUtils.getConfigurationDefaultValue(
-                      "load_write_throughput_bytes_per_second"))));
+      loadLoadTsFileHotModifiedProp(properties);
 
       // update pipe config
       commonDescriptor
@@ -2161,6 +2149,78 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_write_throughput_bytes_per_second",
                 String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+
+    conf.setLoadActiveListeningEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "load_active_listening_enable",
+                Boolean.toString(conf.getLoadActiveListeningEnable()))));
+    conf.setLoadActiveListeningDirs(
+        Arrays.stream(
+                properties
+                    .getProperty(
+                        "load_active_listening_dirs",
+                        String.join(",", conf.getLoadActiveListeningDirs()))
+                    .trim()
+                    .split(","))
+            .filter(dir -> !dir.isEmpty())
+            .toArray(String[]::new));
+    conf.setLoadActiveListeningFailDir(
+        properties.getProperty(
+            "load_active_listening_fail_dir", 
conf.getLoadActiveListeningFailDir()));
+    conf.setLoadActiveListeningCheckIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "load_active_listening_check_interval_seconds",
+                
Long.toString(conf.getLoadActiveListeningCheckIntervalSeconds()))));
+    conf.setLoadActiveListeningMaxThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_active_listening_max_thread_num",
+                Integer.toString(
+                    Math.min(
+                        conf.getLoadActiveListeningMaxThreadNum(),
+                        Math.max(1, Runtime.getRuntime().availableProcessors() 
/ 2))))));
+  }
+
+  private void loadLoadTsFileHotModifiedProp(Properties properties) throws 
IOException {
+    conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "load_clean_up_task_execution_delay_time_seconds",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "load_clean_up_task_execution_delay_time_seconds"))));
+
+    conf.setLoadWriteThroughputBytesPerSecond(
+        Double.parseDouble(
+            properties.getProperty(
+                "load_write_throughput_bytes_per_second",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "load_write_throughput_bytes_per_second"))));
+
+    conf.setLoadActiveListeningEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "load_active_listening_enable",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "load_active_listening_enable"))));
+    conf.setLoadActiveListeningDirs(
+        Arrays.stream(
+                properties
+                    .getProperty(
+                        "load_active_listening_dirs",
+                        String.join(
+                            ",",
+                            
ConfigurationFileUtils.getConfigurationDefaultValue(
+                                "load_active_listening_dirs")))
+                    .trim()
+                    .split(","))
+            .filter(dir -> !dir.isEmpty())
+            .toArray(String[]::new));
+    conf.setLoadActiveListeningFailDir(
+        properties.getProperty(
+            "load_active_listening_fail_dir",
+            
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
   }
 
   @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
index 1c9e9bc4590..59e8a1d13f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.exception;
 
 public class LoadEmptyFileException extends LoadFileException {
 
-  public LoadEmptyFileException() {
-    super("Cannot load an empty file");
+  public LoadEmptyFileException(final String fileName) {
+    super(fileName);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 1d2b7d9f378..509032f0b35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -35,6 +35,11 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadDirScanner;
+import 
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadTsFileLoader;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.DeletionData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -92,9 +97,14 @@ public class LoadTsFileManager {
   private final Map<String, CleanupTask> uuid2CleanupTask = new 
ConcurrentHashMap<>();
   private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new 
PriorityBlockingQueue<>();
 
+  private final ActiveLoadTsFileLoader activeLoadTsFileLoader = new 
ActiveLoadTsFileLoader();
+  private final ActiveLoadDirScanner activeLoadDirScanner =
+      new ActiveLoadDirScanner(activeLoadTsFileLoader);
+
   public LoadTsFileManager() {
     registerCleanupTaskExecutor();
     recover();
+    activeLoadDirScanner.start();
   }
 
   private void registerCleanupTaskExecutor() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
new file mode 100644
index 00000000000..e49789a8c0b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ActiveLoadDirScanner {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
+
+  private static final String RESOURCE = ".resource";
+  private static final String MODS = ".mods";
+
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static final ScheduledExecutorService DIR_SCAN_JOB_EXECUTOR =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.ACTIVE_LOAD_DIR_SCANNER.getName());
+  private static final long MIN_SCAN_INTERVAL_SECONDS =
+      
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds();
+
+  private final AtomicReference<String[]> listeningDirsConfig = new 
AtomicReference<>();
+  private final Set<String> listeningDirs = new HashSet<>();
+
+  private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+
+  private Future<?> dirScanJobFuture;
+
+  public ActiveLoadDirScanner(final ActiveLoadTsFileLoader 
activeLoadTsFileLoader) {
+    this.activeLoadTsFileLoader = activeLoadTsFileLoader;
+  }
+
+  public synchronized void start() {
+    if (dirScanJobFuture == null) {
+      dirScanJobFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              DIR_SCAN_JOB_EXECUTOR,
+              this::scanSafely,
+              MIN_SCAN_INTERVAL_SECONDS,
+              MIN_SCAN_INTERVAL_SECONDS,
+              TimeUnit.SECONDS);
+      LOGGER.info(
+          "Active load dir scanner started. Scan interval: {}s.", 
MIN_SCAN_INTERVAL_SECONDS);
+    }
+  }
+
+  public synchronized void stop() {
+    if (dirScanJobFuture != null) {
+      dirScanJobFuture.cancel(false);
+      dirScanJobFuture = null;
+      LOGGER.info("Active load dir scanner stopped.");
+    }
+  }
+
+  private void scanSafely() {
+    try {
+      scan();
+    } catch (final Exception e) {
+      LOGGER.warn("Error occurred during active load dir scanning.", e);
+    }
+  }
+
+  private void scan() throws IOException {
+    hotReloadActiveLoadDirs();
+
+    for (final String listeningDir : listeningDirs) {
+      final int currentAllowedPendingSize = 
activeLoadTsFileLoader.getCurrentAllowedPendingSize();
+      if (currentAllowedPendingSize <= 0) {
+        return;
+      }
+
+      final boolean isGeneratedByPipe =
+          listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+      FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)
+          .map(
+              file ->
+                  (file.getName().endsWith(RESOURCE) || 
file.getName().endsWith(MODS))
+                      ? getTsFilePath(file.getAbsolutePath())
+                      : file.getAbsolutePath())
+          .filter(this::isTsFileCompleted)
+          .limit(currentAllowedPendingSize)
+          .forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, 
isGeneratedByPipe));
+    }
+  }
+
+  private boolean isTsFileCompleted(final String file) {
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(file, 
false)) {
+      return TSFileConfig.MAGIC_STRING.equals(reader.readTailMagic());
+    } catch (final Exception e) {
+      return false;
+    }
+  }
+
+  private void hotReloadActiveLoadDirs() {
+    try {
+      // Hot reload active load listening dirs if active listening is enabled
+      if (IOTDB_CONFIG.getLoadActiveListeningEnable()) {
+        if (IOTDB_CONFIG.getLoadActiveListeningDirs() != 
listeningDirsConfig.get()) {
+          synchronized (this) {
+            if (IOTDB_CONFIG.getLoadActiveListeningDirs() != 
listeningDirsConfig.get()) {
+              
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
+              
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
+            }
+          }
+        }
+      }
+
+      // Hot reload active load listening dir for pipe data sync
+      // Active load is always enabled for pipe data sync
+      listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+
+      // Create directories if not exists
+      listeningDirs.forEach(this::createDirectoriesIfNotExists);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Error occurred during hot reload active load dirs. "
+              + "Current active load listening dirs: {}.",
+          listeningDirs,
+          e);
+    }
+  }
+
+  private void createDirectoriesIfNotExists(final String dirPath) {
+    try {
+      FileUtils.forceMkdir(new File(dirPath));
+    } catch (final IOException e) {
+      LOGGER.warn("Error occurred during creating directory {} for active 
load.", dirPath, e);
+    }
+  }
+
+  private static String getTsFilePath(final String 
filePathWithResourceOrModsTail) {
+    return filePathWithResourceOrModsTail.endsWith(RESOURCE)
+        ? filePathWithResourceOrModsTail.substring(
+            0, filePathWithResourceOrModsTail.length() - RESOURCE.length())
+        : filePathWithResourceOrModsTail.substring(
+            0, filePathWithResourceOrModsTail.length() - MODS.length());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
new file mode 100644
index 00000000000..618eff470c6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ActiveLoadPendingQueue {
+
+  private final Set<String> pendingFileSet = new HashSet<>();
+  private final Queue<Pair<String, Boolean>> pendingFileQueue = new 
ConcurrentLinkedQueue<>();
+
+  private final Set<String> loadingFileSet = new HashSet<>();
+
+  public synchronized boolean enqueue(final String file, final boolean 
isGeneratedByPipe) {
+    if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
+      pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+      return true;
+    }
+    return false;
+  }
+
+  public synchronized Pair<String, Boolean> dequeueFromPending() {
+    final Pair<String, Boolean> pair = pendingFileQueue.poll();
+    if (pair != null) {
+      pendingFileSet.remove(pair.left);
+      loadingFileSet.add(pair.left);
+    }
+    return pair;
+  }
+
+  public synchronized void removeFromLoading(final String file) {
+    loadingFileSet.remove(file);
+  }
+
+  public int size() {
+    return pendingFileQueue.size() + loadingFileSet.size();
+  }
+
+  public boolean isEmpty() {
+    return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
new file mode 100644
index 00000000000..fb4800deef9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.time.ZoneId;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+public class ActiveLoadTsFileLoader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadTsFileLoader.class);
+
+  private static final int MAX_PENDING_SIZE = 1000;
+  private final ActiveLoadPendingQueue pendingQueue = new 
ActiveLoadPendingQueue();
+
+  private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor =
+      new AtomicReference<>();
+  private final AtomicReference<String> failDir = new AtomicReference<>();
+
+  public int getCurrentAllowedPendingSize() {
+    return MAX_PENDING_SIZE - pendingQueue.size();
+  }
+
+  public void tryTriggerTsFileLoad(String absolutePath, boolean 
isGeneratedByPipe) {
+    if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
+      initFailDirIfNecessary();
+      adjustExecutorIfNecessary();
+    }
+  }
+
+  private void initFailDirIfNecessary() {
+    if (failDir.get() == null) {
+      synchronized (failDir) {
+        if (failDir.get() == null) {
+          final File failDirFile =
+              new 
File(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+          try {
+            FileUtils.forceMkdir(failDirFile);
+          } catch (final IOException e) {
+            LOGGER.warn(
+                "Error occurred during creating fail directory {} for active 
load.",
+                failDirFile,
+                e);
+          }
+          
failDir.set(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+        }
+      }
+    }
+  }
+
+  private void adjustExecutorIfNecessary() {
+    if (activeLoadExecutor.get() == null) {
+      synchronized (activeLoadExecutor) {
+        if (activeLoadExecutor.get() == null) {
+          activeLoadExecutor.set(
+              new WrappedThreadPoolExecutor(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+                  0L,
+                  TimeUnit.SECONDS,
+                  new LinkedBlockingQueue<>(),
+                  new 
IoTThreadFactory(ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name()),
+                  ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name()));
+        }
+      }
+    }
+
+    final int targetCorePoolSize =
+        Math.min(
+            pendingQueue.size(),
+            
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum());
+
+    if (activeLoadExecutor.get().getCorePoolSize() != targetCorePoolSize) {
+      activeLoadExecutor.get().setCorePoolSize(targetCorePoolSize);
+    }
+
+    // calculate how many threads need to be loaded
+    final int threadsToBeAdded =
+        Math.max(targetCorePoolSize - 
activeLoadExecutor.get().getActiveCount(), 0);
+    for (int i = 0; i < threadsToBeAdded; i++) {
+      activeLoadExecutor.get().execute(this::tryLoadPendingTsFiles);
+    }
+  }
+
+  private void tryLoadPendingTsFiles() {
+    while (true) {
+      final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
+      if (!filePair.isPresent()) {
+        return;
+      }
+
+      try {
+        final TSStatus result = loadTsFile(filePair.get());
+        if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          LOGGER.info(
+              "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+              filePair.get().getLeft(),
+              filePair.get().getRight());
+        } else {
+          handleLoadFailure(filePair.get(), result);
+        }
+      } catch (final FileNotFoundException e) {
+        handleFileNotFoundException(filePair.get());
+      } catch (final Exception e) {
+        handleOtherException(filePair.get(), e);
+      } finally {
+        pendingQueue.removeFromLoading(filePair.get().getLeft());
+      }
+    }
+  }
+
+  private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
+    final long maxRetryTimes =
+        Math.max(
+            1,
+            
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds()
+                << 1);
+    long currentRetryTimes = 0;
+
+    while (true) {
+      final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
+      if (Objects.nonNull(filePair)) {
+        return Optional.of(filePair);
+      }
+
+      LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+
+      if (currentRetryTimes++ >= maxRetryTimes) {
+        return Optional.empty();
+      }
+    }
+  }
+
+  private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws 
FileNotFoundException {
+    final LoadTsFileStatement statement = new 
LoadTsFileStatement(filePair.getLeft());
+    statement.setDeleteAfterLoad(true);
+    statement.setVerifySchema(true);
+    statement.setAutoCreateDatabase(false);
+    return executeStatement(filePair.getRight() ? new 
PipeEnrichedStatement(statement) : statement);
+  }
+
+  private TSStatus executeStatement(final Statement statement) {
+    return Coordinator.getInstance()
+        .executeForTreeModel(
+            statement,
+            SessionManager.getInstance().requestQueryId(),
+            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            ClusterSchemaFetcher.getInstance(),
+            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+        .status;
+  }
+
+  private void handleLoadFailure(final Pair<String, Boolean> filePair, final 
TSStatus status) {
+    LOGGER.warn(
+        "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. 
File will be moved to fail directory.",
+        filePair.getLeft(),
+        filePair.getRight(),
+        status);
+    removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+  }
+
+  private void handleFileNotFoundException(final Pair<String, Boolean> 
filePair) {
+    LOGGER.warn(
+        "Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file 
not found, will skip this file.",
+        filePair.getLeft(),
+        filePair.getRight());
+    removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+  }
+
+  private void handleOtherException(final Pair<String, Boolean> filePair, 
final Exception e) {
+    if (e.getMessage() != null && e.getMessage().contains("memory")) {
+      LOGGER.info(
+          "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to 
memory constraints, will retry later.",
+          filePair.getLeft(),
+          filePair.getRight());
+      pendingQueue.enqueue(filePair.getLeft(), filePair.getRight());
+    } else {
+      LOGGER.warn(
+          "Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of 
an unexpected exception. File will be moved to fail directory.",
+          filePair.getLeft(),
+          filePair.getRight(),
+          e);
+      removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+    }
+  }
+
+  private void removeFileAndResourceAndModsToFailDir(final String filePath) {
+    removeToFailDir(filePath);
+    removeToFailDir(filePath + ".resource");
+    removeToFailDir(filePath + ".mods");
+  }
+
+  private void removeToFailDir(final String filePath) {
+    final File sourceFile = new File(filePath);
+    // prevent the resource or mods not exist
+    if (!sourceFile.exists()) {
+      return;
+    }
+
+    final File targetDir = new File(failDir.get());
+    try {
+      moveFileWithMD5Check(sourceFile, targetDir);
+    } catch (final IOException e) {
+      LOGGER.warn("Error occurred during moving file {} to fail directory.", 
filePath, e);
+    }
+  }
+
+  private static void moveFileWithMD5Check(final File sourceFile, final File 
targetDir)
+      throws IOException {
+    final String sourceFileName = sourceFile.getName();
+    final File targetFile = new File(targetDir, sourceFileName);
+
+    if (targetFile.exists()) {
+      if (haveSameMD5(sourceFile, targetFile)) {
+        FileUtils.forceDelete(sourceFile);
+        LOGGER.info(
+            "Deleted the file {} because it already exists in the fail 
directory: {}",
+            sourceFile.getName(),
+            targetDir.getAbsolutePath());
+      } else {
+        renameWithMD5(sourceFile, targetDir);
+        LOGGER.info(
+            "Renamed file {} to {} because it already exists in the fail 
directory: {}",
+            sourceFile.getName(),
+            targetFile.getName(),
+            targetDir.getAbsolutePath());
+      }
+    } else {
+      FileUtils.moveFileToDirectory(sourceFile, targetDir, true);
+      LOGGER.info(
+          "Moved file {} to fail directory {}.", sourceFile.getName(), 
targetDir.getAbsolutePath());
+    }
+  }
+
+  private static boolean haveSameMD5(final File file1, final File file2) {
+    try (final InputStream is1 = Files.newInputStream(file1.toPath());
+        final InputStream is2 = Files.newInputStream(file2.toPath())) {
+      return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
+    } catch (final Exception e) {
+      return false;
+    }
+  }
+
+  private static void renameWithMD5(File sourceFile, File targetDir) throws 
IOException {
+    try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+      final String sourceFileBaseName = 
FilenameUtils.getBaseName(sourceFile.getName());
+      final String sourceFileExtension = 
FilenameUtils.getExtension(sourceFile.getName());
+      final String sourceFileMD5 = DigestUtils.md5Hex(is);
+
+      final String targetFileName =
+          sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." + 
sourceFileExtension;
+      final File targetFile = new File(targetDir, targetFileName);
+
+      FileUtils.moveFile(sourceFile, targetFile, 
StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
index 9876256e916..c10145b0398 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.limiter;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
index 00d8a605a75..3726bd5610e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
index a98d1b3ec31..3ab26c0b49a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
index eda84fa096e..940cd2e75b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
index 2ae0acd2180..e4abec81e7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
index 93d56482b52..eece426fdbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
index 2a07dd6b1c4..6fd587c69c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 535261849d4..81c7ef4fd8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2819,9 +2819,9 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
 
     final long startTime = System.nanoTime();
-    try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
-        new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, 
schemaFetcher)) {
-      return loadTsfileAnalyzer.analyzeFileByFile();
+    try (final LoadTsFileAnalyzer loadTsfileAnalyzer =
+        new LoadTsFileAnalyzer(loadTsFileStatement, context, partitionFetcher, 
schemaFetcher)) {
+      return 
loadTsfileAnalyzer.analyzeFileByFile(loadTsFileStatement.isDeleteAfterLoad());
     } catch (final Exception e) {
       final String exceptionMessage =
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 67d25827e62..c7ec099903d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
@@ -66,6 +67,7 @@ import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
@@ -95,9 +97,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class LoadTsfileAnalyzer implements AutoCloseable {
+public class LoadTsFileAnalyzer implements AutoCloseable {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsfileAnalyzer.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
 
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
@@ -123,7 +125,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
 
   private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
 
-  LoadTsfileAnalyzer(
+  LoadTsFileAnalyzer(
       LoadTsFileStatement loadTsFileStatement,
       MPPQueryContext context,
       IPartitionFetcher partitionFetcher,
@@ -137,7 +139,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
     this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
   }
 
-  public Analysis analyzeFileByFile() {
+  public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
     final Analysis analysis = new Analysis();
 
     // check if the system is read only
@@ -165,7 +167,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
       }
 
       try {
-        analyzeSingleTsFile(tsFile);
+        analyzeSingleTsFile(tsFile, isDeleteAfterLoad);
         if (LOGGER.isInfoEnabled()) {
           LOGGER.info(
               "Load - Analysis Stage: {}/{} tsfiles have been analyzed, 
progress: {}%",
@@ -225,7 +227,8 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
     schemaAutoCreatorAndVerifier.close();
   }
 
-  private void analyzeSingleTsFile(File tsFile) throws IOException, 
AuthException {
+  private void analyzeSingleTsFile(final File tsFile, final boolean 
isDeleteAfterLoad)
+      throws IOException, AuthException {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       // can be reused when constructing tsfile resource
       final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
@@ -243,8 +246,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
 
       // check if the tsfile is empty
       if (!timeseriesMetadataIterator.hasNext()) {
-        LOGGER.warn("device2TimeseriesMetadata is empty, because maybe the 
tsfile is empty");
-        return;
+        throw new LoadEmptyFileException(tsFile.getAbsolutePath());
       }
 
       long writePointCount = 0;
@@ -277,6 +279,11 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
 
       loadTsFileStatement.addTsFileResource(tsFileResource);
       loadTsFileStatement.addWritePointCount(writePointCount);
+    } catch (final LoadEmptyFileException loadEmptyFileException) {
+      LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
+      if (isDeleteAfterLoad) {
+        FileUtils.deleteQuietly(tsFile);
+      }
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index c3c4c3fa2e3..b8dd843e27c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,7 +21,7 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 08a146bab51..87ec3b42ed0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -45,9 +45,9 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
 import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
 import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 89aa5e88096..04e9054fd95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -53,7 +53,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
+import 
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4ba71db22d2..81e895a762f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -51,7 +51,7 @@ import 
org.apache.iotdb.db.exception.quota.ExceedQuotaException;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.common.DeviceContext;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
+import 
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index 37b2d082384..dbb7511009f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -52,12 +52,12 @@ public class TsFileID {
     if (pathLength >= 3) {
       try {
         tmpRegionId = Integer.parseInt(pathSegments[pathLength - 3]);
-      } catch (NumberFormatException e) {
+      } catch (Exception e) {
         // ignore, load will get in here
       }
       try {
         tmpTimePartitionId = Long.parseLong(pathSegments[pathLength - 2]);
-      } catch (NumberFormatException e) {
+      } catch (Exception e) {
         // ignore, load will get in here
       }
     }
@@ -67,7 +67,7 @@ public class TsFileID {
     long[] arr = null;
     try {
       arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
-    } catch (NumberFormatException e) {
+    } catch (Exception e) {
       // ignore, load will get in here
     }
     this.fileVersion = arr == null || arr.length != 2 ? -1 : arr[0];
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 279aa063b85..f75b96d9db0 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -451,7 +451,7 @@ cn_max_client_count_for_each_node_in_client_manager=300
 # Datatype: int
 dn_session_timeout_threshold=0
 
-# weather enable thrift compression
+# whether enable thrift compression
 # effectiveMode: restart
 # Datatype: boolean
 dn_rpc_thrift_compression_enable=false
@@ -1867,6 +1867,48 @@ load_clean_up_task_execution_delay_time_seconds=1800
 # Datatype: int
 load_write_throughput_bytes_per_second=-1
 
+# Whether to enable the active listening mode for tsfile loading.
+# effectiveMode: hot_reload
+# Datatype: Boolean
+load_active_listening_enable=true
+
+# The directory to be actively listened for tsfile loading.
+# Multiple directories should be separated by a ','.
+# The default directory is 'ext/load/pending'.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is 
"\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_dirs=ext\\load\\pending
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_dirs=ext/load/pending
+
+# The directory where tsfile are moved if the active listening mode fails to 
load them.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is 
"\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_fail_dir=ext\\load\\failed
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_fail_dir=ext/load/failed
+
+# The maximum number of threads that can be used to load tsfile actively.
+# The default value, when this parameter is commented out, is the minimum of 8 
or the number of CPU cores.
+# effectiveMode: restart
+# Datatype: int
+load_active_listening_max_thread_num=8
+
+# The interval specified in seconds for the active listening mode to check the 
directory specified in load_active_listening_dirs.
+# The active listening mode will check the directory every 
load_active_listening_check_interval_seconds seconds.
+# effectiveMode: restart
+# Datatype: int
+load_active_listening_check_interval_seconds=5
+
 ####################
 ### Dispatch Retry Configuration
 ####################
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 00701d163e6..3765165b2c6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -45,7 +45,6 @@ public enum ThreadName {
   DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
   MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
   
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
-
   // -------------------------- Compaction --------------------------
   COMPACTION_WORKER("Compaction-Worker"),
   COMPACTION_SUB_TASK("Compaction-Sub-Task"),
@@ -102,7 +101,6 @@ public enum ThreadName {
   PIPE_CONSENSUS_RPC_SERVICE("PipeConsensusRPC-Service"),
   PIPE_CONSENSUS_RPC_PROCESSOR("PipeConsensusRPC-Processor"),
   
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
-
   // -------------------------- IoTConsensus --------------------------
   IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
@@ -172,6 +170,8 @@ public enum ThreadName {
   PROMETHEUS_REACTOR_HTTP_NIO("reactor-http-nio"),
   PROMETHEUS_BOUNDED_ELASTIC("boundedElastic-evictor"),
   // -------------------------- Other --------------------------
+  ACTIVE_LOAD_TSFILE_LOADER("Active-Load-TsFile-Loader"),
+  ACTIVE_LOAD_DIR_SCANNER("Active-Load-Dir-Scanner"),
   SETTLE("Settle"),
   INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
@@ -355,6 +355,8 @@ public enum ThreadName {
   private static final Set<ThreadName> otherThreadNames =
       new HashSet<>(
           Arrays.asList(
+              ACTIVE_LOAD_TSFILE_LOADER,
+              ACTIVE_LOAD_DIR_SCANNER,
               SETTLE,
               INFLUXDB_RPC_SERVICE,
               INFLUXDB_RPC_PROCESSOR,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 023e9114491..cbcedd497ad 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -243,6 +243,8 @@ public class IoTDBConstant {
   public static final String SYSTEM_FOLDER_NAME = "system";
   public static final String SCHEMA_FOLDER_NAME = "schema";
   public static final String LOAD_TSFILE_FOLDER_NAME = "load";
+  public static final String LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME 
= "pending";
+  public static final String LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME = 
"failed";
   public static final String SYNC_FOLDER_NAME = "sync";
   public static final String QUERY_FOLDER_NAME = "query";
   public static final String EXT_FOLDER_NAME = "ext";

Reply via email to