This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 251125ae7f4 Fix pipe tsfile receiver database handling (#17815) (#17836)
251125ae7f4 is described below

commit 251125ae7f4e7618b82f6808e9fe634e939bce02
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 4 10:32:53 2026 +0800

    Fix pipe tsfile receiver database handling (#17815) (#17836)
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   4 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  88 ++++++++++++-----
 .../visitor/PipeStatementExceptionVisitor.java     |   8 ++
 .../request/PipeTransferTsFileSealWithModReq.java  |  68 ++++++++++++-
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  11 ++-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |   6 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  17 +++-
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  20 +++-
 .../plan/statement/crud/LoadTsFileStatement.java   |  39 ++++++++
 .../load/active/ActiveLoadPathHelper.java          |  30 ++++++
 .../load/config/LoadTsFileConfigurator.java        |   7 ++
 .../receiver/PipeStatementTsStatusVisitorTest.java |  14 +++
 .../protocol/thrift/IoTDBDataNodeReceiverTest.java | 110 +++++++++++++++++++++
 13 files changed, 385 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8ffbc9f2f9b..c9cd2d44af0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -265,6 +265,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     return tsFile;
   }
 
+  public String getDatabaseName() {
+    return Objects.isNull(resource) ? null : resource.getDatabaseName();
+  }
+
   public File getModFile() {
     return modFile;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 58d4c29eddc..1ec4d6d53b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -52,6 +52,7 @@ import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisito
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
@@ -459,29 +460,31 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final 
String fileAbsolutePath)
       throws IOException {
     return isUsingAsyncLoadTsFileStrategy.get()
-        ? loadTsFileAsync(Collections.singletonList(fileAbsolutePath))
-        : loadTsFileSync(fileAbsolutePath);
+        ? loadTsFileAsync(null, Collections.singletonList(fileAbsolutePath))
+        : loadTsFileSync(null, fileAbsolutePath);
   }
 
   @Override
   protected TSStatus loadFileV2(
       final PipeTransferFileSealReqV2 req, final List<String> 
fileAbsolutePaths)
       throws IOException, IllegalPathException {
-    return req instanceof PipeTransferTsFileSealWithModReq
-        // TsFile's absolute path will be the second element
-        ? (isUsingAsyncLoadTsFileStrategy.get()
-            ? loadTsFileAsync(fileAbsolutePaths)
-            : loadTsFileSync(fileAbsolutePaths.get(1)))
-        : loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
+    if (req instanceof PipeTransferTsFileSealWithModReq) {
+      final String dataBaseName =
+          ((PipeTransferTsFileSealWithModReq) 
req).getDatabaseNameByTsFileName();
+      return isUsingAsyncLoadTsFileStrategy.get()
+          ? loadTsFileAsync(dataBaseName, fileAbsolutePaths)
+          : loadTsFileSync(dataBaseName, 
fileAbsolutePaths.get(req.getFileNames().size() - 1));
+    }
+    return loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
   }
 
-  private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws 
IOException {
+  private TSStatus loadTsFileAsync(final String dataBaseName, final 
List<String> absolutePaths)
+      throws IOException {
     final Map<String, String> loadAttributes =
-        ActiveLoadPathHelper.buildAttributes(
-            null,
+        buildLoadTsFileAttributesForAsync(
+            dataBaseName,
             shouldConvertDataTypeOnTypeMismatch,
             validateTsFile.get(),
-            null,
             shouldMarkAsPipeRequest.get());
     if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, 
true)) {
       throw new PipeException("Load active listening pipe dir is not set.");
@@ -489,15 +492,38 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
-  private TSStatus loadTsFileSync(final String fileAbsolutePath) throws 
FileNotFoundException {
+  static Map<String, String> buildLoadTsFileAttributesForAsync(
+      final String dataBaseName,
+      final boolean shouldConvertDataTypeOnTypeMismatch,
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
+    return ActiveLoadPathHelper.buildAttributes(
+        dataBaseName,
+        LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
+        shouldConvertDataTypeOnTypeMismatch,
+        validateTsFile,
+        null,
+        shouldMarkAsPipeRequest);
+  }
+
+  private TSStatus loadTsFileSync(final String dataBaseName, final String 
fileAbsolutePath)
+      throws FileNotFoundException {
+    return executeStatementAndClassifyExceptions(
+        buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, 
validateTsFile.get()));
+  }
+
+  static LoadTsFileStatement buildLoadTsFileStatementForSync(
+      final String dataBaseName, final String fileAbsolutePath, final boolean 
validateTsFile)
+      throws FileNotFoundException {
     final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
     statement.setDeleteAfterLoad(true);
     statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(validateTsFile.get());
+    statement.setVerifySchema(validateTsFile);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
-
-    return executeStatementAndClassifyExceptions(statement);
+    statement.setDatabase(dataBaseName);
+    statement.updateDatabaseLevelByTreeDatabase();
+    return statement;
   }
 
   private TSStatus loadSchemaSnapShot(
@@ -704,12 +730,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         return STATEMENT_STATUS_VISITOR.process(statement, result);
       }
     } catch (final Exception e) {
-      PipeLogger.log(
-          LOGGER::warn,
-          e,
-          "Receiver id = %s: Exception encountered while executing statement 
%s: ",
-          receiverId.get(),
-          statement.getPipeLoggingString());
+      logStatementExceptionIfNecessary(statement, e);
       return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
     } finally {
       if (Objects.nonNull(allocatedMemoryBlock)) {
@@ -719,6 +740,29 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     }
   }
 
+  private void logStatementExceptionIfNecessary(final Statement statement, 
final Exception e) {
+    if (shouldLogStatementException(receiverId.get(), statement, e)) {
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Exception encountered while executing statement 
%s: ",
+          receiverId.get(),
+          Objects.isNull(statement) ? null : statement.getPipeLoggingString());
+    }
+  }
+
+  static boolean shouldLogStatementException(
+      final long receiverId, final Statement statement, final Exception e) {
+    // Use the reducer cache as a gate. The actual stack trace is logged only 
when it passes.
+    return PipePeriodicalLogReducer.log(
+        message -> {},
+        "Receiver id = %s, statement = %s, exception = %s, message = %s",
+        receiverId,
+        Objects.isNull(statement) ? null : statement.getPipeLoggingString(),
+        e.getClass().getName(),
+        e.getMessage());
+  }
+
   private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement 
statement) {
     if (statement == null) {
       return RpcUtils.getStatus(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 098c983977a..7b8246e3dab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -48,6 +49,13 @@ import java.util.Objects;
 public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus, 
Exception> {
   @Override
   public TSStatus visitNode(final StatementNode node, final Exception context) 
{
+    if (context instanceof IoTDBRuntimeException
+        && ((IoTDBRuntimeException) context).getErrorCode()
+            == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
     return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
         .setMessage(context.getMessage());
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
index 28959a1a090..7d0aa99cb13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 public class PipeTransferTsFileSealWithModReq extends 
PipeTransferFileSealReqV2 {
 
@@ -38,17 +40,59 @@ public class PipeTransferTsFileSealWithModReq extends 
PipeTransferFileSealReqV2
     return PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD;
   }
 
+  private static final String DATABASE_NAME_KEY_PREFIX = "DATABASE_NAME_";
+
+  public String getDatabaseNameByTsFileName() {
+    return getParameters() == null
+        ? null
+        : getParameters()
+            .get(
+                
generateDatabaseNameWithFileNameKey(getFileNames().get(getFileNames().size() - 
1)));
+  }
+
+  private static String generateDatabaseNameWithFileNameKey(final String 
fileName) {
+    return DATABASE_NAME_KEY_PREFIX + fileName;
+  }
+
+  private static Map<String, String> generateDatabaseNameParameter(
+      final String tsFileName, final String dataBaseName) {
+    return dataBaseName == null
+        ? new HashMap<>()
+        : 
Collections.singletonMap(generateDatabaseNameWithFileNameKey(tsFileName), 
dataBaseName);
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
       String modFileName, long modFileLength, String tsFileName, long 
tsFileLength)
       throws IOException {
+    return toTPipeTransferReq(modFileName, modFileLength, tsFileName, 
tsFileLength, null);
+  }
+
+  public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
+      final String modFileName,
+      final long modFileLength,
+      final String tsFileName,
+      final long tsFileLength,
+      final String dataBaseName)
+      throws IOException {
     return (PipeTransferTsFileSealWithModReq)
         new PipeTransferTsFileSealWithModReq()
             .convertToTPipeTransferReq(
                 Arrays.asList(modFileName, tsFileName),
                 Arrays.asList(modFileLength, tsFileLength),
-                new HashMap<>());
+                generateDatabaseNameParameter(tsFileName, dataBaseName));
+  }
+
+  public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
+      final String tsFileName, final long tsFileLength, final String 
dataBaseName)
+      throws IOException {
+    return (PipeTransferTsFileSealWithModReq)
+        new PipeTransferTsFileSealWithModReq()
+            .convertToTPipeTransferReq(
+                Collections.singletonList(tsFileName),
+                Collections.singletonList(tsFileLength),
+                generateDatabaseNameParameter(tsFileName, dataBaseName));
   }
 
   public static PipeTransferTsFileSealWithModReq 
fromTPipeTransferReq(TPipeTransferReq req) {
@@ -61,11 +105,31 @@ public class PipeTransferTsFileSealWithModReq extends 
PipeTransferFileSealReqV2
   public static byte[] toTPipeTransferBytes(
       String modFileName, long modFileLength, String tsFileName, long 
tsFileLength)
       throws IOException {
+    return toTPipeTransferBytes(modFileName, modFileLength, tsFileName, 
tsFileLength, null);
+  }
+
+  public static byte[] toTPipeTransferBytes(
+      final String modFileName,
+      final long modFileLength,
+      final String tsFileName,
+      final long tsFileLength,
+      final String dataBaseName)
+      throws IOException {
     return new PipeTransferTsFileSealWithModReq()
         .convertToTPipeTransferSnapshotSealBytes(
             Arrays.asList(modFileName, tsFileName),
             Arrays.asList(modFileLength, tsFileLength),
-            new HashMap<>());
+            generateDatabaseNameParameter(tsFileName, dataBaseName));
+  }
+
+  public static byte[] toTPipeTransferBytes(
+      final String tsFileName, final long tsFileLength, final String 
dataBaseName)
+      throws IOException {
+    return new PipeTransferTsFileSealWithModReq()
+        .convertToTPipeTransferSnapshotSealBytes(
+            Collections.singletonList(tsFileName),
+            Collections.singletonList(tsFileLength),
+            generateDatabaseNameParameter(tsFileName, dataBaseName));
   }
 
   /////////////////////////////// Object ///////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 81e745dc6a7..b2af96a37c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -255,7 +255,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     final Map<Pair<String, Long>, Double> pipe2WeightMap = 
batchToTransfer.deepCopyPipe2WeightMap();
 
     for (final File tsFile : sealedFiles) {
-      doTransfer(pipe2WeightMap, socket, tsFile, null, tsFile.getName());
+      doTransfer(pipe2WeightMap, socket, tsFile, null, null, tsFile.getName());
       try {
         RetryUtils.retryOnException(
             () -> {
@@ -379,6 +379,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
         pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver
             ? pipeTsFileInsertionEvent.getModFile()
             : null,
+        pipeTsFileInsertionEvent.getDatabaseName(),
         pipeTsFileInsertionEvent.toString());
   }
 
@@ -387,6 +388,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       final AirGapSocket socket,
       final File tsFile,
       final File modFile,
+      final String dataBaseName,
       final String receiverStatusContext)
       throws PipeException, IOException {
     final String errorMessage = String.format("Seal file %s error. Socket 
%s.", tsFile, socket);
@@ -397,7 +399,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       if (!sendWeighted(
           socket,
           PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
-              modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()),
+              modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length(), dataBaseName),
           pipe2WeightMap)) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
@@ -411,7 +413,10 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       transferFilePieces(pipe2WeightMap, tsFile, socket, false);
       if (!sendWeighted(
           socket,
-          PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length()),
+          dataBaseName == null
+              ? 
PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length())
+              : PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+                  tsFile.getName(), tsFile.length(), dataBaseName),
           pipe2WeightMap)) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9d0c1563c78..b61cb4543c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -255,7 +255,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   eventsHadBeenAddedToRetryQueue,
                   sealedFile,
                   null,
-                  false));
+                  false,
+                  null));
         }
       } catch (final Exception e) {
         PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch 
(%s).", sealedFiles);
@@ -400,7 +401,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               pipeTsFileInsertionEvent.getTsFile(),
               pipeTsFileInsertionEvent.getModFile(),
               pipeTsFileInsertionEvent.isWithMod()
-                  && clientManager.supportModsIfIsDataNodeReceiver());
+                  && clientManager.supportModsIfIsDataNodeReceiver(),
+              pipeTsFileInsertionEvent.getDatabaseName());
 
       transfer(pipeTransferTsFileHandler);
       return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 8d9648f5292..d7515141dae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -78,6 +78,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   private File currentFile;
 
   private final boolean transferMod;
