This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f4c44ab3e8 Pipe: Fixed some unstable ITs (#17713)
3f4c44ab3e8 is described below

commit 3f4c44ab3e861a2d74ae16c20f5770ad8c75410e
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 19 11:46:27 2026 +0800

    Pipe: Fixed some unstable ITs (#17713)
---
 .../treemodel/manual/IoTDBPipePermissionIT.java    |   5 +
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  19 +--
 .../request/PipeTransferTabletBatchReqV2.java      |  82 +++++++----
 .../request/PipeTransferTabletBinaryReqV2.java     |  11 +-
 .../request/PipeTransferTabletInsertNodeReqV2.java |  11 +-
 .../request/PipeTransferTabletRawReqV2.java        |  33 +++--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |   4 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |   4 +-
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |   4 +-
 .../sink/protocol/writeback/WriteBackSink.java     |   6 +-
 .../pipe/sink/util/TabletStatementConverter.java   |  21 ++-
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 152 +++++++++++++++++++++
 12 files changed, 288 insertions(+), 64 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index cf6eaf6a9c5..d43bb6564b1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -590,12 +590,16 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
         "count(root.vehicle.plane.pressure),",
         Collections.singleton("1,"));
 
+    // After restart, the pipe keeps retrying with the stale password and may 
trigger login lock.
+    statement.execute("alter user thulab account unlock");
+
     try {
       statement.execute("alter pipe a2b modify source ('password'='fake')");
     } catch (final SQLException e) {
       Assert.assertEquals("801: Failed to check password for pipe a2b.", 
e.getMessage());
     }
 
+    statement.execute("alter user thulab account unlock");
     statement.execute("alter pipe a2b modify source 
('password'='newST@ongPassword')");
 
     // Test empty alter
@@ -620,6 +624,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
     statement = connection.createStatement();
     TestUtils.executeNonQuery(
         senderEnv, "insert into root.vehicle.plane(temperature, pressure) 
values (36.5, 1103)");
+    statement.execute("alter user thulab account unlock");
     statement.execute("alter user thulab set password 'newST@ongPassword'");
     statement.execute("alter pipe a2b");
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 065ad3be840..34837424b98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -54,7 +54,6 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
-  private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
   private final List<String> insertNodeDataBases = new ArrayList<>();
   private final List<String> tabletDataBases = new ArrayList<>();
 
@@ -160,13 +159,14 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
         buffer = insertNode.serializeToByteBuffer();
         insertNodeBuffers.add(buffer);
         if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
-          estimateSize =
-              RamUsageEstimator.sizeOf(
-                  
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
-          
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+          final String databaseName =
+              pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName();
+          estimateSize = RamUsageEstimator.sizeOf(databaseName);
+          insertNodeDataBases.add(databaseName);
         } else {
-          estimateSize = 4;
-          insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+          final String databaseName = 
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
+          estimateSize = RamUsageEstimator.sizeOf(databaseName);
+          insertNodeDataBases.add(databaseName);
         }
         estimateSize += buffer.limit();
       } else {
@@ -192,9 +192,10 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
           ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), 
outputStream);
           buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
         }
-        estimateSize = 4 + buffer.limit();
+        final String databaseName = 
pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
+        estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit();
         tabletBuffers.add(buffer);
