This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f956e18ae7 Pipe Load: Binded database for pipe-generated tree model load (#17661)
5f956e18ae7 is described below

commit 5f956e18ae705f506f3730df6e8fc8f1fdb17d8c
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 22:02:27 2026 +0800

    Pipe Load: Binded database for pipe-generated tree model load (#17661)
    
    * fix
    
    * xb
    
    * Update LoadTsFileScheduler.java
---
 .../IoTDBPipeReceiverAutoCreateDisabledIT.java     | 141 +++++++++++++++++++++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  41 ++++--
 .../scheduler/load/LoadTsFileSchedulerTest.java    |  20 +++
 3 files changed, 192 insertions(+), 10 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
new file mode 100644
index 00000000000..6d4f2b80b33
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
+import 
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoBasic.class})
+public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeModelAutoIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+    setupConfig();
+    senderEnv.initClusterEnvironment(1, 1);
+    receiverEnv.initClusterEnvironment(1, 1);
+  }
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false);
+  }
+
+  @Test
+  public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries() 
throws Exception {
+    Assert.assertEquals(1, senderEnv.getConfigNodeWrapperList().size());
+    Assert.assertEquals(1, senderEnv.getDataNodeWrapperList().size());
+    Assert.assertEquals(1, receiverEnv.getConfigNodeWrapperList().size());
+    Assert.assertEquals(1, receiverEnv.getDataNodeWrapperList().size());
+
+    final String createPipeSql =
+        String.format(
+            "create pipe test with source 
('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true')
 "
+                + "with sink ('sink'='iotdb-thrift-sink', 
'sink.node-urls'='%s');",
+            receiverEnv.getDataNodeWrapper(0).getIpAndPortString());
+    final String createDatabaseSql = "create database root.test.sg;";
+    final String createFirstTimeSeriesSql =
+        "create timeseries 
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;";
+    final String insertFirstSql =
+        "insert into root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`(time, 
`~!@#$%^&*()_+=:'\"/|[]{}`) "
+            + "values (1706659200,3.5),(1706660000, 15.5);";
+    final String firstSelectSql =
+        "select `~!@#$%^&*()_+=:'\"/|[]{}` from 
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`;";
+    final String createSecondTimeSeriesSql =
+        "create timeseries 
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`
 int32;";
