This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch historical-semantic
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/historical-semantic by this push:
     new 4b0cf4f4163 historical
4b0cf4f4163 is described below

commit 4b0cf4f41632aadbdc3882a99c8ddca725e941fa
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 11:49:07 2026 +0800

    historical
---
 .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 14 +++++++++++++-
 .../config/executor/ClusterConfigTaskExecutor.java | 22 ++++++++++++++--------
 2 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 5be609aba2f..53ab9181a95 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -140,9 +140,21 @@ public class IoTDBPipeAutoSplitIT extends AbstractPipeDualTreeModelAutoIT {
             "insert into root.test.device(time, field) values(0,1),(1,2)",
             "delete from root.test.device.* where time == 0",
             String.format(
-                "create pipe a2b with source ('inclusion'='all') with sink 
('node-urls'='%s')",
+                "create pipe a2b with source ('inclusion'='all') with sink "
+                    + "('node-urls'='%s', 'enable-send-tsfile-limit'='false')",
                 receiverDataNode.getIpAndPortString())));
 
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+      showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+      Assert.assertTrue(
+          showPipeResult.stream()
+              .filter(i -> Objects.equals(i.id, "a2b_history"))
+              .anyMatch(i -> 
i.pipeConnector.contains("enable-send-tsfile-limit=false")));
+    }
+
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv,
         "select * from root.test.device",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 659b24b2767..3c156eb872b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2260,6 +2260,19 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
         }
 
         // 2. Send request to create the historical data synchronization 
pipeline
+        final Map<String, String> historySinkAttributes =
+            sinkPipeParameters.hasAnyAttributes(
+                    PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+                    PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT)
+                ? createPipeStatement.getSinkAttributes()
+                : sinkPipeParameters
+                    .addOrReplaceEquivalentAttributesWithClone(
+                        new PipeParameters(
+                            Collections.singletonMap(
+                                PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+                                Boolean.TRUE.toString())))
+                    .getAttribute();
+
         final TCreatePipeReq historyReq =
             new TCreatePipeReq()
                 // Append suffix to the pipeline name for historical data
@@ -2292,14 +2305,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
                                     
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-                .setConnectorAttributes(
-                    sinkPipeParameters
-                        .addOrReplaceEquivalentAttributesWithClone(
-                            new PipeParameters(
-                                Collections.singletonMap(
-                                    
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
-                                    Boolean.TRUE.toString())))
-                        .getAttribute());
+                .setConnectorAttributes(historySinkAttributes);
 
         final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
         // If creation fails, immediately return with exception

Reply via email to