-        tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+        tabletDataBases.add(databaseName);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index f626d496b55..80550b6350f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -38,7 +38,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -55,14 +55,12 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
   public List<InsertBaseStatement> constructStatements() {
     final List<InsertBaseStatement> statements = new ArrayList<>();
 
-    final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
-    final InsertMultiTabletsStatement insertMultiTabletsStatement =
-        new InsertMultiTabletsStatement();
-
-    final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
-    final List<InsertTabletStatement> insertTabletStatementList = new 
ArrayList<>();
     final Map<String, List<InsertRowStatement>> 
tableModelDatabaseInsertRowStatementMap =
-        new HashMap<>();
+        new LinkedHashMap<>();
+    final Map<String, List<InsertRowStatement>> 
treeModelDatabaseInsertRowStatementMap =
+        new LinkedHashMap<>();
+    final Map<String, List<InsertTabletStatement>> 
treeModelDatabaseInsertTabletStatementMap =
+        new LinkedHashMap<>();
 
     for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : 
insertNodeReqs) {
       final InsertBaseStatement statement = insertNodeReq.constructStatement();
@@ -77,9 +75,12 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
         } else if (statement instanceof InsertTabletStatement) {
           statements.add(statement);
         } else if (statement instanceof InsertRowsStatement) {
-          tableModelDatabaseInsertRowStatementMap
-              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
-              .addAll(((InsertRowsStatement) 
statement).getInsertRowStatementList());
+          for (final InsertRowStatement insertRowStatement :
+              ((InsertRowsStatement) statement).getInsertRowStatementList()) {
+            tableModelDatabaseInsertRowStatementMap
+                .computeIfAbsent(insertRowStatement.getDatabaseName().get(), k 
-> new ArrayList<>())
+                .add(insertRowStatement);
+          }
         } else {
           throw new UnsupportedOperationException(
               String.format(
@@ -89,12 +90,21 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
         continue;
       }
       if (statement instanceof InsertRowStatement) {
-        insertRowStatementList.add((InsertRowStatement) statement);
+        treeModelDatabaseInsertRowStatementMap
+            .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> 
new ArrayList<>())
+            .add((InsertRowStatement) statement);
       } else if (statement instanceof InsertTabletStatement) {
-        insertTabletStatementList.add((InsertTabletStatement) statement);
+        treeModelDatabaseInsertTabletStatementMap
+            .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> 
new ArrayList<>())
+            .add((InsertTabletStatement) statement);
       } else if (statement instanceof InsertRowsStatement) {
-        insertRowStatementList.addAll(
-            ((InsertRowsStatement) statement).getInsertRowStatementList());
+        for (final InsertRowStatement insertRowStatement :
+            ((InsertRowsStatement) statement).getInsertRowStatementList()) {
+          treeModelDatabaseInsertRowStatementMap
+              .computeIfAbsent(
+                  insertRowStatement.getDatabaseName().orElse(null), k -> new 
ArrayList<>())
+              .add(insertRowStatement);
+        }
       } else {
         throw new UnsupportedOperationException(
             String.format(
@@ -112,17 +122,13 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
         statements.add(statement);
         continue;
       }
-      insertTabletStatementList.add(statement);
+      treeModelDatabaseInsertTabletStatementMap
+          .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new 
ArrayList<>())
+          .add(statement);
     }
 
-    insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
-    
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
-    if (!insertRowsStatement.isEmpty()) {
-      statements.add(insertRowsStatement);
-    }
-    if (!insertMultiTabletsStatement.isEmpty()) {
-      statements.add(insertMultiTabletsStatement);
-    }
+    addTreeModelInsertRowsStatements(statements, 
treeModelDatabaseInsertRowStatementMap);
+    addTreeModelInsertTabletsStatements(statements, 
treeModelDatabaseInsertTabletStatementMap);
 
     for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
         tableModelDatabaseInsertRowStatementMap.entrySet()) {
@@ -136,6 +142,34 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
     return statements;
   }
 
