This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b46c58b324d Fix pipe tsfile receiver database handling (#17815)
b46c58b324d is described below

commit b46c58b324db40c4a93428bc30b35bc326ff3479
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 16:26:42 2026 +0800

    Fix pipe tsfile receiver database handling (#17815)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  62 +++++++++---
 .../visitor/PipeStatementExceptionVisitor.java     |   7 ++
 .../plan/statement/crud/LoadTsFileStatement.java   |  25 +++++
 .../receiver/PipeStatementTsStatusVisitorTest.java |  14 +++
 .../protocol/thrift/IoTDBDataNodeReceiverTest.java | 110 +++++++++++++++++++++
 5 files changed, 206 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 55f46be8f99..37b277efbaf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -59,6 +59,7 @@ import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTableStatementDataTypeConve
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
@@ -568,12 +569,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadTsFileAsync(final String dataBaseName, final 
List<String> absolutePaths)
       throws IOException {
     final Map<String, String> loadAttributes =
-        ActiveLoadPathHelper.buildAttributes(
+        buildLoadTsFileAttributesForAsync(
             dataBaseName,
-            null,
             shouldConvertDataTypeOnTypeMismatch,
             validateTsFile.get(),
-            null,
             shouldMarkAsPipeRequest.get());
 
     if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
@@ -582,17 +581,38 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  static Map<String, String> buildLoadTsFileAttributesForAsync(
+      final String dataBaseName,
+      final boolean shouldConvertDataTypeOnTypeMismatch,
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
+    return ActiveLoadPathHelper.buildAttributes(
+        dataBaseName,
+        LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
+        shouldConvertDataTypeOnTypeMismatch,
+        validateTsFile,
+        null,
+        shouldMarkAsPipeRequest);
+  }
+
   private TSStatus loadTsFileSync(final String dataBaseName, final String 
fileAbsolutePath)
       throws FileNotFoundException {
+    return executeStatementAndClassifyExceptions(
+        buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, 
validateTsFile.get()));
+  }
+
+  static LoadTsFileStatement buildLoadTsFileStatementForSync(
+      final String dataBaseName, final String fileAbsolutePath, final boolean 
validateTsFile)
+      throws FileNotFoundException {
     final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
     statement.setDeleteAfterLoad(true);
     statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(validateTsFile.get());
+    statement.setVerifySchema(validateTsFile);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
     statement.setDatabase(dataBaseName);
-
-    return executeStatementAndClassifyExceptions(statement);
+    statement.updateDatabaseLevelByTreeDatabase();
+    return statement;
   }
 
   private TSStatus loadSchemaSnapShot(
@@ -845,12 +865,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         return STATEMENT_STATUS_VISITOR.process(statement, result);
       }
     } catch (final Exception e) {
-      PipeLogger.log(
-          LOGGER::warn,
-          e,
-          "Receiver id = %s: Exception encountered while executing statement 
%s: ",
-          receiverId.get(),
-          statement.getPipeLoggingString());
+      logStatementExceptionIfNecessary(statement, e);
       return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
     } finally {
       if (Objects.nonNull(allocatedMemoryBlock)) {
@@ -860,6 +875,29 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     }
   }
 
+  private void logStatementExceptionIfNecessary(final Statement statement, 
final Exception e) {
+    if (shouldLogStatementException(receiverId.get(), statement, e)) {
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Exception encountered while executing statement 
%s: ",
+          receiverId.get(),
+          Objects.isNull(statement) ? null : statement.getPipeLoggingString());
+    }
+  }
+
+  static boolean shouldLogStatementException(
+      final long receiverId, final Statement statement, final Exception e) {
+    // Use the reducer cache as a gate. The actual stack trace is logged only 
when it passes.
+    return PipePeriodicalLogReducer.log(
+        message -> {},
+        "Receiver id = %s, statement = %s, exception = %s, message = %s",
+        receiverId,
+        Objects.isNull(statement) ? null : statement.getPipeLoggingString(),
+        e.getClass().getName(),
+        e.getMessage());
+  }
+
   private TSStatus 
executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
       final Statement statement) {
     if (statement == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 42b1f0e5b1e..8e590c5847c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
@@ -52,6 +53,12 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
     if (context instanceof AccessDeniedException) {
       return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (context instanceof IoTDBRuntimeException
+        && ((IoTDBRuntimeException) context).getErrorCode()
+            == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
     }
     return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
         .setMessage(context.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 59c7f9a57ef..73133611ed2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.plan.statement.crud;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -44,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
@@ -337,6 +340,28 @@ public class LoadTsFileStatement extends Statement {
     }
   }
 
+  public void updateDatabaseLevelByTreeDatabase() {
+    final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database);
+    if (databaseLevel != null) {
+      this.databaseLevel = databaseLevel;
+    }
+  }
+
+  public static Integer getDatabaseLevelByTreeDatabase(final String database) {
+    if (database == null) {
+      return null;
+    }
+    try {
+      final String[] nodes = PathUtils.splitPathToDetachedNodes(database);
+      if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) {
+        return nodes.length - 1;
+      }
+    } catch (final IllegalPathException ignored) {
+      // Keep the configured database level when database is not a legal tree 
path.
+    }
+    return null;
+  }
+
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
     int lastNonMiniTsFileIndex = -1;
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 2b20f1d91ef..756d1181825 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
@@ -62,4 +63,17 @@ public class PipeStatementTsStatusVisitorTest {
                             StatusUtils.OK, new 
TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
             .getCode());
   }
+
+  @Test
+  public void testDatabaseNotExistRuntimeExceptionClassification() {
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR
+            .process(
+                new InsertRowsStatement(),
+                new IoTDBRuntimeException(
+                    "Create DataPartition failed because the database: 
root.test.sg_0 is not exists",
+                    TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()))
+            .getCode());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
new file mode 100644
index 00000000000..f41c44763f9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+public class IoTDBDataNodeReceiverTest {
+
+  @Test
+  public void 
testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws 
Exception {
+    final Path tsFile = Files.createTempFile("pipe-load-tree-database-level", 
".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), true);
+
+      Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+      Assert.assertEquals(2, statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void 
testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws 
Exception {
+    final Path tsFile = 
Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile");
+    try {
+      final Map<String, String> attributes =
+          IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
+              "root.test.sg_0", true, true, true);
+
+      Assert.assertEquals(
+          "root.test.sg_0", 
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
+      Assert.assertEquals("2", 
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
+
+      final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(tsFile.toString());
+      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
true);
+      Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+      Assert.assertEquals(2, statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void 
testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull()
+      throws Exception {
+    final Path tsFile = 
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, 
tsFile.toString(), true);
+
+      Assert.assertNull(statement.getDatabase());
+      Assert.assertEquals(
+          IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
+          statement.getDatabaseLevel());
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+
+  @Test
+  public void testRepeatedStatementExceptionLogIsReduced() throws Exception {
+    final Path tsFile = Files.createTempFile("pipe-load-log-reducer", 
".tsfile");
+    try {
+      final LoadTsFileStatement statement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tsFile.toString(), true);
+      final long receiverId = System.nanoTime();
+      final Exception exception = new RuntimeException("repeated receiver 
exception " + receiverId);
+
+      Assert.assertTrue(
+          IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, 
statement, exception));
+      Assert.assertFalse(
+          IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, 
statement, exception));
+      Assert.assertTrue(
+          IoTDBDataNodeReceiver.shouldLogStatementException(
+              receiverId, statement, new RuntimeException("another receiver 
exception")));
+    } finally {
+      Files.deleteIfExists(tsFile);
+    }
+  }
+}

Reply via email to