+  private final String dataBaseName;
 
   private final int readFileBufferSize;
   private PipeTsFileMemoryBlock memoryBlock;
@@ -98,7 +99,8 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       final AtomicBoolean eventsHadBeenAddedToRetryQueue,
       final File tsFile,
       final File modFile,
-      final boolean transferMod)
+      final boolean transferMod,
+      final String dataBaseName)
       throws InterruptedException {
     super(connector);
 
@@ -111,6 +113,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.tsFile = tsFile;
     this.modFile = modFile;
     this.transferMod = transferMod;
+    this.dataBaseName = dataBaseName;
     currentFile = transferMod ? modFile : tsFile;
 
     // NOTE: Waiting for resource enough for slicing here may cause deadlock!
@@ -191,8 +194,16 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         final TPipeTransferReq uncompressedReq =
             transferMod
                 ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                    modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
-                : 
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
+                    modFile.getName(),
+                    modFile.length(),
+                    tsFile.getName(),
+                    tsFile.length(),
+                    dataBaseName)
+                : dataBaseName == null
+                    ? PipeTransferTsFileSealReq.toTPipeTransferReq(
+                        tsFile.getName(), tsFile.length())
+                    : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                        tsFile.getName(), tsFile.length(), dataBaseName);
         final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
 
         pipeName2WeightMap.forEach(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index ef3d59f0d2a..1bb0c383ff8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -275,7 +275,7 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
     final Map<Pair<String, Long>, Double> pipe2WeightMap = 
batchToTransfer.deepCopyPipe2WeightMap();
 
     for (final File tsFile : sealedFiles) {
-      doTransfer(pipe2WeightMap, tsFile, null);
+      doTransfer(pipe2WeightMap, tsFile, null, null);
       try {
         RetryUtils.retryOnException(
             () -> {
@@ -428,7 +428,8 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
                   pipeTsFileInsertionEvent.getCreationTime()),
               1.0),
           pipeTsFileInsertionEvent.getTsFile(),
-          pipeTsFileInsertionEvent.isWithMod() ? 
pipeTsFileInsertionEvent.getModFile() : null);
+          pipeTsFileInsertionEvent.isWithMod() ? 
pipeTsFileInsertionEvent.getModFile() : null,
+          pipeTsFileInsertionEvent.getDatabaseName());
     } finally {
       pipeTsFileInsertionEvent.decreaseReferenceCount(
           IoTDBDataRegionSyncSink.class.getName(), false);
@@ -438,7 +439,8 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   private void doTransfer(
       final Map<Pair<String, Long>, Double> pipeName2WeightMap,
       final File tsFile,
-      final File modFile)
+      final File modFile,
+      final String dataBaseName)
       throws PipeException, IOException {
 
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
@@ -454,7 +456,11 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
         final TPipeTransferReq req =
             compressIfNeeded(
                 PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                    modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()));
