This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6f1c20cc5 Pipe: serialize sink transfers by region (#17946)
5e6f1c20cc5 is described below
commit 5e6f1c20cc5e86086c7a45466612ca7f08deff18
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:55:09 2026 +0800
Pipe: serialize sink transfers by region (#17946)
---
.../manual/enhanced/IoTDBPipeSinkParallelIT.java | 1 +
.../auto/basic/IoTDBPipeSinkParallelIT.java | 1 +
.../task/subtask/sink/PipeSinkSubtaskManager.java | 43 ++++++++++++----------
.../agent/plugin/PipeDataNodePluginAgentTest.java | 16 ++++++++
.../plugin/constructor/PipeSinkConstructor.java | 8 +---
.../pipe/config/constant/PipeSinkConstant.java | 22 +++++++++++
6 files changed, 64 insertions(+), 27 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
index 85f3b93aa76..ae466349d72 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
@@ -80,6 +80,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeTableModelDualManualIT
connectorAttributes.put("connector.batch.enable", "true");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.serialize-by-region", "false");
connectorAttributes.put("connector.parallel.tasks", "3");
final TSStatus status =
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
index c21e6456ef1..df8e27a8546 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeDualTreeModelAutoIT {
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+ sinkAttributes.put("sink.serialize-by-region", "false");
sinkAttributes.put("sink.parallel.tasks", "3");
final TSStatus status =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 01552eec5ae..36c024090a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
@@ -70,11 +69,10 @@ public class PipeSinkSubtaskManager {
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
+ final String connectorName =
+ PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
final String connectorKey =
- pipeSinkParameters
- .getStringOrDefault(
- Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
- BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ connectorName
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
// for matching in `CONNECTOR_CONSTRUCTORS`
.toLowerCase();
@@ -93,30 +91,31 @@ public class PipeSinkSubtaskManager {
final int sinkNum;
boolean realTimeFirst = false;
+ boolean serializeByRegion = false;
String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
final String attributeDisplayString =
generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
+ serializeByRegion =
PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
sinkNum =
- pipeSinkParameters.getIntOrDefault(
- Arrays.asList(
- PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
- PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
- PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
- pipeSinkParameters
- .getStringOrDefault(
- Arrays.asList(
- PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
-
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
- .toLowerCase())
- ? 1
- :
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ serializeByRegion
+ ? 1
+ : pipeSinkParameters.getIntOrDefault(
+ Arrays.asList(
+ PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+ PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
+
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
+ ? 1
+ :
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
- attributeSortedString = "data_" + attributeSortedString;
+ attributeSortedString =
+ serializeByRegion
+ ? "data_region_" + environment.getRegionId() + "_" +
attributeSortedString
+ : "data_" + attributeSortedString;
} else {
// Do not allow parallel tasks for schema region connectors
// to avoid the potential disorder of the schema region data transfer
@@ -124,7 +123,11 @@ public class PipeSinkSubtaskManager {
attributeSortedString = "schema_" + attributeSortedString;
}
final String attributeDisplayStringWithPrefix =
- isDataRegionSink ? "data_" + attributeDisplayString : "schema_" +
attributeDisplayString;
+ isDataRegionSink
+ ? serializeByRegion
+ ? "data_region_" + environment.getRegionId() + "_" +
attributeDisplayString
+ : "data_" + attributeDisplayString
+ : "schema_" + attributeDisplayString;
environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index b0f0b34e92a..04c759935cd 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -148,6 +149,21 @@ public class PipeDataNodePluginAgentTest {
}
}))
.getClass());
+ Assert.assertEquals(
+ IoTDBDataRegionSyncSink.class,
+ agent.dataRegion().reflectSink(new PipeParameters(new
HashMap<>())).getClass());
+ Assert.assertEquals(
+ IoTDBDataRegionAsyncSink.class,
+ agent
+ .dataRegion()
+ .reflectSink(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+
put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false");
+ }
+ }))
+ .getClass());
Assert.assertEquals(
IoTConsensusV2AsyncSink.class,
agent
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
index 16f5ad49b14..95aa29f14c9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
@@ -19,14 +19,11 @@
package org.apache.iotdb.commons.pipe.agent.plugin.constructor;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import java.util.Arrays;
-
public abstract class PipeSinkConstructor extends PipePluginConstructor {
protected PipeSinkConstructor(PipePluginMetaKeeper pipePluginMetaKeeper) {
@@ -41,10 +38,7 @@ public abstract class PipeSinkConstructor extends PipePluginConstructor {
public final PipeConnector reflectPlugin(PipeParameters connectorParameters)
{
return (PipeConnector)
reflectPluginByKey(
- connectorParameters
- .getStringOrDefault(
- Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+
PipeSinkConstant.getConnectorOrSinkNameWithDefault(connectorParameters)
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to
lowercase for matching in
// `PLUGIN_CONSTRUCTORS`
.toLowerCase());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 058b17e2f4f..39be064f09a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import com.github.luben.zstd.Zstd;
@@ -69,6 +70,27 @@ public class PipeSinkConstant {
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
+ public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY =
"connector.serialize-by-region";
+ public static final String SINK_SERIALIZE_BY_REGION_KEY =
"sink.serialize-by-region";
+ public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE =
true;
+
+ public static boolean isSerializeByRegionEnabled(final PipeParameters
parameters) {
+ return parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_SERIALIZE_BY_REGION_KEY,
SINK_SERIALIZE_BY_REGION_KEY),
+ CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
+ }
+
+ public static String getConnectorOrSinkNameWithDefault(final PipeParameters
parameters) {
+ return parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_KEY, SINK_KEY),
getDefaultConnectorOrSinkName(parameters));
+ }
+
+ private static String getDefaultConnectorOrSinkName(final PipeParameters
parameters) {
+ return isSerializeByRegionEnabled(parameters)
+ ? BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()
+ : BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName();
+ }
+
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY =
"connector.batch.enable";
public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY =
"sink.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE
= true;