+  private void addTreeModelInsertRowsStatements(
+      final List<InsertBaseStatement> statements,
+      final Map<String, List<InsertRowStatement>> 
databaseInsertRowStatementMap) {
+    for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
+        databaseInsertRowStatementMap.entrySet()) {
+      final InsertRowsStatement statement = new InsertRowsStatement();
+      statement.setInsertRowStatementList(insertRows.getValue());
+      if (insertRows.getKey() != null) {
+        statement.setDatabaseName(insertRows.getKey());
+      }
+      statements.add(statement);
+    }
+  }
+
+  private void addTreeModelInsertTabletsStatements(
+      final List<InsertBaseStatement> statements,
+      final Map<String, List<InsertTabletStatement>> 
databaseInsertTabletStatementMap) {
+    for (final Map.Entry<String, List<InsertTabletStatement>> insertTablets :
+        databaseInsertTabletStatementMap.entrySet()) {
+      final InsertMultiTabletsStatement statement = new 
InsertMultiTabletsStatement();
+      statement.setInsertTabletStatementList(insertTablets.getValue());
+      if (insertTablets.getKey() != null) {
+        statement.setDatabaseName(insertTablets.getKey());
+      }
+      statements.add(statement);
+    }
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
index 8166e6b07ef..75501fe0f22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -72,14 +73,18 @@ public class PipeTransferTabletBinaryReqV2 extends 
PipeTransferTabletBinaryReq {
       return statement;
     }
 
-    // Table model
-    statement.setWriteToTable(true);
+    final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
+    if (isTableModel) {
+      statement.setWriteToTable(true);
+    }
     if (statement instanceof InsertRowsStatement) {
       List<InsertRowStatement> rowStatements =
           ((InsertRowsStatement) statement).getInsertRowStatementList();
       if (rowStatements != null && !rowStatements.isEmpty()) {
         for (InsertRowStatement insertRowStatement : rowStatements) {
-          insertRowStatement.setWriteToTable(true);
+          if (isTableModel) {
+            insertRowStatement.setWriteToTable(true);
+          }
           insertRowStatement.setDatabaseName(dataBaseName);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
index 599e12af9ed..e39330b5b0e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -72,14 +73,18 @@ public class PipeTransferTabletInsertNodeReqV2 extends 
PipeTransferTabletInsertN
       return statement;
     }
 
-    // Table model
-    statement.setWriteToTable(true);
+    final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
+    if (isTableModel) {
+      statement.setWriteToTable(true);
+    }
     if (statement instanceof InsertRowsStatement) {
       List<InsertRowStatement> rowStatements =
           ((InsertRowsStatement) statement).getInsertRowStatementList();
       if (rowStatements != null && !rowStatements.isEmpty()) {
         for (InsertRowStatement insertRowStatement : rowStatements) {
-          insertRowStatement.setWriteToTable(true);
+          if (isTableModel) {
+            insertRowStatement.setWriteToTable(true);
+          }
           insertRowStatement.setDatabaseName(dataBaseName);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index f6b910a8844..2458e5e243f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
@@ -54,29 +55,45 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
 
   @Override
   public InsertTabletStatement constructStatement() {
+    final boolean isTableModel =
+        Objects.nonNull(dataBaseName) && 
PathUtils.isTableModelDatabase(dataBaseName);
+
     if (statement != null) {
-      if (Objects.isNull(dataBaseName)) {
-        new 
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
-      } else {
+      if (isTableModel) {
         new 
PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+        statement.setWriteToTable(true);
+      } else {
+        new 
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+      }
+      if (Objects.nonNull(dataBaseName)) {
+        statement.setDatabaseName(dataBaseName);
       }
 
       return statement;
     }
 
-    if (Objects.isNull(dataBaseName)) {
-      new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
-    } else {
+    if (isTableModel) {
       new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
+    } else {
+      new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
     }
 
     try {
       if (isTabletEmpty(tablet)) {
         // Empty statement, will be filtered after construction
-        return new InsertTabletStatement();
+        statement = new InsertTabletStatement();
+        return statement;
       }
 
-      return new InsertTabletStatement(tablet, isAligned, dataBaseName);
+      if (isTableModel) {
+        statement = new InsertTabletStatement(tablet, isAligned, dataBaseName);
+      } else {
+        statement = new InsertTabletStatement(tablet, isAligned, null);
+        if (Objects.nonNull(dataBaseName)) {
+          statement.setDatabaseName(dataBaseName);
+        }
+      }
+      return statement;
     } catch (final MetadataException e) {
       LOGGER.warn(DataNodePipeMessages.GENERATE_STATEMENT_FROM_TABLET_ERROR, 
tablet, e);
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 27740b12e07..3bcdcaf02e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -243,7 +243,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
             insertNode,
             pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
                 ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                : null);
+                : 
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName());
 
     if (!send(
         pipeInsertNodeTabletInsertionEvent.getPipeName(),
@@ -290,7 +290,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
             pipeRawTabletInsertionEvent.isAligned(),
             pipeRawTabletInsertionEvent.isTableModelEvent()
                 ? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
-                : null))) {
+                : pipeRawTabletInsertionEvent.getTreeModelDatabaseName()))) {
       final String errorMessage =
           String.format(
               "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 14c854fa2ad..9adbcf6cf16 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -300,7 +300,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       final String databaseName =
           pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
               ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-              : null;
+              : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
       final TPipeTransferReq pipeTransferReq =
           compressIfNeeded(
               PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, 
databaseName));
@@ -327,7 +327,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   pipeRawTabletInsertionEvent.isAligned(),
                   pipeRawTabletInsertionEvent.isTableModelEvent()
                       ? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
-                      : null));
+                      : 
pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
       final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
           new PipeTransferTabletRawEventHandler(
               pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 6cc416f4022..5e6297d8438 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -387,7 +387,7 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
                   insertNode,
                   pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
                       ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                      : null));
+                      : 
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()));
       rateLimitIfNeeded(
           pipeInsertNodeTabletInsertionEvent.getPipeName(),
           pipeInsertNodeTabletInsertionEvent.getCreationTime(),
@@ -452,7 +452,7 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
                   pipeRawTabletInsertionEvent.isAligned(),
                   pipeRawTabletInsertionEvent.isTableModelEvent()
                       ? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
-                      : null));
+                      : 
pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
       rateLimitIfNeeded(
           pipeRawTabletInsertionEvent.getPipeName(),
           pipeRawTabletInsertionEvent.getCreationTime(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 4c0795f12cd..581792475c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -121,8 +121,6 @@ public class WriteBackSink implements PipeConnector {
 
   private UserEntity userEntity;
 
-  private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;
-
   private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();
 
   private static final Set<String> ALREADY_CREATED_DATABASES = 
ConcurrentHashMap.newKeySet();
@@ -268,7 +266,7 @@ public class WriteBackSink implements PipeConnector {
     final String dataBaseName =
         pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
             ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-            : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
+            : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
 
     final InsertBaseStatement insertBaseStatement;
     insertBaseStatement =
@@ -311,7 +309,7 @@ public class WriteBackSink implements PipeConnector {
     final String dataBaseName =
         pipeRawTabletInsertionEvent.isTableModelEvent()
             ? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
-            : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
+            : pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
 
     final InsertTabletStatement insertTabletStatement =
         PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index c5b9ebed4d5..e8b8e36cb49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.util;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -219,16 +220,22 @@ public class TabletStatementConverter {
       final String databaseName = ReadWriteIOUtils.readString(byteBuffer);
       if (databaseName != null) {
         statement.setDatabaseName(databaseName);
-        statement.setWriteToTable(true);
-        // For table model, insertTargetName is table name, convert to 
lowercase
-        statement.setDevicePath(new 
PartialPath(insertTargetName.toLowerCase(), false));
         // Calculate memory for databaseName
         memorySize += 
org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName);
 
-        statement.setColumnCategories(columnCategories);
-
-        memorySize += columnCategoriesMemorySize;
-        memorySize += tagColumnIndicesSize;
+        if (PathUtils.isTableModelDatabase(databaseName)) {
+          statement.setWriteToTable(true);
+          // For table model, insertTargetName is table name, convert to 
lowercase
+          statement.setDevicePath(new 
PartialPath(insertTargetName.toLowerCase(), false));
+          statement.setColumnCategories(columnCategories);
+
+          memorySize += columnCategoriesMemorySize;
+          memorySize += tagColumnIndicesSize;
+        } else {
+          statement.setDevicePath(
+              
DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName));
+          statement.setColumnCategories(null);
+        }
       } else {
         // For tree model, use DataNodeDevicePathCache
         statement.setDevicePath(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 1246ec9049c..38704ec7fea 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -44,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -66,7 +68,9 @@ import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class PipeDataNodeThriftRequestTest {
 
@@ -201,6 +205,33 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(statement.getDatabaseName().get(), "test");
   }
 
+  @Test
+  public void testPipeTransferInsertNodeReqV2WithTreeModelDatabase() {
+    final PipeTransferTabletInsertNodeReqV2 req =
+        PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
+            new InsertRowNode(
+                new PlanNodeId(""),
+                new PartialPath(new String[] {"root", "test", "d"}),
+                false,
+                new String[] {"s"},
+                new TSDataType[] {TSDataType.INT32},
+                1,
+                new Object[] {1},
+                false),
+            "root.test");
+    final PipeTransferTabletInsertNodeReqV2 deserializeReq =
+        PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req);
+
+    final InsertBaseStatement statement = deserializeReq.constructStatement();
+    final List<PartialPath> paths = new ArrayList<>();
+    paths.add(new PartialPath(new String[] {"root", "test", "d", "s"}));
+
+    Assert.assertEquals(statement.getPaths(), paths);
+    Assert.assertFalse(statement.isWriteToTable());
+    Assert.assertTrue(statement.getDatabaseName().isPresent());
+    Assert.assertEquals("root.test", statement.getDatabaseName().get());
+  }
+
   @Test
   public void testPipeTransferTabletBinaryReq() {
     // Not do real test here since "serializeToWal" needs private inner class 
of walBuffer
@@ -365,6 +396,39 @@ public class PipeDataNodeThriftRequestTest {
     }
   }
 
+  /**
+   * Checks that a tree-model raw tablet V2 request keeps its database after a round trip. Any
+   * IOException is propagated instead of being caught, so a failure reports its real cause.
+   */
+  @Test
+  public void testPipeTransferTabletReqV2WithTreeModelDatabase() throws IOException {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT));
+    final Tablet tablet = new Tablet("root.test.d", schemaList, 8);
+    tablet.addTimestamp(0, 2);
+    tablet.addTimestamp(1, 1);
+    tablet.addValue("s1", 0, 2);
+    tablet.addValue("s2", 0, "2");
+    tablet.addValue("s1", 1, 1);
+    tablet.addValue("s2", 1, "1");
+
+    final PipeTransferTabletRawReqV2 req =
+        PipeTransferTabletRawReqV2.toTPipeTransferReq(tablet, false, "root.test");
+    final PipeTransferTabletRawReqV2 deserializeReq =
+        PipeTransferTabletRawReqV2.fromTPipeTransferReq(req);
+
+    final InsertBaseStatement statement = deserializeReq.constructStatement();
+    final List<PartialPath> paths = new ArrayList<>();
+    paths.add(new PartialPath(new String[] {"root", "test", "d", "s1"}));
+    paths.add(new PartialPath(new String[] {"root", "test", "d", "s2"}));
+
+    Assert.assertEquals(paths, statement.getPaths());
+    Assert.assertFalse(statement.isWriteToTable());
+    Assert.assertTrue(statement.getDatabaseName().isPresent());
+    Assert.assertEquals("root.test", statement.getDatabaseName().get());
+  }
+
   @Test
   public void testPipeTransferTabletBatchReq() throws IOException {
     final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -503,6 +567,94 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals("test", 
deserializedReq.getInsertNodeReqs().get(0).getDataBaseName());
   }
 
+  /** Checks that a V2 batch spanning two tree-model databases yields statements for both. */
+  @Test
+  public void testPipeTransferTabletBatchReqV2WithMultipleTreeModelDatabases() throws IOException {
+    final List<ByteBuffer> rowNodeBuffers = new ArrayList<>();
+    final List<ByteBuffer> rawTabletBuffers = new ArrayList<>();
+    final List<String> rowNodeDatabases = new ArrayList<>();
+    final List<String> rawTabletDatabases = new ArrayList<>();
+
+    // One serialized row node per database.
+    rowNodeBuffers.add(
+        new InsertRowNode(
+                new PlanNodeId(""),
+                new PartialPath(new String[] {"root", "db1", "d"}),
+                false,
+                new String[] {"s"},
+                new TSDataType[] {TSDataType.INT32},
+                1,
+                new Object[] {1},
+                false)
+            .serializeToByteBuffer());
+    rowNodeDatabases.add("root.db1");
+
+    rowNodeBuffers.add(
+        new InsertRowNode(
+                new PlanNodeId(""),
+                new PartialPath(new String[] {"root", "db2", "d"}),
+                false,
+                new String[] {"s"},
+                new TSDataType[] {TSDataType.INT32},
+                2,
+                new Object[] {2},
+                false)
+            .serializeToByteBuffer());
+    rowNodeDatabases.add("root.db2");
+
+    // One serialized tablet per database, each followed by a boolean false.
+    final List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+
+    final Tablet db1Tablet = new Tablet("root.db1.d", schemaList, 8);
+    db1Tablet.addTimestamp(0, 1);
+    db1Tablet.addValue("s1", 0, 1);
+    try (final PublicBAOS baos = new PublicBAOS();
+        final DataOutputStream dos = new DataOutputStream(baos)) {
+      db1Tablet.serialize(dos);
+      ReadWriteIOUtils.write(false, dos);
+      rawTabletBuffers.add(ByteBuffer.wrap(baos.getBuf(), 0, baos.size()));
+      rawTabletDatabases.add("root.db1");
+    }
+
+    final Tablet db2Tablet = new Tablet("root.db2.d", schemaList, 8);
+    db2Tablet.addTimestamp(0, 2);
+    db2Tablet.addValue("s1", 0, 2);
+    try (final PublicBAOS baos = new PublicBAOS();
+        final DataOutputStream dos = new DataOutputStream(baos)) {
+      db2Tablet.serialize(dos);
+      ReadWriteIOUtils.write(false, dos);
+      rawTabletBuffers.add(ByteBuffer.wrap(baos.getBuf(), 0, baos.size()));
+      rawTabletDatabases.add("root.db2");
+    }
+
+    final PipeTransferTabletBatchReqV2 req =
+        PipeTransferTabletBatchReqV2.toTPipeTransferReq(
+            rowNodeBuffers, rawTabletBuffers, rowNodeDatabases, rawTabletDatabases);
+    final List<InsertBaseStatement> statements =
+        PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req).constructStatements();
+    final Set<String> rowsDatabases = new HashSet<>();
+    final Set<String> multiTabletsDatabases = new HashSet<>();
+
+    for (final InsertBaseStatement statement : statements) {
+      Assert.assertFalse(statement.isWriteToTable());
+      Assert.assertTrue(statement.getDatabaseName().isPresent());
+      // Anything other than the two batched statement types is a failure.
+      final boolean isRowsStatement = statement instanceof InsertRowsStatement;
+      if (!isRowsStatement && !(statement instanceof InsertMultiTabletsStatement)) {
+        Assert.fail("Unexpected statement type: " + statement.getClass().getName());
+      }
+      final Set<String> seen = isRowsStatement ? rowsDatabases : multiTabletsDatabases;
+      seen.add(statement.getDatabaseName().get());
+    }
+
+    // Both statement types must cover both databases.
+    final Set<String> expectedDatabases =
+        new HashSet<>(java.util.Arrays.asList("root.db1", "root.db2"));
+    Assert.assertEquals(expectedDatabases, rowsDatabases);
+    Assert.assertEquals(expectedDatabases, multiTabletsDatabases);
+  }
+
   @Test
   public void testPipeTransferFilePieceReq() throws IOException {
     final byte[] body = "testPipeTransferFilePieceReq".getBytes();


Reply via email to