+                    modFile.getName(),
+                    modFile.length(),
+                    tsFile.getName(),
+                    tsFile.length(),
+                    dataBaseName));
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
@@ -479,7 +485,11 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
       try {
         final TPipeTransferReq req =
             compressIfNeeded(
-                PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), 
tsFile.length()));
+                dataBaseName == null
+                    ? PipeTransferTsFileSealReq.toTPipeTransferReq(
+                        tsFile.getName(), tsFile.length())
+                    : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                        tsFile.getName(), tsFile.length(), dataBaseName));
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 6e74ceed206..404b957c786 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.queryengine.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -42,10 +44,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+
 public class LoadTsFileStatement extends Statement {
 
   private final File file;
   private int databaseLevel;
+  private String database;
   private boolean verifySchema = true;
   private boolean deleteAfterLoad = false;
   private boolean convertOnTypeMismatch = true;
@@ -201,6 +206,14 @@ public class LoadTsFileStatement extends Statement {
     return databaseLevel;
   }
 
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
   public void setVerifySchema(boolean verifySchema) {
     this.verifySchema = verifySchema;
   }
@@ -281,6 +294,7 @@ public class LoadTsFileStatement extends Statement {
 
   private void initAttributes(final Map<String, String> loadAttributes) {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
+    this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
     this.convertOnTypeMismatch =
         
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
@@ -293,6 +307,28 @@ public class LoadTsFileStatement extends Statement {
     }
   }
 
+  public void updateDatabaseLevelByTreeDatabase() {
+    final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database);
+    if (databaseLevel != null) {
+      this.databaseLevel = databaseLevel;
+    }
+  }
+
+  public static Integer getDatabaseLevelByTreeDatabase(final String database) {
+    if (database == null) {
+      return null;
+    }
+    try {
+      final String[] nodes = PathUtils.splitPathToDetachedNodes(database);
+      if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) {
+        return nodes.length - 1;
+      }
+    } catch (final IllegalPathException ignored) {
+      // Keep the configured database level when database is not a legal tree 
path.
+    }
+    return null;
+  }
+
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
     int lastNonMiniTsFileIndex = -1;
 
