This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e4e01cc632 Load & Pipe: Support Active Load Table Model TsFiles & Support Async Load in SQL & Support Async Load Strategy in Pipe (#15208)
7e4e01cc632 is described below

commit 7e4e01cc6327a3163432c05b97e46dd70df923a2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Mar 28 23:21:08 2025 +0800

    Load & Pipe: Support Active Load Table Model TsFiles & Support Async Load in SQL & Support Async Load Strategy in Pipe (#15208)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  20 ++--
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  76 +++++++++++++++
 .../plan/relational/sql/ast/LoadTsFile.java        |   6 ++
 .../plan/statement/crud/LoadTsFileStatement.java   |  10 ++
 .../load/active/ActiveLoadTsFileLoader.java        | 102 +++++++++++++--------
 .../load/config/LoadTsFileConfigurator.java        |  20 ++++
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  49 ++++++++++
 7 files changed, 239 insertions(+), 44 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 082655b45fe..ea2574f47ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -567,27 +567,31 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private TSStatus loadTsFileAsync(final String dataBaseName, final 
List<String> absolutePaths)
       throws IOException {
-    if (Objects.nonNull(dataBaseName)) {
-      throw new PipeException(
-          "Async load tsfile does not support table model tsfile. Given 
database name: "
-              + dataBaseName);
-    }
-
     final String loadActiveListeningPipeDir = 
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
     if (Objects.isNull(loadActiveListeningPipeDir)) {
       throw new PipeException("Load active listening pipe dir is not set.");
     }
 
+    if (Objects.nonNull(dataBaseName)) {
+      final File targetDir = new File(loadActiveListeningPipeDir, 
dataBaseName);
+      return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
+    }
+
+    return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir), 
absolutePaths);
+  }
+
+  private TSStatus loadTsFileAsyncToTargetDir(
+      final File targetDir, final List<String> absolutePaths) throws 
IOException {
     for (final String absolutePath : absolutePaths) {
       if (absolutePath == null) {
         continue;
       }
       final File sourceFile = new File(absolutePath);
       if (!Objects.equals(
-          loadActiveListeningPipeDir, 
sourceFile.getParentFile().getAbsolutePath())) {
+          targetDir.getAbsolutePath(), 
sourceFile.getParentFile().getAbsolutePath())) {
         RetryUtils.retryOnException(
             () -> {
-              FileUtils.moveFileWithMD5Check(sourceFile, new 
File(loadActiveListeningPipeDir));
+              FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
               return null;
             });
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 3da8922f4e1..498e678b4bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
@@ -80,6 +81,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
+import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
 import static 
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
 import static 
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
 import static 
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
@@ -117,6 +120,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   // User specified configs
   private final int databaseLevel;
   private String databaseForTableData;
+  private final boolean isAsyncLoad;
   private final boolean isVerifySchema;
   private final boolean isAutoCreateDatabase;
   private final boolean isDeleteAfterLoad;
@@ -143,6 +147,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
     this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
     this.databaseForTableData = loadTsFileStatement.getDatabase();
+    this.isAsyncLoad = loadTsFileStatement.isAsyncLoad();
     this.isVerifySchema = loadTsFileStatement.isVerifySchema();
     this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
     this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
@@ -166,6 +171,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
     this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
     this.databaseForTableData = loadTsFileTableStatement.getDatabase();
+    this.isAsyncLoad = loadTsFileTableStatement.isAsyncLoad();
     this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
     this.isAutoCreateDatabase = 
loadTsFileTableStatement.isAutoCreateDatabase();
     this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
@@ -199,6 +205,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       return analysis;
     }
 
+    if (isAsyncLoad && doAsyncLoad(analysis)) {
+      return analysis;
+    }
+
     try {
       if (!doAnalyzeFileByFile(analysis)) {
         return analysis;
@@ -268,6 +278,72 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     return true;
   }
 
+  private boolean doAsyncLoad(final IAnalysis analysis) {
+    final String[] loadActiveListeningDirs =
+        IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+    String targetFilePath = null;
+    for (int i = 0, size = loadActiveListeningDirs == null ? 0 : 
loadActiveListeningDirs.length;
+        i < size;
+        i++) {
+      if (loadActiveListeningDirs[i] != null) {
+        targetFilePath = loadActiveListeningDirs[i];
+        break;
+      }
+    }
+    if (targetFilePath == null) {
+      LOGGER.warn("Load active listening dir is not set. Will try sync load 
instead.");
+      return false;
+    }
+
+    try {
+      if (Objects.nonNull(databaseForTableData)) {
+        loadTsFilesAsyncToTargetDir(new File(targetFilePath, 
databaseForTableData), tsFiles);
+      } else {
+        loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to async load tsfiles {} to target dir {}. Will try sync 
load instead.",
+          tsFiles,
+          targetFilePath,
+          e);
+      return false;
+    }
+
+    analysis.setFinishQueryAfterAnalyze(true);
+    setRealStatement(analysis);
+    return true;
+  }
+
+  private void loadTsFilesAsyncToTargetDir(final File targetDir, final 
List<File> files)
+      throws IOException {
+    for (final File file : files) {
+      if (file == null) {
+        continue;
+      }
+
+      loadTsFileAsyncToTargetDir(targetDir, file);
+      loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + 
".resource"));
+      loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + 
".mods"));
+    }
+  }
+
+  private void loadTsFileAsyncToTargetDir(final File targetDir, final File 
file)
+      throws IOException {
+    if (!file.exists()) {
+      return;
+    }
+    RetryUtils.retryOnException(
+        () -> {
+          if (isDeleteAfterLoad) {
+            moveFileWithMD5Check(file, targetDir);
+          } else {
+            copyFileWithMD5Check(file, targetDir);
+          }
+          return null;
+        });
+  }
+
   private boolean doAnalyzeFileByFile(IAnalysis analysis) {
     // analyze tsfile metadata file by file
     for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index f42ac22002e..180056c774e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -46,6 +46,7 @@ public class LoadTsFile extends Statement {
   private long tabletConversionThresholdBytes = -1;
   private boolean autoCreateDatabase = true;
   private boolean verify = true;
+  private boolean isAsyncLoad = false;
 
   private boolean isGeneratedByPipe = false;
 
@@ -138,6 +139,10 @@ public class LoadTsFile extends Statement {
     return this;
   }
 
+  public boolean isAsyncLoad() {
+    return isAsyncLoad;
+  }
+
   public void markIsGeneratedByPipe() {
     isGeneratedByPipe = true;
   }
@@ -183,6 +188,7 @@ public class LoadTsFile extends Statement {
     this.tabletConversionThresholdBytes =
         
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
     this.verify = 
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+    this.isAsyncLoad = 
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
   }
 
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 2acd906cfa4..abb42b74417 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
@@ -62,6 +63,7 @@ public class LoadTsFileStatement extends Statement {
   private long tabletConversionThresholdBytes = -1;
   private boolean autoCreateDatabase = true;
   private boolean isGeneratedByPipe = false;
+  private boolean isAsyncLoad = false;
 
   private Map<String, String> loadAttributes;
 
@@ -249,6 +251,10 @@ public class LoadTsFileStatement extends Statement {
     initAttributes();
   }
 
+  public boolean isAsyncLoad() {
+    return isAsyncLoad;
+  }
+
   private void initAttributes() {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
@@ -258,6 +264,7 @@ public class LoadTsFileStatement extends Statement {
     this.tabletConversionThresholdBytes =
         
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
     this.verifySchema = 
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+    this.isAsyncLoad = 
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
   }
 
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
@@ -326,6 +333,7 @@ public class LoadTsFileStatement extends Statement {
     loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY, 
String.valueOf(convertOnTypeMismatch));
     loadAttributes.put(
         TABLET_CONVERSION_THRESHOLD_KEY, 
String.valueOf(tabletConversionThresholdBytes));
+    loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
 
     return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
   }
@@ -350,6 +358,8 @@ public class LoadTsFileStatement extends Statement {
         + convertOnTypeMismatch
         + ", tablet-conversion-threshold="
         + tabletConversionThresholdBytes
+        + ", async-load="
+        + isAsyncLoad
         + ", tsFiles size="
         + tsFiles.size()
         + '}';
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 65665f5e3d4..2bae3175ca0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
@@ -54,6 +56,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -67,6 +70,8 @@ public class ActiveLoadTsFileLoader {
 
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
+  private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
   private static final int MAX_PENDING_SIZE = 1000;
   private final ActiveLoadPendingQueue pendingQueue = new 
ActiveLoadPendingQueue();
 
@@ -149,30 +154,43 @@ public class ActiveLoadTsFileLoader {
   }
 
   private void tryLoadPendingTsFiles() {
-    while (true) {
-      final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
-      if (!filePair.isPresent()) {
-        return;
-      }
+    final IClientSession session =
+        new InternalClientSession(
+            String.format(
+                "%s_%s",
+                ActiveLoadTsFileLoader.class.getSimpleName(), 
Thread.currentThread().getName()));
+    session.setUsername(AuthorityChecker.SUPER_USER);
+    session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+    session.setZoneId(ZoneId.systemDefault());
+
+    try {
+      while (true) {
+        final Optional<Pair<String, Boolean>> filePair = 
tryGetNextPendingFile();
+        if (!filePair.isPresent()) {
+          return;
+        }
 
-      try {
-        final TSStatus result = loadTsFile(filePair.get());
-        if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          LOGGER.info(
-              "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
-              filePair.get().getLeft(),
-              filePair.get().getRight());
-        } else {
-          handleLoadFailure(filePair.get(), result);
+        try {
+          final TSStatus result = loadTsFile(filePair.get(), session);
+          if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+            LOGGER.info(
+                "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+                filePair.get().getLeft(),
+                filePair.get().getRight());
+          } else {
+            handleLoadFailure(filePair.get(), result);
+          }
+        } catch (final FileNotFoundException e) {
+          handleFileNotFoundException(filePair.get());
+        } catch (final Exception e) {
+          handleOtherException(filePair.get(), e);
+        } finally {
+          pendingQueue.removeFromLoading(filePair.get().getLeft());
         }
-      } catch (final FileNotFoundException e) {
-        handleFileNotFoundException(filePair.get());
-      } catch (final Exception e) {
-        handleOtherException(filePair.get(), e);
-      } finally {
-        pendingQueue.removeFromLoading(filePair.get().getLeft());
       }
+    } finally {
+      SESSION_MANAGER.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
 
@@ -195,27 +213,39 @@ public class ActiveLoadTsFileLoader {
     }
   }
 
-  private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws 
FileNotFoundException {
+  private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final 
IClientSession session)
+      throws FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(filePair.getLeft());
+    final List<File> files = statement.getTsFiles();
+    if (!files.isEmpty()) {
+      final File parentFile = files.get(0).getParentFile();
+      statement.setDatabase(parentFile == null ? "null" : 
parentFile.getName());
+    }
     statement.setDeleteAfterLoad(true);
     statement.setConvertOnTypeMismatch(true);
     statement.setVerifySchema(isVerify);
     statement.setAutoCreateDatabase(false);
-    return executeStatement(filePair.getRight() ? new 
PipeEnrichedStatement(statement) : statement);
+    return executeStatement(
+        filePair.getRight() ? new PipeEnrichedStatement(statement) : 
statement, session);
   }
 
-  private TSStatus executeStatement(final Statement statement) {
-    return Coordinator.getInstance()
-        .executeForTreeModel(
-            statement,
-            SessionManager.getInstance().requestQueryId(),
-            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
-            "",
-            ClusterPartitionFetcher.getInstance(),
-            ClusterSchemaFetcher.getInstance(),
-            IOTDB_CONFIG.getQueryTimeoutThreshold(),
-            false)
-        .status;
+  private TSStatus executeStatement(final Statement statement, final 
IClientSession session) {
+    SESSION_MANAGER.registerSession(session);
+    try {
+      return Coordinator.getInstance()
+          .executeForTreeModel(
+              statement,
+              SESSION_MANAGER.requestQueryId(),
+              SESSION_MANAGER.getSessionInfo(session),
+              "",
+              ClusterPartitionFetcher.getInstance(),
+              ClusterSchemaFetcher.getInstance(),
+              IOTDB_CONFIG.getQueryTimeoutThreshold(),
+              false)
+          .status;
+    } finally {
+      SESSION_MANAGER.removeCurrSession();
+    }
   }
 
   private void handleLoadFailure(final Pair<String, Boolean> filePair, final 
TSStatus status) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 0a21c82be27..2c594b868c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -53,6 +53,9 @@ public class LoadTsFileConfigurator {
       case VERIFY_KEY:
         validateVerifyParam(value);
         break;
+      case ASYNC_LOAD_KEY:
+        validateAsyncLoadParam(value);
+        break;
       default:
         throw new SemanticException("Invalid parameter '" + key + "' for LOAD 
TSFILE command.");
     }
@@ -164,6 +167,23 @@ public class LoadTsFileConfigurator {
         loadAttributes.getOrDefault(VERIFY_KEY, 
String.valueOf(VERIFY_DEFAULT_VALUE)));
   }
 
+  public static final String ASYNC_LOAD_KEY = "async";
+  private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;
+
+  public static void validateAsyncLoadParam(final String asyncLoad) {
+    if (!"true".equalsIgnoreCase(asyncLoad) && 
!"false".equalsIgnoreCase(asyncLoad)) {
+      throw new SemanticException(
+          String.format(
+              "Given %s value '%s' is not supported, please input a valid 
boolean value.",
+              ASYNC_LOAD_KEY, asyncLoad));
+    }
+  }
+
+  public static boolean parseOrGetDefaultAsyncLoad(final Map<String, String> 
loadAttributes) {
+    return Boolean.parseBoolean(
+        loadAttributes.getOrDefault(ASYNC_LOAD_KEY, 
String.valueOf(ASYNC_LOAD_DEFAULT_VALUE)));
+  }
+
   private LoadTsFileConfigurator() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index fbc0621caaa..f8de487c365 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -405,6 +405,35 @@ public class FileUtils {
     }
   }
 
+  public static void copyFileWithMD5Check(final File sourceFile, final File 
targetDir)
+      throws IOException {
+    final String sourceFileName = sourceFile.getName();
+    final File targetFile = new File(targetDir, sourceFileName);
+    if (targetFile.exists()) {
+      if (!haveSameMD5(sourceFile, targetFile)) {
+        final String renameFile = copyFileRenameWithMD5(sourceFile, targetDir);
+        LOGGER.info(
+            "Copy file {} to {} because it already exists in the target 
directory: {}",
+            sourceFile.getName(),
+            renameFile,
+            targetDir.getAbsolutePath());
+      }
+    } else {
+      if (!(targetDir.exists() || targetDir.mkdirs())) {
+        final String log =
+            String.format("failed to create target directory: %s", 
targetDir.getAbsolutePath());
+        LOGGER.warn(log);
+        throw new IOException(log);
+      }
+
+      Files.copy(
+          sourceFile.toPath(),
+          targetFile.toPath(),
+          StandardCopyOption.REPLACE_EXISTING,
+          StandardCopyOption.COPY_ATTRIBUTES);
+    }
+  }
+
   private static boolean haveSameMD5(final File file1, final File file2) {
     try (final InputStream is1 = Files.newInputStream(file1.toPath());
         final InputStream is2 = Files.newInputStream(file2.toPath())) {
@@ -428,4 +457,24 @@ public class FileUtils {
           sourceFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
     }
   }
+
+  private static String copyFileRenameWithMD5(final File sourceFile, final 
File targetDir)
+      throws IOException {
+    try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+      final String sourceFileBaseName = 
FilenameUtils.getBaseName(sourceFile.getName());
+      final String sourceFileExtension = 
FilenameUtils.getExtension(sourceFile.getName());
+      final String sourceFileMD5 = DigestUtils.md5Hex(is);
+
+      final String targetFileName =
+          sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." + 
sourceFileExtension;
+      final File targetFile = new File(targetDir, targetFileName);
+
+      Files.copy(
+          sourceFile.toPath(),
+          targetFile.toPath(),
+          StandardCopyOption.REPLACE_EXISTING,
+          StandardCopyOption.COPY_ATTRIBUTES);
+      return targetFileName;
+    }
+  }
 }

Reply via email to