+    final String insertSecondSql =
+        "insert into 
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`"
+            + "(time, `~!@#$%^&*().,<>?_-+=:'\"/|[]{}`) values 
(1706666400,23456),(1706667400,23456),(1706686400,23456);";
+    final String secondSelectSql =
+        "select * from 
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`;";
+
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(createPipeSql);
+      statement.execute(createDatabaseSql);
+      statement.execute(createFirstTimeSeriesSql);
+      statement.execute(insertFirstSql);
+      final QueryResult firstQueryResult = queryForResult(statement, 
firstSelectSql);
+      statement.execute(createSecondTimeSeriesSql);
+      statement.execute(insertSecondSql);
+      final QueryResult secondQueryResult = queryForResult(statement, 
secondSelectSql);
+
+      awaitUntilFlush(senderEnv);
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, firstSelectSql, firstQueryResult.header, 
firstQueryResult.rows);
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, secondSelectSql, secondQueryResult.header, 
secondQueryResult.rows);
+    }
+  }
+
+  private QueryResult queryForResult(final Statement statement, final String 
sql)
+      throws SQLException {
+    try (final ResultSet resultSet = statement.executeQuery(sql)) {
+      final ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+      final StringBuilder headerBuilder = new StringBuilder();
+      for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+        headerBuilder.append(resultSetMetaData.getColumnName(i)).append(",");
+      }
+
+      final Set<String> rows = new HashSet<>();
+      while (resultSet.next()) {
+        final StringBuilder rowBuilder = new StringBuilder();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          rowBuilder.append(resultSet.getString(i)).append(",");
+        }
+        rows.add(rowBuilder.toString());
+      }
+      return new QueryResult(headerBuilder.toString(), rows);
+    }
+  }
+
+  private static class QueryResult {
+    private final String header;
+    private final Set<String> rows;
+
+    private QueryResult(final String header, final Set<String> rows) {
+      this.header = header;
+      this.rows = rows;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6671dad3548..6251a67689a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -86,6 +86,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -179,11 +180,7 @@ public class LoadTsFileScheduler implements IScheduler {
         final LoadSingleTsFileNode node = tsFileNodeList.get(i);
         final String filePath = node.getTsFileResource().getTsFilePath();
 
-        if (node.isTableModel()) {
-          partitionFetcher.setDatabase(node.getDatabase());
-        } else {
-          partitionFetcher.setDatabase(null);
-        }
+        partitionFetcher.setDatabase(getPartitionQueryDatabase(node, 
isGeneratedByPipe));
 
         boolean isLoadSingleTsFileSuccess = true;
         boolean shouldRemoveFileFromLoadingSet = false;
@@ -593,9 +590,10 @@ public class LoadTsFileScheduler implements IScheduler {
                     .orElse(null)
                 : loadTsFileDataTypeConverter
                     .convertForTreeModel(
-                        LoadTsFileStatement.createUnchecked(filePath)
-                            .setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
-                            .setConvertOnTypeMismatch(true))
+                        buildRetryTreeLoadStatement(
+                            filePath,
+                            failedNode.isDeleteAfterLoad(),
+                            getPartitionQueryDatabase(failedNode, 
isGeneratedByPipe)))
                     .orElse(null);
 
         if (loadTsFileDataTypeConverter.isSuccessful(status)) {
@@ -634,6 +632,27 @@ public class LoadTsFileScheduler implements IScheduler {
     }
   }
 
+  static String getPartitionQueryDatabase(
+      final LoadSingleTsFileNode node, final boolean isGeneratedByPipe) {
+    return node.isTableModel() || isGeneratedByPipe ? node.getDatabase() : 
null;
+  }
+
+  private LoadTsFileStatement buildRetryTreeLoadStatement(
+      final String filePath, final boolean deleteAfterLoad, final String 
database)
+      throws FileNotFoundException {
+    final LoadTsFileStatement statement =
+        LoadTsFileStatement.createUnchecked(filePath)
+            .setDeleteAfterLoad(deleteAfterLoad)
+            .setConvertOnTypeMismatch(true);
+    if (database != null) {
+      statement.setDatabase(database);
+    }
+    if (isGeneratedByPipe) {
+      statement.markIsGeneratedByPipe();
+    }
+    return statement;
+  }
+
   @Override
   public void stop(Throwable t) {
     // Do nothing
@@ -851,7 +870,8 @@ public class LoadTsFileScheduler implements IScheduler {
             subSlotList.stream()
                 .map(
                     pair ->
-                        // (database != null) means this file will be loaded 
into table-model
+                        // database is an explicit database hint for 
table-model loads and
+                        // pipe-generated tree-model loads.
                         database != null
                             ? dataPartition.getDataRegionReplicaSetForWriting(
                                 pair.left, pair.right, database)
@@ -874,7 +894,8 @@ public class LoadTsFileScheduler implements IScheduler {
               entry -> {
                 DataPartitionQueryParam queryParam =
                     new DataPartitionQueryParam(entry.getKey(), new 
ArrayList<>(entry.getValue()));
-                // (database != null) means this file will be loaded into 
table-model
+                // database is an explicit database hint for table-model loads 
and
+                // pipe-generated tree-model loads.
                 if (database != null) {
                   queryParam.setDatabaseName(database);
                 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 161a95b4ec9..2db41c2ccb0 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -67,4 +68,23 @@ public class LoadTsFileSchedulerTest {
     Assert.assertNull(t.getTotalCpuTime());
     Assert.assertNull(t.getFragmentInfo());
   }
+
+  @Test
+  public void testGetPartitionQueryDatabaseForPipeGeneratedTreeModelLoad() {
+    final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class);
+    when(node.isTableModel()).thenReturn(false);
+    when(node.getDatabase()).thenReturn("root.test.sg");
+
+    Assert.assertEquals("root.test.sg", 
LoadTsFileScheduler.getPartitionQueryDatabase(node, true));
+    Assert.assertNull(LoadTsFileScheduler.getPartitionQueryDatabase(node, 
false));
+  }
+
+  @Test
+  public void testGetPartitionQueryDatabaseForTableModelLoad() {
+    final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class);
+    when(node.isTableModel()).thenReturn(true);
+    when(node.getDatabase()).thenReturn("test");
+
+    Assert.assertEquals("test", 
LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
+  }
 }

Reply via email to