This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b21485c254c [To dev/1.3] Pipe: split full-sync pipe into history and 
realtime pipes (#16250) (#16299)
b21485c254c is described below

commit b21485c254c4d77ce9569a9d5ab61915cad1af0c
Author: VGalaxies <[email protected]>
AuthorDate: Fri Aug 29 11:47:53 2025 +0800

    [To dev/1.3] Pipe: split full-sync pipe into history and realtime pipes 
(#16250) (#16299)
    
    * setup
    
    * minor improve
    
    * apply Copilot review
    
    * add internal config
    
    * fixup
    
    * fixup
    
    * add IoTDBPipeAutoSplitIT
    
    * set if not exists always to true to handle partial failure
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 ++
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 ++
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../pipe/it/autocreate/AbstractPipeDualAutoIT.java |   6 +-
 .../pipe/it/autocreate/IoTDBPipeAutoSplitIT.java   | 104 +++++++++++++++++++++
 .../it/autocreate/IoTDBPipeSinkCompressionIT.java  |   6 +-
 .../pipe/it/autocreate/IoTDBPipeSourceIT.java      |   6 +-
 .../pipe/it/manual/AbstractPipeDualManualIT.java   |   7 +-
 .../pipe/it/manual/IoTDBPipePermissionIT.java      |   6 +-
 .../iotdb/pipe/it/single/AbstractPipeSingleIT.java |   3 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  51 +++++++---
 .../config/executor/ClusterConfigTaskExecutor.java |  77 ++++++++++++++-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |   8 ++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   6 ++
 16 files changed, 285 insertions(+), 25 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index b029023f8e2..2e715176eb8 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -514,6 +514,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    setProperty("pipe_auto_split_full_enabled", 
String.valueOf(pipeAutoSplitFullEnabled));
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     setProperty("chunk_timeseriesmeta_free_memory_proportion", 
queryMemoryProportion);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index d89674948da..f836f11d779 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -526,6 +526,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    dnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+    cnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     dnConfig.setQueryMemoryProportion(queryMemoryProportion);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 738e380758a..291a34c9e9d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -371,6 +371,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 4bbfed0e07a..4cdf2bcd743 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -164,6 +164,8 @@ public interface CommonConfig {
   CommonConfig setPipeConnectorRequestSliceThresholdBytes(
       int pipeConnectorRequestSliceThresholdBytes);
 
+  CommonConfig setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled);
+
   CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
 
   CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
index 6d3f2e85d8b..2c7f5719e9b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
@@ -50,7 +50,8 @@ abstract class AbstractPipeDualAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -58,7 +59,8 @@ abstract class AbstractPipeDualAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
new file mode 100644
index 00000000000..5f732b6d390
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeAutoSplitIT extends AbstractPipeDualAutoIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(true); // set auto split to true
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(true); // set auto split to true
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  @Test
+  public void testSingleEnv() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String sql =
+        String.format(
+            "create pipe a2b with source ('source'='iotdb-source') with 
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+      Assert.assertEquals(2, showPipeResult.size());
+      Assert.assertTrue(
+          (Objects.equals(showPipeResult.get(0).id, "a2b_history")
+                  && Objects.equals(showPipeResult.get(1).id, "a2b_realtime"))
+              || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
+                  && Objects.equals(showPipeResult.get(0).id, 
"a2b_realtime")));
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
index 7c576696c63..a2b8442fa21 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
@@ -69,7 +69,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     receiverEnv
         .getConfig()
@@ -79,7 +80,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index fe3c5503344..1ad6a4f37dc 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -73,7 +73,8 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT 
{
         .setEnableUnseqSpaceCompaction(false)
         .setEnableCrossSpaceCompaction(false)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -81,7 +82,8 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT 
{
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
index a13e8dc152d..299d1b55c5b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
@@ -50,7 +50,9 @@ abstract class AbstractPipeDualManualIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
+
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -58,7 +60,8 @@ abstract class AbstractPipeDualManualIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index fb1b4121e2b..e43332d1270 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -63,7 +63,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeAutoSplitFullEnabled(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -73,7 +74,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setSchemaReplicationFactor(3)
-        .setDataReplicationFactor(2);
+        .setDataReplicationFactor(2)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 324fc529b5a..ea41dc9b579 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -39,7 +39,8 @@ abstract class AbstractPipeSingleIT {
     env.getConfig()
         .getCommonConfig()
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     env.initClusterEnvironment();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0e8ddef54ea..76d10eb0812 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -106,6 +106,10 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
@@ -115,6 +119,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
@@ -415,16 +420,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 .getStaticMeta()
                 .getExtractorParameters()
                 .getStringOrDefault(
-                    Arrays.asList(
-                        PipeSourceConstant.EXTRACTOR_MODE_KEY, 
PipeSourceConstant.SOURCE_MODE_KEY),
-                    PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
+                    Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
+                    EXTRACTOR_MODE_DEFAULT_VALUE);
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
                     .getLeft()
-                && 
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
-                    || sourceModeValue.equalsIgnoreCase(
-                        PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
+                && 
(sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+                    || 
sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE));
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
@@ -497,17 +500,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 .getStaticMeta()
                 .getExtractorParameters()
                 .getStringOrDefault(
-                    Arrays.asList(
-                        PipeSourceConstant.EXTRACTOR_MODE_KEY, 
PipeSourceConstant.SOURCE_MODE_KEY),
-                    PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
+                    Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
+                    EXTRACTOR_MODE_DEFAULT_VALUE);
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
                     .getLeft()
-                && (extractorModeValue.equalsIgnoreCase(
-                        PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
-                    || extractorModeValue.equalsIgnoreCase(
-                        PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
+                && 
(extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+                    || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE));
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
@@ -580,6 +580,31 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  public boolean isFullSync(final PipeParameters parameters) {
+    if (isSnapshotMode(parameters)) {
+      return false;
+    }
+
+    final boolean isHistoryEnable =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+    final boolean isRealtimeEnable =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+
+    return isHistoryEnable && isRealtimeEnable;
+  }
+
+  private boolean isSnapshotMode(final PipeParameters parameters) {
+    final String sourceModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+    return sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+        || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+  }
+
   @Override
   public void runPipeTasks(
       final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e633ad60735..b1af7cee4d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -50,6 +50,8 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
@@ -260,6 +262,7 @@ import org.apache.iotdb.trigger.api.Trigger;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.udf.api.UDF;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.thrift.TException;
@@ -1797,7 +1800,79 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       return future;
     }
 
-    try (ConfigNodeClient configNodeClient =
+    // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, 
or both realtime
+    // and history are true), the pipe is split into history-only and 
realtime–only modes.
+    final PipeParameters extractorPipeParameters =
+        new PipeParameters(createPipeStatement.getExtractorAttributes());
+    if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+        && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+      try (final ConfigNodeClient configNodeClient =
+          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        // 1. Send request to create the historical data synchronization 
pipeline
+        final TCreatePipeReq historyReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for historical data
+                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                // NOTE: set if not exists always to true to handle partial 
failure
+                .setIfNotExistsCondition(true)
+                // Use extractor parameters for historical data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                ImmutableMap.of(
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+                                    Boolean.toString(true),
+                                    
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(false))))
+                        .getAttribute())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+        final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
+        // If creation fails, immediately return with exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
historyTsStatus.getCode()) {
+          future.setException(new IoTDBException(historyTsStatus));
+          return future;
+        }
+
+        // 2. Send request to create the real-time data synchronization 
pipeline
+        final TCreatePipeReq realtimeReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for real-time data
+                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+                
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                // Use extractor parameters for real-time data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                ImmutableMap.of(
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+                                    Boolean.toString(false),
+                                    
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(true))))
+                        .getAttribute())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+        final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
+        // If creation fails, immediately return with exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()) {
+          future.setException(new IoTDBException(realtimeTsStatus));
+          return future;
+        }
+
+        // 3. Set success status only if both pipelines are created 
successfully
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } catch (final Exception e) {
+        // Catch any other exceptions (e.g., network issues)
+        future.setException(e);
+      }
+      return future;
+    }
+
+    try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       TCreatePipeReq req =
           new TCreatePipeReq()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 29c06a55589..5c8c1963a13 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -335,6 +335,8 @@ public class CommonConfig {
   private boolean pipeEventReferenceTrackingEnabled = true;
   private long pipeEventReferenceEliminateIntervalSeconds = 10;
 
+  private boolean pipeAutoSplitFullEnabled = true;
+
   private boolean subscriptionEnabled = false;
 
   private float subscriptionCacheMemoryUsagePercentage = 0.2F;
@@ -2038,6 +2040,14 @@ public class CommonConfig {
         pipeEventReferenceEliminateIntervalSeconds);
   }
 
