This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b21485c254c [To dev/1.3] Pipe: split full-sync pipe into history and
realtime pipes (#16250) (#16299)
b21485c254c is described below
commit b21485c254c4d77ce9569a9d5ab61915cad1af0c
Author: VGalaxies <[email protected]>
AuthorDate: Fri Aug 29 11:47:53 2025 +0800
[To dev/1.3] Pipe: split full-sync pipe into history and realtime pipes
(#16250) (#16299)
* setup
* minor improve
* apply Copilot review
* add internal config
* fixup
* fixup
* add IoTDBPipeAutoSplitIT
* set if not exists always to true to handle partial failure
---
.../it/env/cluster/config/MppCommonConfig.java | 6 ++
.../env/cluster/config/MppSharedCommonConfig.java | 7 ++
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../pipe/it/autocreate/AbstractPipeDualAutoIT.java | 6 +-
.../pipe/it/autocreate/IoTDBPipeAutoSplitIT.java | 104 +++++++++++++++++++++
.../it/autocreate/IoTDBPipeSinkCompressionIT.java | 6 +-
.../pipe/it/autocreate/IoTDBPipeSourceIT.java | 6 +-
.../pipe/it/manual/AbstractPipeDualManualIT.java | 7 +-
.../pipe/it/manual/IoTDBPipePermissionIT.java | 6 +-
.../iotdb/pipe/it/single/AbstractPipeSingleIT.java | 3 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 51 +++++++---
.../config/executor/ClusterConfigTaskExecutor.java | 77 ++++++++++++++-
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 8 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 6 ++
16 files changed, 285 insertions(+), 25 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index b029023f8e2..2e715176eb8 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -514,6 +514,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ setProperty("pipe_auto_split_full_enabled",
String.valueOf(pipeAutoSplitFullEnabled));
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
setProperty("chunk_timeseriesmeta_free_memory_proportion",
queryMemoryProportion);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index d89674948da..f836f11d779 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -526,6 +526,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ dnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+ cnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
dnConfig.setQueryMemoryProportion(queryMemoryProportion);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 738e380758a..291a34c9e9d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -371,6 +371,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeAutoSplitFullEnabled(boolean
pipeAutoSplitFullEnabled) {
+ return this;
+ }
+
@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 4bbfed0e07a..4cdf2bcd743 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -164,6 +164,8 @@ public interface CommonConfig {
CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes);
+ CommonConfig setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled);
+
CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
index 6d3f2e85d8b..2c7f5719e9b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
@@ -50,7 +50,8 @@ abstract class AbstractPipeDualAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -58,7 +59,8 @@ abstract class AbstractPipeDualAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
new file mode 100644
index 00000000000..5f732b6d390
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeAutoSplitIT extends AbstractPipeDualAutoIT {
+
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(true); // set auto split to true
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(true); // set auto split to true
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ @Test
+ public void testSingleEnv() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String sql =
+ String.format(
+ "create pipe a2b with source ('source'='iotdb-source') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+ Assert.assertEquals(2, showPipeResult.size());
+ Assert.assertTrue(
+ (Objects.equals(showPipeResult.get(0).id, "a2b_history")
+ && Objects.equals(showPipeResult.get(1).id, "a2b_realtime"))
+ || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
+ && Objects.equals(showPipeResult.get(0).id,
"a2b_realtime")));
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
index 7c576696c63..a2b8442fa21 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
@@ -69,7 +69,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
@@ -79,7 +80,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index fe3c5503344..1ad6a4f37dc 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -73,7 +73,8 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT
{
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -81,7 +82,8 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT
{
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
index a13e8dc152d..299d1b55c5b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
@@ -50,7 +50,9 @@ abstract class AbstractPipeDualManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
+
receiverEnv
.getConfig()
.getCommonConfig()
@@ -58,7 +60,8 @@ abstract class AbstractPipeDualManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index fb1b4121e2b..e43332d1270 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -63,7 +63,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
.setDefaultSchemaRegionGroupNumPerDatabase(1)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeAutoSplitFullEnabled(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -73,7 +74,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
- .setDataReplicationFactor(2);
+ .setDataReplicationFactor(2)
+ .setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 324fc529b5a..ea41dc9b579 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -39,7 +39,8 @@ abstract class AbstractPipeSingleIT {
env.getConfig()
.getCommonConfig()
.setPipeMemoryManagementEnabled(false)
- .setIsPipeEnableMemoryCheck(false);
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
env.initClusterEnvironment();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0e8ddef54ea..76d10eb0812 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -106,6 +106,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
@@ -115,6 +119,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
@@ -415,16 +420,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
.getStaticMeta()
.getExtractorParameters()
.getStringOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_MODE_KEY,
PipeSourceConstant.SOURCE_MODE_KEY),
- PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
+ EXTRACTOR_MODE_DEFAULT_VALUE);
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
.getLeft()
- &&
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
- || sourceModeValue.equalsIgnoreCase(
- PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
+ &&
(sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+ ||
sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE));
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
@@ -497,17 +500,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
.getStaticMeta()
.getExtractorParameters()
.getStringOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_MODE_KEY,
PipeSourceConstant.SOURCE_MODE_KEY),
- PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
+ EXTRACTOR_MODE_DEFAULT_VALUE);
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
.getLeft()
- && (extractorModeValue.equalsIgnoreCase(
- PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
- || extractorModeValue.equalsIgnoreCase(
- PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
+ &&
(extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+ ||
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE));
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
@@ -580,6 +580,31 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
+ public boolean isFullSync(final PipeParameters parameters) {
+ if (isSnapshotMode(parameters)) {
+ return false;
+ }
+
+ final boolean isHistoryEnable =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+ final boolean isRealtimeEnable =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+
+ return isHistoryEnable && isRealtimeEnable;
+ }
+
+ private boolean isSnapshotMode(final PipeParameters parameters) {
+ final String sourceModeValue =
+ parameters.getStringOrDefault(
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
+ return sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+ || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+ }
+
@Override
public void runPipeTasks(
final Collection<PipeTask> pipeTasks, final Consumer<PipeTask>
runSingle) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e633ad60735..b1af7cee4d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -50,6 +50,8 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
@@ -260,6 +262,7 @@ import org.apache.iotdb.trigger.api.Trigger;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.udf.api.UDF;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
@@ -1797,7 +1800,79 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
- try (ConfigNodeClient configNodeClient =
+ // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot/query mode, and both realtime
+ // and history are enabled), the pipe is split into history-only and realtime-only pipes.
+ final PipeParameters extractorPipeParameters =
+ new PipeParameters(createPipeStatement.getExtractorAttributes());
+ if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+ && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // 1. Send request to create the historical data synchronization
pipeline
+ final TCreatePipeReq historyReq =
+ new TCreatePipeReq()
+ // Append suffix to the pipeline name for historical data
+ .setPipeName(createPipeStatement.getPipeName() + "_history")
+ // NOTE: always set "if not exists" to true for the history pipe to handle partial failure
+ .setIfNotExistsCondition(true)
+ // Use extractor parameters for historical data
+ .setExtractorAttributes(
+ extractorPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ ImmutableMap.of(
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+ Boolean.toString(true),
+
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(false))))
+ .getAttribute())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+ final TSStatus historyTsStatus =
configNodeClient.createPipe(historyReq);
+ // If creation fails, immediately return with exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
historyTsStatus.getCode()) {
+ future.setException(new IoTDBException(historyTsStatus));
+ return future;
+ }
+
+ // 2. Send request to create the real-time data synchronization
pipeline
+ final TCreatePipeReq realtimeReq =
+ new TCreatePipeReq()
+ // Append suffix to the pipeline name for real-time data
+ .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+ // Use extractor parameters for real-time data
+ .setExtractorAttributes(
+ extractorPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ ImmutableMap.of(
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+ Boolean.toString(false),
+
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(true))))
+ .getAttribute())
+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+ final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
+ // If creation fails, immediately return with exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
realtimeTsStatus.getCode()) {
+ future.setException(new IoTDBException(realtimeTsStatus));
+ return future;
+ }
+
+ // 3. Set success status only if both pipelines are created
successfully
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (final Exception e) {
+ // Catch any other exceptions (e.g., network issues)
+ future.setException(e);
+ }
+ return future;
+ }
+
+ try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TCreatePipeReq req =
new TCreatePipeReq()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 29c06a55589..5c8c1963a13 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -335,6 +335,8 @@ public class CommonConfig {
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;
+ private boolean pipeAutoSplitFullEnabled = true;
+
private boolean subscriptionEnabled = false;
private float subscriptionCacheMemoryUsagePercentage = 0.2F;
@@ -2038,6 +2040,14 @@ public class CommonConfig {
pipeEventReferenceEliminateIntervalSeconds);
}
+ public boolean getPipeAutoSplitFullEnabled() {
+ return pipeAutoSplitFullEnabled;
+ }
+
+ public void setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled) {
+ this.pipeAutoSplitFullEnabled = pipeAutoSplitFullEnabled;
+ }
+
public boolean getSubscriptionEnabled() {
return subscriptionEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1dcc849f86c..5604ca2dcef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -419,6 +419,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
}
+ /////////////////////////////// Syntactic sugar
///////////////////////////////
+
+ public boolean getPipeAutoSplitFullEnabled() {
+ return COMMON_CONFIG.getPipeAutoSplitFullEnabled();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
@@ -612,6 +618,8 @@ public class PipeConfig {
LOGGER.info(
"PipeEventReferenceEliminateIntervalSeconds: {}",
getPipeEventReferenceEliminateIntervalSeconds());
+
+ LOGGER.info("PipeAutoSplitFullEnabled: {}", getPipeAutoSplitFullEnabled());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a60ac57da02..c0710d957d0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -537,6 +537,12 @@ public class PipeDescriptor {
Long.parseLong(
properties.getProperty(
"pipe_max_wait_finish_time",
String.valueOf(config.getPipeMaxWaitFinishTime()))));
+
+ config.setPipeAutoSplitFullEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_auto_split_full_enabled",
+ String.valueOf(config.getPipeAutoSplitFullEnabled()))));
}
public static void loadPipeExternalConfig(