This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch historical-semantic
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/historical-semantic by this
push:
new 4b0cf4f4163 historical
4b0cf4f4163 is described below
commit 4b0cf4f41632aadbdc3882a99c8ddca725e941fa
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 11:49:07 2026 +0800
historical
---
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 14 +++++++++++++-
.../config/executor/ClusterConfigTaskExecutor.java | 22 ++++++++++++++--------
2 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 5be609aba2f..53ab9181a95 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -140,9 +140,21 @@ public class IoTDBPipeAutoSplitIT extends AbstractPipeDualTreeModelAutoIT {
"insert into root.test.device(time, field) values(0,1),(1,2)",
"delete from root.test.device.* where time == 0",
String.format(
- "create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')",
+ "create pipe a2b with source ('inclusion'='all') with sink "
+ + "('node-urls'='%s', 'enable-send-tsfile-limit'='false')",
receiverDataNode.getIpAndPortString())));
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+ showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+ Assert.assertTrue(
+ showPipeResult.stream()
+ .filter(i -> Objects.equals(i.id, "a2b_history"))
+ .anyMatch(i -> i.pipeConnector.contains("enable-send-tsfile-limit=false")));
+ }
+
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.test.device",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 659b24b2767..3c156eb872b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2260,6 +2260,19 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
// 2. Send request to create the historical data synchronization pipeline
+ final Map<String, String> historySinkAttributes =
+ sinkPipeParameters.hasAnyAttributes(
+ PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+ PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT)
+ ? createPipeStatement.getSinkAttributes()
+ : sinkPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ Collections.singletonMap(
+ PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+ Boolean.TRUE.toString())))
+ .getAttribute();
+
final TCreatePipeReq historyReq =
new TCreatePipeReq()
// Append suffix to the pipeline name for historical data
@@ -2292,14 +2305,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
- .setConnectorAttributes(
- sinkPipeParameters
- .addOrReplaceEquivalentAttributesWithClone(
- new PipeParameters(
- Collections.singletonMap(
- PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
- Boolean.TRUE.toString())))
- .getAttribute());
+ .setConnectorAttributes(historySinkAttributes);
final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
// If creation fails, immediately return with exception