@@ -352,6 +388,7 @@ public class LoadTsFileStatement extends Statement {
 
       final LoadTsFileStatement statement = new LoadTsFileStatement();
       statement.databaseLevel = this.databaseLevel;
+      statement.database = this.database;
       statement.verifySchema = this.verifySchema;
       statement.deleteAfterLoad = this.deleteAfterLoad;
       statement.convertOnTypeMismatch = this.convertOnTypeMismatch;
@@ -395,6 +432,8 @@ public class LoadTsFileStatement extends Statement {
         + deleteAfterLoad
         + ", database-level="
         + databaseLevel
+        + ", database="
+        + database
         + ", verify-schema="
         + verifySchema
         + ", convert-on-type-mismatch="
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
index 965f2941dc6..2503b822b9e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -46,6 +46,7 @@ public final class ActiveLoadPathHelper {
   private static final List<String> KEY_ORDER =
       Collections.unmodifiableList(
           Arrays.asList(
+              LoadTsFileConfigurator.DATABASE_NAME_KEY,
               LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
               LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
               LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
@@ -62,8 +63,28 @@ public final class ActiveLoadPathHelper {
       final Boolean verify,
       final Long tabletConversionThresholdBytes,
       final Boolean pipeGenerated) {
+    return buildAttributes(
+        null,
+        databaseLevel,
+        convertOnTypeMismatch,
+        verify,
+        tabletConversionThresholdBytes,
+        pipeGenerated);
+  }
+
+  public static Map<String, String> buildAttributes(
+      final String databaseName,
+      final Integer databaseLevel,
+      final Boolean convertOnTypeMismatch,
+      final Boolean verify,
+      final Long tabletConversionThresholdBytes,
+      final Boolean pipeGenerated) {
     final Map<String, String> attributes = new LinkedHashMap<>();
 
+    if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) {
+      attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName);
+    }
+
     if (Objects.nonNull(databaseLevel)) {
       attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY, 
databaseLevel.toString());
     }
@@ -149,6 +170,10 @@ public final class ActiveLoadPathHelper {
       final LoadTsFileStatement statement,
       final boolean defaultVerify) {
 
+    Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY))
+        .filter(name -> !name.isEmpty())
+        .ifPresent(statement::setDatabase);
+
     Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
         .ifPresent(
             level -> {
@@ -216,6 +241,11 @@ public final class ActiveLoadPathHelper {
 
   private static void validateAttributeValue(final String key, final String value) {
     switch (key) {
+      case LoadTsFileConfigurator.DATABASE_NAME_KEY:
+        if (value == null || value.isEmpty()) {
+          throw new SemanticException("Database name must not be empty.");
+        }
+        break;
       case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
         LoadTsFileConfigurator.validateDatabaseLevelParam(value);
         break;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 510d47b0b23..8b689c6fb22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -40,6 +40,7 @@ public class LoadTsFileConfigurator {
       case ON_SUCCESS_KEY:
         validateOnSuccessParam(value);
         break;
+      case DATABASE_NAME_KEY:
       case TABLET_CONVERSION_THRESHOLD_KEY:
         break;
       case CONVERT_ON_TYPE_MISMATCH_KEY:
@@ -87,6 +88,12 @@ public class LoadTsFileConfigurator {
             DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE)));
   }
 
+  public static final String DATABASE_NAME_KEY = "database-name";
+
+  public static String parseDatabaseName(final Map<String, String> loadAttributes) {
+    return loadAttributes.get(DATABASE_NAME_KEY);
+  }
+
   public static final String ON_SUCCESS_KEY = "on-success";
   public static final String ON_SUCCESS_DELETE_VALUE = "delete";
   public static final String ON_SUCCESS_NONE_VALUE = "none";
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 2b20f1d91ef..756d1181825 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
@@ -62,4 +63,17 @@ public class PipeStatementTsStatusVisitorTest {
                             StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
             .getCode());
   }
+
+  @Test
+  public void testDatabaseNotExistRuntimeExceptionClassification() {
+    Assert.assertEquals(
+        TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR
+            .process(
+                new InsertRowsStatement(),
+                new IoTDBRuntimeException(
+                    "Create DataPartition failed because the database: root.test.sg_0 is not exists",
+                    TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()))
+            .getCode());
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
new file mode 100644
index 00000000000..f41c44763f9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+public class IoTDBDataNodeReceiverTest {
+
+  @Test
+  public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws Exception {
+    final Path tsFile = Files.createTempFile("pipe-load-tree-database-level", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), true);
+
+      Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+      Assert.assertEquals(2, statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws Exception {
+    final Path tsFile = Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile");
+    try {
+      final Map<String, String> attributes =
+          IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
+              "root.test.sg_0", true, true, true);
+
+      Assert.assertEquals(
+          "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
+      Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
+
+      final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString());
+      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true);
+      Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+      Assert.assertEquals(2, statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull()
+      throws Exception {
+    final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true);
+
+      Assert.assertNull(statement.getDatabase());
+      Assert.assertEquals(
+          IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
+          statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void testRepeatedStatementExceptionLogIsReduced() throws Exception {
+    final Path tsFile = Files.createTempFile("pipe-load-log-reducer", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), true);
+      final long receiverId = System.nanoTime();
+      final Exception exception = new RuntimeException("repeated receiver exception " + receiverId);
+
+      Assert.assertTrue(
+          IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception));
+      Assert.assertFalse(
+          IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception));
+      Assert.assertTrue(
+          IoTDBDataNodeReceiver.shouldLogStatementException(
+              receiverId, statement, new RuntimeException("another receiver exception")));
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+}


Reply via email to