+  public boolean getPipeAutoSplitFullEnabled() {
+    return pipeAutoSplitFullEnabled;
+  }
+
+  public void setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled) {
+    this.pipeAutoSplitFullEnabled = pipeAutoSplitFullEnabled;
+  }
+
   public boolean getSubscriptionEnabled() {
     return subscriptionEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1dcc849f86c..5604ca2dcef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -419,6 +419,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
   }
 
+  /////////////////////////////// Syntactic sugar 
///////////////////////////////
+
+  public boolean getPipeAutoSplitFullEnabled() {
+    return COMMON_CONFIG.getPipeAutoSplitFullEnabled();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
@@ -612,6 +618,8 @@ public class PipeConfig {
     LOGGER.info(
         "PipeEventReferenceEliminateIntervalSeconds: {}",
         getPipeEventReferenceEliminateIntervalSeconds());
+
+    LOGGER.info("PipeAutoSplitFullEnabled: {}", getPipeAutoSplitFullEnabled());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a60ac57da02..c0710d957d0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -537,6 +537,12 @@ public class PipeDescriptor {
         Long.parseLong(
             properties.getProperty(
                 "pipe_max_wait_finish_time", 
String.valueOf(config.getPipeMaxWaitFinishTime()))));
+
+    config.setPipeAutoSplitFullEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_auto_split_full_enabled",
+                String.valueOf(config.getPipeAutoSplitFullEnabled()))));
   }
 
   public static void loadPipeExternalConfig(

Reply via email to