This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 34c63cf1219 Load: Support Datanode actively listening dirs and auto
load tsfile (#13085)
34c63cf1219 is described below
commit 34c63cf1219eac631015cb1a59f868ddc1ef12f0
Author: YC27 <[email protected]>
AuthorDate: Wed Aug 14 15:40:10 2024 +0800
Load: Support Datanode actively listening dirs and auto load tsfile (#13085)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 97 +++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 86 +++++-
.../iotdb/db/exception/LoadEmptyFileException.java | 4 +-
.../execution/load/LoadTsFileManager.java | 10 +
.../load/active/ActiveLoadDirScanner.java | 175 ++++++++++++
.../load/active/ActiveLoadPendingQueue.java | 64 +++++
.../load/active/ActiveLoadTsFileLoader.java | 310 +++++++++++++++++++++
.../load/{ => limiter}/LoadTsFileRateLimiter.java | 2 +-
.../load/{ => splitter}/AlignedChunkData.java | 2 +-
.../execution/load/{ => splitter}/ChunkData.java | 2 +-
.../load/{ => splitter}/DeletionData.java | 2 +-
.../load/{ => splitter}/NonAlignedChunkData.java | 2 +-
.../execution/load/{ => splitter}/TsFileData.java | 2 +-
.../load/{ => splitter}/TsFileSplitter.java | 2 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +-
...TsfileAnalyzer.java => LoadTsFileAnalyzer.java} | 23 +-
.../plan/node/load/LoadTsFilePieceNode.java | 2 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 6 +-
.../iotdb/db/storageengine/StorageEngine.java | 2 +-
.../db/storageengine/dataregion/DataRegion.java | 2 +-
.../storageengine/dataregion/tsfile/TsFileID.java | 6 +-
.../conf/iotdb-system.properties.template | 44 ++-
.../iotdb/commons/concurrent/ThreadName.java | 6 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 2 +
24 files changed, 814 insertions(+), 45 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 80bffe7074b..bbe142d8cea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1122,6 +1122,35 @@ public class IoTDBConfig {
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
+ // Whether the user-configured listening dirs below are scanned for tsfiles to load automatically.
+ private boolean loadActiveListeningEnable = true;
+
+ // Dirs scanned for tsfiles to load automatically; defaults to a single folder under the ext folder.
+ private String[] loadActiveListeningDirs =
+ new String[] {
+ IoTDBConstant.EXT_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME
+ };
+
+ // Dir scanned for tsfiles transferred by pipe; it is scanned even when active listening is
+ // disabled, and files found there are loaded as pipe-generated.
+ private String loadActiveListeningPipeDir =
+ IoTDBConstant.EXT_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.PIPE_FOLDER_NAME;
+
+ // Files that fail to be loaded automatically are moved to this dir.
+ private String loadActiveListeningFailDir =
+ IoTDBConstant.EXT_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME;
+
+ // Delay, in seconds, between two scans of the listening dirs.
+ private long loadActiveListeningCheckIntervalSeconds = 5L;
+
+ // Max number of threads loading the tsfiles found by the scanner.
+ private int loadActiveListeningMaxThreadNum = 8;
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -1287,6 +1316,11 @@ public class IoTDBConfig {
schemaRegionConsensusDir = addDataHomeDir(schemaRegionConsensusDir);
indexRootFolder = addDataHomeDir(indexRootFolder);
extDir = addDataHomeDir(extDir);
+ // Pass the active load dirs through addDataHomeDir, like the other folders here
+ // (the listening-dir array is updated in place).
+ for (int i = 0; i < loadActiveListeningDirs.length; i++) {
+ loadActiveListeningDirs[i] = addDataHomeDir(loadActiveListeningDirs[i]);
+ }
+ loadActiveListeningPipeDir = addDataHomeDir(loadActiveListeningPipeDir);
+ loadActiveListeningFailDir = addDataHomeDir(loadActiveListeningFailDir);
udfDir = addDataHomeDir(udfDir);
udfTemporaryLibDir = addDataHomeDir(udfTemporaryLibDir);
triggerDir = addDataHomeDir(triggerDir);
@@ -3886,6 +3920,69 @@ public class IoTDBConfig {
this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
}
+  /** Returns the maximum number of threads used to load the tsfiles found by the scanner. */
+  public int getLoadActiveListeningMaxThreadNum() {
+    return loadActiveListeningMaxThreadNum;
+  }
+
+  public void setLoadActiveListeningMaxThreadNum(int loadActiveListeningMaxThreadNum) {
+    this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum;
+  }
+
+  /** Returns the delay, in seconds, between two scans of the listening dirs. */
+  public long getLoadActiveListeningCheckIntervalSeconds() {
+    return loadActiveListeningCheckIntervalSeconds;
+  }
+
+  public void setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    this.loadActiveListeningCheckIntervalSeconds = loadActiveListeningCheckIntervalSeconds;
+  }
+
+  /**
+   * Returns the dir that files failing active load are moved to, falling back to the default
+   * failed-folder under {@code extDir} when unset or empty.
+   */
+  public String getLoadActiveListeningFailDir() {
+    return loadActiveListeningFailDir == null || loadActiveListeningFailDir.isEmpty()
+        ? extDir
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME
+        : loadActiveListeningFailDir;
+  }
+
+  public void setLoadActiveListeningFailDir(String loadActiveListeningFailDir) {
+    this.loadActiveListeningFailDir = addDataHomeDir(loadActiveListeningFailDir);
+  }
+
+  /** Returns the dir for tsfiles transferred by pipe, which is always listened to. */
+  public String getLoadActiveListeningPipeDir() {
+    return loadActiveListeningPipeDir;
+  }
+
+  /**
+   * Returns the user-configured listening dirs, or the default pending folder under {@code
+   * extDir} when none are configured.
+   */
+  public String[] getLoadActiveListeningDirs() {
+    return (Objects.isNull(this.loadActiveListeningDirs)
+            || this.loadActiveListeningDirs.length == 0)
+        ? new String[] {
+          extDir
+              + File.separator
+              + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+              + File.separator
+              + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME
+        }
+        : this.loadActiveListeningDirs;
+  }
+
+  /**
+   * Sets the user-configured listening dirs, applying {@code addDataHomeDir} to each entry.
+   *
+   * <p>The argument array is not modified: the resolved dirs are stored in a new array, so the
+   * caller's array stays intact. A {@code null} argument is stored as an empty array, which makes
+   * {@link #getLoadActiveListeningDirs()} fall back to the default pending folder.
+   */
+  public void setLoadActiveListeningDirs(String[] loadActiveListeningDirs) {
+    if (loadActiveListeningDirs == null) {
+      this.loadActiveListeningDirs = new String[0];
+      return;
+    }
+    final String[] resolvedDirs = new String[loadActiveListeningDirs.length];
+    for (int i = 0; i < loadActiveListeningDirs.length; i++) {
+      resolvedDirs[i] = addDataHomeDir(loadActiveListeningDirs[i]);
+    }
+    this.loadActiveListeningDirs = resolvedDirs;
+  }
+
+  /** Returns whether the user-configured listening dirs are scanned. */
+  public boolean getLoadActiveListeningEnable() {
+    return loadActiveListeningEnable;
+  }
+
+  public void setLoadActiveListeningEnable(boolean loadActiveListeningEnable) {
+    this.loadActiveListeningEnable = loadActiveListeningEnable;
+  }
+
public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 564c146bc94..77168a1794e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1818,19 +1818,7 @@ public class IoTDBDescriptor {
loadCompactionHotModifiedProps(properties);
// update load config
- conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
- Long.parseLong(
- properties.getProperty(
- "load_clean_up_task_execution_delay_time_seconds",
- ConfigurationFileUtils.getConfigurationDefaultValue(
- "load_clean_up_task_execution_delay_time_seconds"))));
-
- conf.setLoadWriteThroughputBytesPerSecond(
- Double.parseDouble(
- properties.getProperty(
- "load_write_throughput_bytes_per_second",
- ConfigurationFileUtils.getConfigurationDefaultValue(
- "load_write_throughput_bytes_per_second"))));
+ // Reloads the cleanup delay, write throughput and active listening settings.
+ loadLoadTsFileHotModifiedProp(properties);
// update pipe config
commonDescriptor
@@ -2161,6 +2149,78 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+
+    conf.setLoadActiveListeningEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "load_active_listening_enable",
+                Boolean.toString(conf.getLoadActiveListeningEnable()))));
+    // Each comma-separated entry is trimmed on its own, so "a, b" yields "a" and "b" rather
+    // than "a" and " b"; blank entries are dropped.
+    conf.setLoadActiveListeningDirs(
+        Arrays.stream(
+                properties
+                    .getProperty(
+                        "load_active_listening_dirs",
+                        String.join(",", conf.getLoadActiveListeningDirs()))
+                    .split(","))
+            .map(String::trim)
+            .filter(dir -> !dir.isEmpty())
+            .toArray(String[]::new));
+    conf.setLoadActiveListeningFailDir(
+        properties.getProperty(
+            "load_active_listening_fail_dir", conf.getLoadActiveListeningFailDir()));
+    conf.setLoadActiveListeningCheckIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "load_active_listening_check_interval_seconds",
+                Long.toString(conf.getLoadActiveListeningCheckIntervalSeconds()))));
+    // When the property is absent, default to the configured value capped at half the
+    // available processors (but at least 1).
+    conf.setLoadActiveListeningMaxThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_active_listening_max_thread_num",
+                Integer.toString(
+                    Math.min(
+                        conf.getLoadActiveListeningMaxThreadNum(),
+                        Math.max(1, Runtime.getRuntime().availableProcessors() / 2))))));
+  }
+
+ private void loadLoadTsFileHotModifiedProp(Properties properties) throws
IOException {
+ conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "load_clean_up_task_execution_delay_time_seconds",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "load_clean_up_task_execution_delay_time_seconds"))));
+
+ conf.setLoadWriteThroughputBytesPerSecond(
+ Double.parseDouble(
+ properties.getProperty(
+ "load_write_throughput_bytes_per_second",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "load_write_throughput_bytes_per_second"))));
+
+ conf.setLoadActiveListeningEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "load_active_listening_enable",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "load_active_listening_enable"))));
+ conf.setLoadActiveListeningDirs(
+ Arrays.stream(
+ properties
+ .getProperty(
+ "load_active_listening_dirs",
+ String.join(
+ ",",
+
ConfigurationFileUtils.getConfigurationDefaultValue(
+ "load_active_listening_dirs")))
+ .trim()
+ .split(","))
+ .filter(dir -> !dir.isEmpty())
+ .toArray(String[]::new));
+ conf.setLoadActiveListeningFailDir(
+ properties.getProperty(
+ "load_active_listening_fail_dir",
+
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
}
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
index 1c9e9bc4590..59e8a1d13f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.exception;
public class LoadEmptyFileException extends LoadFileException {
- public LoadEmptyFileException() {
- super("Cannot load an empty file");
+ /**
+ * Creates an exception signalling that an empty file cannot be loaded.
+ *
+ * @param fileName identifies the empty file; it is used verbatim as the exception message
+ */
+ public LoadEmptyFileException(final String fileName) {
+ // NOTE(review): the message no longer contains "Cannot load an empty file"; confirm that
+ // callers/logs still make the reason clear from the exception type alone.
+ super(fileName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 1d2b7d9f378..509032f0b35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -35,6 +35,11 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadDirScanner;
+import
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadTsFileLoader;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.DeletionData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -92,9 +97,14 @@ public class LoadTsFileManager {
private final Map<String, CleanupTask> uuid2CleanupTask = new
ConcurrentHashMap<>();
private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new
PriorityBlockingQueue<>();
+ // Loads the tsfiles handed over by the dir scanner below.
+ private final ActiveLoadTsFileLoader activeLoadTsFileLoader = new
ActiveLoadTsFileLoader();
+ // Periodically scans the active load listening dirs and feeds completed tsfiles to the loader.
+ private final ActiveLoadDirScanner activeLoadDirScanner =
+ new ActiveLoadDirScanner(activeLoadTsFileLoader);
+
public LoadTsFileManager() {
registerCleanupTaskExecutor();
recover();
+ // Started unconditionally: the scanner itself checks the active-listening-enable config and
+ // always scans the pipe dir. NOTE(review): no matching stop() call is visible here.
+ activeLoadDirScanner.start();
}
private void registerCleanupTaskExecutor() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
new file mode 100644
index 00000000000..e49789a8c0b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+public class ActiveLoadDirScanner {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
+
+ private static final String RESOURCE = ".resource";
+ private static final String MODS = ".mods";
+
+ private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ private static final ScheduledExecutorService DIR_SCAN_JOB_EXECUTOR =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.ACTIVE_LOAD_DIR_SCANNER.getName());
+ private static final long MIN_SCAN_INTERVAL_SECONDS =
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds();
+
+ private final AtomicReference<String[]> listeningDirsConfig = new
AtomicReference<>();
+ private final Set<String> listeningDirs = new HashSet<>();
+
+ private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+
+ private Future<?> dirScanJobFuture;
+
+ public ActiveLoadDirScanner(final ActiveLoadTsFileLoader
activeLoadTsFileLoader) {
+ this.activeLoadTsFileLoader = activeLoadTsFileLoader;
+ }
+
+ public synchronized void start() {
+ if (dirScanJobFuture == null) {
+ dirScanJobFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ DIR_SCAN_JOB_EXECUTOR,
+ this::scanSafely,
+ MIN_SCAN_INTERVAL_SECONDS,
+ MIN_SCAN_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
+ LOGGER.info(
+ "Active load dir scanner started. Scan interval: {}s.",
MIN_SCAN_INTERVAL_SECONDS);
+ }
+ }
+
+ public synchronized void stop() {
+ if (dirScanJobFuture != null) {
+ dirScanJobFuture.cancel(false);
+ dirScanJobFuture = null;
+ LOGGER.info("Active load dir scanner stopped.");
+ }
+ }
+
+ private void scanSafely() {
+ try {
+ scan();
+ } catch (final Exception e) {
+ LOGGER.warn("Error occurred during active load dir scanning.", e);
+ }
+ }
+
+ private void scan() throws IOException {
+ hotReloadActiveLoadDirs();
+
+ for (final String listeningDir : listeningDirs) {
+ final int currentAllowedPendingSize =
activeLoadTsFileLoader.getCurrentAllowedPendingSize();
+ if (currentAllowedPendingSize <= 0) {
+ return;
+ }
+
+ final boolean isGeneratedByPipe =
+ listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+ FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)
+ .map(
+ file ->
+ (file.getName().endsWith(RESOURCE) ||
file.getName().endsWith(MODS))
+ ? getTsFilePath(file.getAbsolutePath())
+ : file.getAbsolutePath())
+ .filter(this::isTsFileCompleted)
+ .limit(currentAllowedPendingSize)
+ .forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file,
isGeneratedByPipe));
+ }
+ }
+
+ private boolean isTsFileCompleted(final String file) {
+ try (final TsFileSequenceReader reader = new TsFileSequenceReader(file,
false)) {
+ return TSFileConfig.MAGIC_STRING.equals(reader.readTailMagic());
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ private void hotReloadActiveLoadDirs() {
+ try {
+ // Hot reload active load listening dirs if active listening is enabled
+ if (IOTDB_CONFIG.getLoadActiveListeningEnable()) {
+ if (IOTDB_CONFIG.getLoadActiveListeningDirs() !=
listeningDirsConfig.get()) {
+ synchronized (this) {
+ if (IOTDB_CONFIG.getLoadActiveListeningDirs() !=
listeningDirsConfig.get()) {
+
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
+
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
+ }
+ }
+ }
+ }
+
+ // Hot reload active load listening dir for pipe data sync
+ // Active load is always enabled for pipe data sync
+ listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+
+ // Create directories if not exists
+ listeningDirs.forEach(this::createDirectoriesIfNotExists);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Error occurred during hot reload active load dirs. "
+ + "Current active load listening dirs: {}.",
+ listeningDirs,
+ e);
+ }
+ }
+
+ private void createDirectoriesIfNotExists(final String dirPath) {
+ try {
+ FileUtils.forceMkdir(new File(dirPath));
+ } catch (final IOException e) {
+ LOGGER.warn("Error occurred during creating directory {} for active
load.", dirPath, e);
+ }
+ }
+
+ private static String getTsFilePath(final String
filePathWithResourceOrModsTail) {
+ return filePathWithResourceOrModsTail.endsWith(RESOURCE)
+ ? filePathWithResourceOrModsTail.substring(
+ 0, filePathWithResourceOrModsTail.length() - RESOURCE.length())
+ : filePathWithResourceOrModsTail.substring(
+ 0, filePathWithResourceOrModsTail.length() - MODS.length());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
new file mode 100644
index 00000000000..618eff470c6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Tracks files waiting to be loaded ("pending") and files currently being loaded ("loading"), so
+ * that a file is never queued twice, nor re-queued while its load is still in progress.
+ *
+ * <p>Every method synchronizes on this instance, because the two sets are plain {@link HashSet}s.
+ */
+public class ActiveLoadPendingQueue {
+
+  private final Set<String> pendingFileSet = new HashSet<>();
+  // FIFO of (file path, isGeneratedByPipe)
+  private final Queue<Pair<String, Boolean>> pendingFileQueue = new ConcurrentLinkedQueue<>();
+
+  private final Set<String> loadingFileSet = new HashSet<>();
+
+  /**
+   * Adds the file to the pending queue unless it is already pending or being loaded.
+   *
+   * @return true if the file was enqueued
+   */
+  public synchronized boolean enqueue(final String file, final boolean isGeneratedByPipe) {
+    if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
+      pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Moves the oldest pending file to the loading set.
+   *
+   * @return the (file, isGeneratedByPipe) pair, or null if nothing is pending
+   */
+  public synchronized Pair<String, Boolean> dequeueFromPending() {
+    final Pair<String, Boolean> pair = pendingFileQueue.poll();
+    if (pair != null) {
+      pendingFileSet.remove(pair.left);
+      loadingFileSet.add(pair.left);
+    }
+    return pair;
+  }
+
+  /** Marks the file's load as finished, allowing it to be enqueued again. */
+  public synchronized void removeFromLoading(final String file) {
+    loadingFileSet.remove(file);
+  }
+
+  // Synchronized: loadingFileSet is a HashSet mutated under this lock, so reading it without
+  // the lock is a data race.
+  /** Returns the number of pending plus loading files. */
+  public synchronized int size() {
+    return pendingFileQueue.size() + loadingFileSet.size();
+  }
+
+  /** Returns true if no file is pending or being loaded. */
+  public synchronized boolean isEmpty() {
+    return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
new file mode 100644
index 00000000000..fb4800deef9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.time.ZoneId;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+public class ActiveLoadTsFileLoader {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadTsFileLoader.class);
+
+ private static final int MAX_PENDING_SIZE = 1000;
+ private final ActiveLoadPendingQueue pendingQueue = new
ActiveLoadPendingQueue();
+
+ private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor =
+ new AtomicReference<>();
+ private final AtomicReference<String> failDir = new AtomicReference<>();
+
+ public int getCurrentAllowedPendingSize() {
+ return MAX_PENDING_SIZE - pendingQueue.size();
+ }
+
+ public void tryTriggerTsFileLoad(String absolutePath, boolean
isGeneratedByPipe) {
+ if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
+ initFailDirIfNecessary();
+ adjustExecutorIfNecessary();
+ }
+ }
+
+ private void initFailDirIfNecessary() {
+ if (failDir.get() == null) {
+ synchronized (failDir) {
+ if (failDir.get() == null) {
+ final File failDirFile =
+ new
File(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ try {
+ FileUtils.forceMkdir(failDirFile);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Error occurred during creating fail directory {} for active
load.",
+ failDirFile,
+ e);
+ }
+
failDir.set(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ }
+ }
+ }
+ }
+
+ private void adjustExecutorIfNecessary() {
+ if (activeLoadExecutor.get() == null) {
+ synchronized (activeLoadExecutor) {
+ if (activeLoadExecutor.get() == null) {
+ activeLoadExecutor.set(
+ new WrappedThreadPoolExecutor(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+ 0L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new
IoTThreadFactory(ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name()),
+ ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name()));
+ }
+ }
+ }
+
+ final int targetCorePoolSize =
+ Math.min(
+ pendingQueue.size(),
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum());
+
+ if (activeLoadExecutor.get().getCorePoolSize() != targetCorePoolSize) {
+ activeLoadExecutor.get().setCorePoolSize(targetCorePoolSize);
+ }
+
+ // calculate how many threads need to be loaded
+ final int threadsToBeAdded =
+ Math.max(targetCorePoolSize -
activeLoadExecutor.get().getActiveCount(), 0);
+ for (int i = 0; i < threadsToBeAdded; i++) {
+ activeLoadExecutor.get().execute(this::tryLoadPendingTsFiles);
+ }
+ }
+
+ private void tryLoadPendingTsFiles() {
+ while (true) {
+ final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
+ if (!filePair.isPresent()) {
+ return;
+ }
+
+ try {
+ final TSStatus result = loadTsFile(filePair.get());
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ LOGGER.info(
+ "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+ filePair.get().getLeft(),
+ filePair.get().getRight());
+ } else {
+ handleLoadFailure(filePair.get(), result);
+ }
+ } catch (final FileNotFoundException e) {
+ handleFileNotFoundException(filePair.get());
+ } catch (final Exception e) {
+ handleOtherException(filePair.get(), e);
+ } finally {
+ pendingQueue.removeFromLoading(filePair.get().getLeft());
+ }
+ }
+ }
+
+ private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
+ final long maxRetryTimes =
+ Math.max(
+ 1,
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds()
+ << 1);
+ long currentRetryTimes = 0;
+
+ while (true) {
+ final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
+ if (Objects.nonNull(filePair)) {
+ return Optional.of(filePair);
+ }
+
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+
+ if (currentRetryTimes++ >= maxRetryTimes) {
+ return Optional.empty();
+ }
+ }
+ }
+
+ private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws
FileNotFoundException {
+ final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
+ statement.setDeleteAfterLoad(true);
+ statement.setVerifySchema(true);
+ statement.setAutoCreateDatabase(false);
+ return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
+ }
+
+ private TSStatus executeStatement(final Statement statement) {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ statement,
+ SessionManager.getInstance().requestQueryId(),
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+ .status;
+ }
+
+ private void handleLoadFailure(final Pair<String, Boolean> filePair, final
TSStatus status) {
+ LOGGER.warn(
+ "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}.
File will be moved to fail directory.",
+ filePair.getLeft(),
+ filePair.getRight(),
+ status);
+ removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ }
+
+ private void handleFileNotFoundException(final Pair<String, Boolean>
filePair) {
+ LOGGER.warn(
+ "Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file
not found, will skip this file.",
+ filePair.getLeft(),
+ filePair.getRight());
+ removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ }
+
+ private void handleOtherException(final Pair<String, Boolean> filePair,
final Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("memory")) {
+ LOGGER.info(
+ "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to
memory constraints, will retry later.",
+ filePair.getLeft(),
+ filePair.getRight());
+ pendingQueue.enqueue(filePair.getLeft(), filePair.getRight());
+ } else {
+ LOGGER.warn(
+ "Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of
an unexpected exception. File will be moved to fail directory.",
+ filePair.getLeft(),
+ filePair.getRight(),
+ e);
+ removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ }
+ }
+
+ private void removeFileAndResourceAndModsToFailDir(final String filePath) {
+ removeToFailDir(filePath);
+ removeToFailDir(filePath + ".resource");
+ removeToFailDir(filePath + ".mods");
+ }
+
+ private void removeToFailDir(final String filePath) {
+ final File sourceFile = new File(filePath);
+ // prevent the resource or mods not exist
+ if (!sourceFile.exists()) {
+ return;
+ }
+
+ final File targetDir = new File(failDir.get());
+ try {
+ moveFileWithMD5Check(sourceFile, targetDir);
+ } catch (final IOException e) {
+ LOGGER.warn("Error occurred during moving file {} to fail directory.",
filePath, e);
+ }
+ }
+
+ private static void moveFileWithMD5Check(final File sourceFile, final File
targetDir)
+ throws IOException {
+ final String sourceFileName = sourceFile.getName();
+ final File targetFile = new File(targetDir, sourceFileName);
+
+ if (targetFile.exists()) {
+ if (haveSameMD5(sourceFile, targetFile)) {
+ FileUtils.forceDelete(sourceFile);
+ LOGGER.info(
+ "Deleted the file {} because it already exists in the fail
directory: {}",
+ sourceFile.getName(),
+ targetDir.getAbsolutePath());
+ } else {
+ renameWithMD5(sourceFile, targetDir);
+ LOGGER.info(
+ "Renamed file {} to {} because it already exists in the fail
directory: {}",
+ sourceFile.getName(),
+ targetFile.getName(),
+ targetDir.getAbsolutePath());
+ }
+ } else {
+ FileUtils.moveFileToDirectory(sourceFile, targetDir, true);
+ LOGGER.info(
+ "Moved file {} to fail directory {}.", sourceFile.getName(),
targetDir.getAbsolutePath());
+ }
+ }
+
+ private static boolean haveSameMD5(final File file1, final File file2) {
+ try (final InputStream is1 = Files.newInputStream(file1.toPath());
+ final InputStream is2 = Files.newInputStream(file2.toPath())) {
+ return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ private static void renameWithMD5(File sourceFile, File targetDir) throws
IOException {
+ try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+ final String sourceFileBaseName =
FilenameUtils.getBaseName(sourceFile.getName());
+ final String sourceFileExtension =
FilenameUtils.getExtension(sourceFile.getName());
+ final String sourceFileMD5 = DigestUtils.md5Hex(is);
+
+ final String targetFileName =
+ sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." +
sourceFileExtension;
+ final File targetFile = new File(targetDir, targetFileName);
+
+ FileUtils.moveFile(sourceFile, targetFile,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
index 9876256e916..c10145b0398 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.limiter;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
index 00d8a605a75..3726bd5610e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
index a98d1b3ec31..3ab26c0b49a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
index eda84fa096e..940cd2e75b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
index 2ae0acd2180..e4abec81e7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
similarity index 95%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
index 93d56482b52..eece426fdbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
index 2a07dd6b1c4..6fd587c69c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.queryengine.execution.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 535261849d4..81c7ef4fd8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2819,9 +2819,9 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
final long startTime = System.nanoTime();
- try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
- new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher,
schemaFetcher)) {
- return loadTsfileAnalyzer.analyzeFileByFile();
+ try (final LoadTsFileAnalyzer loadTsfileAnalyzer =
+ new LoadTsFileAnalyzer(loadTsFileStatement, context, partitionFetcher,
schemaFetcher)) {
+ return
loadTsfileAnalyzer.analyzeFileByFile(loadTsFileStatement.isDeleteAfterLoad());
} catch (final Exception e) {
final String exceptionMessage =
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 67d25827e62..c7ec099903d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadEmptyFileException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
@@ -66,6 +67,7 @@ import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
@@ -95,9 +97,9 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-public class LoadTsfileAnalyzer implements AutoCloseable {
+public class LoadTsFileAnalyzer implements AutoCloseable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsfileAnalyzer.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
@@ -123,7 +125,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
- LoadTsfileAnalyzer(
+ LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context,
IPartitionFetcher partitionFetcher,
@@ -137,7 +139,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
}
- public Analysis analyzeFileByFile() {
+ public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
final Analysis analysis = new Analysis();
// check if the system is read only
@@ -165,7 +167,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
}
try {
- analyzeSingleTsFile(tsFile);
+ analyzeSingleTsFile(tsFile, isDeleteAfterLoad);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Load - Analysis Stage: {}/{} tsfiles have been analyzed,
progress: {}%",
@@ -225,7 +227,8 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
schemaAutoCreatorAndVerifier.close();
}
- private void analyzeSingleTsFile(File tsFile) throws IOException,
AuthException {
+ private void analyzeSingleTsFile(final File tsFile, final boolean
isDeleteAfterLoad)
+ throws IOException, AuthException {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
@@ -243,8 +246,7 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
// check if the tsfile is empty
if (!timeseriesMetadataIterator.hasNext()) {
- LOGGER.warn("device2TimeseriesMetadata is empty, because maybe the
tsfile is empty");
- return;
+ throw new LoadEmptyFileException(tsFile.getAbsolutePath());
}
long writePointCount = 0;
@@ -277,6 +279,11 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
loadTsFileStatement.addTsFileResource(tsFileResource);
loadTsFileStatement.addWritePointCount(writePointCount);
+ } catch (final LoadEmptyFileException loadEmptyFileException) {
+ LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
+ if (isDeleteAfterLoad) {
+ FileUtils.deleteQuietly(tsFile);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index c3c4c3fa2e3..b8dd843e27c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,7 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 08a146bab51..87ec3b42ed0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -45,9 +45,9 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
+import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 89aa5e88096..04e9054fd95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -53,7 +53,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
+import
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4ba71db22d2..81e895a762f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -51,7 +51,7 @@ import
org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
+import
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index 37b2d082384..dbb7511009f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -52,12 +52,12 @@ public class TsFileID {
if (pathLength >= 3) {
try {
tmpRegionId = Integer.parseInt(pathSegments[pathLength - 3]);
- } catch (NumberFormatException e) {
+ } catch (Exception e) {
// ignore, load will get in here
}
try {
tmpTimePartitionId = Long.parseLong(pathSegments[pathLength - 2]);
- } catch (NumberFormatException e) {
+ } catch (Exception e) {
// ignore, load will get in here
}
}
@@ -67,7 +67,7 @@ public class TsFileID {
long[] arr = null;
try {
arr = splitAndGetVersionArray(pathSegments[pathLength - 1]);
- } catch (NumberFormatException e) {
+ } catch (Exception e) {
// ignore, load will get in here
}
this.fileVersion = arr == null || arr.length != 2 ? -1 : arr[0];
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 279aa063b85..f75b96d9db0 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -451,7 +451,7 @@ cn_max_client_count_for_each_node_in_client_manager=300
# Datatype: int
dn_session_timeout_threshold=0
-# weather enable thrift compression
+# Whether to enable thrift compression
# effectiveMode: restart
# Datatype: boolean
dn_rpc_thrift_compression_enable=false
@@ -1867,6 +1867,48 @@ load_clean_up_task_execution_delay_time_seconds=1800
# Datatype: int
load_write_throughput_bytes_per_second=-1
+# Whether to enable the active listening mode for tsfile loading.
+# effectiveMode: hot_reload
+# Datatype: boolean
+load_active_listening_enable=true
+
+# The directories to be actively listened on for tsfile loading.
+# Multiple directories should be separated by commas (',').
+# The default directory is 'ext/load/pending'.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_dirs=ext\\load\\pending
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_dirs=ext/load/pending
+
+# The directory to which tsfiles are moved if the active listening mode fails to load them.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_fail_dir=ext\\load\\failed
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_fail_dir=ext/load/failed
+
+# The maximum number of threads that can be used to load tsfiles actively.
+# When this parameter is commented out, the default value is the minimum of 8 and the number of CPU cores.
+# effectiveMode: restart
+# Datatype: int
+load_active_listening_max_thread_num=8
+
+# The interval, in seconds, at which the active listening mode checks the directories specified in load_active_listening_dirs.
+# That is, the active listening mode checks those directories every load_active_listening_check_interval_seconds seconds.
+# effectiveMode: restart
+# Datatype: int
+load_active_listening_check_interval_seconds=5
+
####################
### Dispatch Retry Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 00701d163e6..3765165b2c6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -45,7 +45,6 @@ public enum ThreadName {
DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
-
// -------------------------- Compaction --------------------------
COMPACTION_WORKER("Compaction-Worker"),
COMPACTION_SUB_TASK("Compaction-Sub-Task"),
@@ -102,7 +101,6 @@ public enum ThreadName {
PIPE_CONSENSUS_RPC_SERVICE("PipeConsensusRPC-Service"),
PIPE_CONSENSUS_RPC_PROCESSOR("PipeConsensusRPC-Processor"),
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
-
// -------------------------- IoTConsensus --------------------------
IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
@@ -172,6 +170,8 @@ public enum ThreadName {
PROMETHEUS_REACTOR_HTTP_NIO("reactor-http-nio"),
PROMETHEUS_BOUNDED_ELASTIC("boundedElastic-evictor"),
// -------------------------- Other --------------------------
+ ACTIVE_LOAD_TSFILE_LOADER("Active-Load-TsFile-Loader"),
+ ACTIVE_LOAD_DIR_SCANNER("Active-Load-Dir-Scanner"),
SETTLE("Settle"),
INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
@@ -355,6 +355,8 @@ public enum ThreadName {
private static final Set<ThreadName> otherThreadNames =
new HashSet<>(
Arrays.asList(
+ ACTIVE_LOAD_TSFILE_LOADER,
+ ACTIVE_LOAD_DIR_SCANNER,
SETTLE,
INFLUXDB_RPC_SERVICE,
INFLUXDB_RPC_PROCESSOR,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 023e9114491..cbcedd497ad 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -243,6 +243,8 @@ public class IoTDBConstant {
public static final String SYSTEM_FOLDER_NAME = "system";
public static final String SCHEMA_FOLDER_NAME = "schema";
public static final String LOAD_TSFILE_FOLDER_NAME = "load";
+ public static final String LOAD_TSFILE_ACTIVE_LISTENING_PENDING_FOLDER_NAME
= "pending";
+ public static final String LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME =
"failed";
public static final String SYNC_FOLDER_NAME = "sync";
public static final String QUERY_FOLDER_NAME = "query";
public static final String EXT_FOLDER_NAME = "ext";