This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e9584eced72 Pipe: Fixed the remaining correctness bug for PR 17661 && 
the replication configuration of IoTDBPipeReceiverAutoCreateDisabledIT (#17678)
e9584eced72 is described below

commit e9584eced726a39733b68d42bc5bb02857d91fad
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 15 16:38:04 2026 +0800

    Pipe: Fixed the remaining correctness bug for PR 17661 && the replication 
configuration of IoTDBPipeReceiverAutoCreateDisabledIT (#17678)
    
    * Update IoTDBPipeReceiverAutoCreateDisabledIT.java
    
    * Fix
    
    * Update IoTDBPipeReceiverAutoCreateDisabledIT.java
    
    * Update IoTDBDataNodeReceiver.java
    
    * Update IoTDBDataNodeReceiver.java
    
    * Fix
---
 .../IoTDBPipeReceiverAutoCreateDisabledIT.java     | 17 +++++++-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 48 +++++++++++++++-------
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  4 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  2 +-
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  2 +-
 5 files changed, 52 insertions(+), 21 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 6d4f2b80b33..72646f81b86 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -57,7 +57,17 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
   @Override
   protected void setupConfig() {
     super.setupConfig();
-    
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false);
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(false)
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
   }
 
   @Test
@@ -69,10 +79,12 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
 
     final String createPipeSql =
         String.format(
-            "create pipe test with source 
('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true')
 "
+            "create pipe test with source ('inclusion'='all') "
                 + "with sink ('sink'='iotdb-thrift-sink', 
'sink.node-urls'='%s');",
             receiverEnv.getDataNodeWrapper(0).getIpAndPortString());
     final String createDatabaseSql = "create database root.test.sg;";
+    final String createSecondDatabaseSql =
+        "create database 
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1;";
     final String createFirstTimeSeriesSql =
         "create timeseries 
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;";
     final String insertFirstSql =
@@ -95,6 +107,7 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
       statement.execute(createFirstTimeSeriesSql);
       statement.execute(insertFirstSql);
       final QueryResult firstQueryResult = queryForResult(statement, 
firstSelectSql);
+      statement.execute(createSecondDatabaseSql);
       statement.execute(createSecondTimeSeriesSql);
       statement.execute(insertSecondSql);
       final QueryResult secondQueryResult = queryForResult(statement, 
secondSelectSql);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index dd6e684031c..dbf77339c0f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -871,9 +871,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     // Judge which model the statement belongs to
     final boolean isTableModelStatement;
     final String databaseName;
-    if (statement instanceof LoadTsFileStatement
-        && ((LoadTsFileStatement) statement).getDatabase() != null) {
-      isTableModelStatement = true;
+    if (statement instanceof LoadTsFileStatement) {
+      // Pipe receiver always constructs a tree-model LoadTsFileStatement. Its 
database field is
+      // only an explicit database hint for table data or pipe-generated 
tree-model loads, so it
+      // must not be used to route execution into the table-model pipeline.
+      isTableModelStatement = false;
       databaseName = ((LoadTsFileStatement) statement).getDatabase();
     } else if (statement instanceof InsertBaseStatement
         && ((InsertBaseStatement) statement).isWriteToTable()) {
@@ -925,18 +927,34 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
     // Try to convert data type if the statement is a tree model statement
     // and the status code is not success
-    return shouldConvertDataTypeOnTypeMismatch
-            && ((statement instanceof InsertBaseStatement
-                    && ((InsertBaseStatement) 
statement).hasFailedMeasurements())
-                || (status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-                    && status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()))
-        ? (isTableModelStatement
-            ? statement
-                .accept(
-                    tableStatementDataTypeConvertExecutionVisitor, new 
Pair<>(status, databaseName))
-                .orElse(status)
-            : statement.accept(treeStatementDataTypeConvertExecutionVisitor, 
status).orElse(status))
-        : status;
+    if (!shouldConvertDataTypeOnTypeMismatch
+        || !((statement instanceof InsertBaseStatement
+                && ((InsertBaseStatement) statement).hasFailedMeasurements())
+            || (status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+                && status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()))) {
+      return status;
+    }
+
+    if (statement instanceof LoadTsFileStatement
+        && shouldUseTableModelVisitorForLoadStatement((LoadTsFileStatement) 
statement)) {
+      return statement
+          .accept(tableStatementDataTypeConvertExecutionVisitor, new 
Pair<>(status, databaseName))
+          .orElse(status);
+    }
+
+    return isTableModelStatement
+        ? statement
+            .accept(tableStatementDataTypeConvertExecutionVisitor, new 
Pair<>(status, databaseName))
+            .orElse(status)
+        : statement.accept(treeStatementDataTypeConvertExecutionVisitor, 
status).orElse(status);
+  }
+
+  private boolean shouldUseTableModelVisitorForLoadStatement(
+      final LoadTsFileStatement loadTsFileStatement) {
+    final List<Boolean> isTableModel = loadTsFileStatement.getIsTableModel();
+    return Objects.nonNull(isTableModel)
+        && !isTableModel.isEmpty()
+        && isTableModel.stream().allMatch(Boolean.TRUE::equals);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 7f904324bbb..64124c56256 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -341,7 +341,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
               tsFile.length(),
               pipeTsFileInsertionEvent.isTableModelEvent()
                   ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-                  : null))) {
+                  : pipeTsFileInsertionEvent.getTreeModelDatabaseName()))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
@@ -362,7 +362,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
               tsFile.length(),
               pipeTsFileInsertionEvent.isTableModelEvent()
                   ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-                  : null))) {
+                  : pipeTsFileInsertionEvent.getTreeModelDatabaseName()))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 554c6c43dfb..97e9309a435 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -423,7 +423,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   && clientManager.supportModsIfIsDataNodeReceiver(),
               pipeTsFileInsertionEvent.isTableModelEvent()
                   ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-                  : null);
+                  : pipeTsFileInsertionEvent.getTreeModelDatabaseName());
 
       transfer(pipeTransferTsFileHandler);
       return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index e8c4420861c..4309e5dae4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -499,7 +499,7 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
           pipeTsFileInsertionEvent.isWithMod() ? 
pipeTsFileInsertionEvent.getModFile() : null,
           pipeTsFileInsertionEvent.isTableModelEvent()
               ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-              : null);
+              : pipeTsFileInsertionEvent.getTreeModelDatabaseName());
     } finally {
       pipeTsFileInsertionEvent.decreaseReferenceCount(
           IoTDBDataRegionSyncSink.class.getName(), false);

Reply via email to