This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6f1c20cc5 Pipe: serialize sink transfers by region (#17946)
5e6f1c20cc5 is described below

commit 5e6f1c20cc5e86086c7a45466612ca7f08deff18
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:55:09 2026 +0800

    Pipe: serialize sink transfers by region (#17946)
---
 .../manual/enhanced/IoTDBPipeSinkParallelIT.java   |  1 +
 .../auto/basic/IoTDBPipeSinkParallelIT.java        |  1 +
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 43 ++++++++++++----------
 .../agent/plugin/PipeDataNodePluginAgentTest.java  | 16 ++++++++
 .../plugin/constructor/PipeSinkConstructor.java    |  8 +---
 .../pipe/config/constant/PipeSinkConstant.java     | 22 +++++++++++
 6 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
index 85f3b93aa76..ae466349d72 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java
@@ -80,6 +80,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeTableModelDualManualIT
       connectorAttributes.put("connector.batch.enable", "true");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.serialize-by-region", "false");
       connectorAttributes.put("connector.parallel.tasks", "3");
 
       final TSStatus status =
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
index c21e6456ef1..df8e27a8546 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeDualTreeModelAutoIT {
       sinkAttributes.put("sink.batch.enable", "false");
       sinkAttributes.put("sink.ip", receiverIp);
       sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+      sinkAttributes.put("sink.serialize-by-region", "false");
       sinkAttributes.put("sink.parallel.tasks", "3");
 
       final TSStatus status =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 01552eec5ae..36c024090a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
@@ -70,11 +69,10 @@ public class PipeSinkSubtaskManager {
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
+    final String connectorName =
+        PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
     final String connectorKey =
-        pipeSinkParameters
-            .getStringOrDefault(
-                Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+        connectorName
             // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
             // for matching in `CONNECTOR_CONSTRUCTORS`
             .toLowerCase();
@@ -93,30 +91,31 @@ public class PipeSinkSubtaskManager {
 
     final int sinkNum;
     boolean realTimeFirst = false;
+    boolean serializeByRegion = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
     final String attributeDisplayString = 
generateAttributeDisplayString(pipeSinkParameters);
     if (isDataRegionSink) {
+      serializeByRegion = 
PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
       sinkNum =
-          pipeSinkParameters.getIntOrDefault(
-              Arrays.asList(
-                  PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
-                  PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
-                      pipeSinkParameters
-                          .getStringOrDefault(
-                              Arrays.asList(
-                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
-                          .toLowerCase())
-                  ? 1
-                  : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+          serializeByRegion
+              ? 1
+              : pipeSinkParameters.getIntOrDefault(
+                  Arrays.asList(
+                      PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+                      PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
+                  
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
+                      ? 1
+                      : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
           pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-      attributeSortedString = "data_" + attributeSortedString;
+      attributeSortedString =
+          serializeByRegion
+              ? "data_region_" + environment.getRegionId() + "_" + 
attributeSortedString
+              : "data_" + attributeSortedString;
     } else {
       // Do not allow parallel tasks for schema region connectors
       // to avoid the potential disorder of the schema region data transfer
@@ -124,7 +123,11 @@ public class PipeSinkSubtaskManager {
       attributeSortedString = "schema_" + attributeSortedString;
     }
     final String attributeDisplayStringWithPrefix =
-        isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + 
attributeDisplayString;
+        isDataRegionSink
+            ? serializeByRegion
+                ? "data_region_" + environment.getRegionId() + "_" + 
attributeDisplayString
+                : "data_" + attributeDisplayString
+            : "schema_" + attributeDisplayString;
     environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index b0f0b34e92a..04c759935cd 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -148,6 +149,21 @@ public class PipeDataNodePluginAgentTest {
                       }
                     }))
             .getClass());
+    Assert.assertEquals(
+        IoTDBDataRegionSyncSink.class,
+        agent.dataRegion().reflectSink(new PipeParameters(new 
HashMap<>())).getClass());
+    Assert.assertEquals(
+        IoTDBDataRegionAsyncSink.class,
+        agent
+            .dataRegion()
+            .reflectSink(
+                new PipeParameters(
+                    new HashMap<String, String>() {
+                      {
+                        
put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false");
+                      }
+                    }))
+            .getClass());
     Assert.assertEquals(
         IoTConsensusV2AsyncSink.class,
         agent
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
index 16f5ad49b14..95aa29f14c9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
@@ -19,14 +19,11 @@
 
 package org.apache.iotdb.commons.pipe.agent.plugin.constructor;
 
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
-import java.util.Arrays;
-
 public abstract class PipeSinkConstructor extends PipePluginConstructor {
 
   protected PipeSinkConstructor(PipePluginMetaKeeper pipePluginMetaKeeper) {
@@ -41,10 +38,7 @@ public abstract class PipeSinkConstructor extends 
PipePluginConstructor {
   public final PipeConnector reflectPlugin(PipeParameters connectorParameters) 
{
     return (PipeConnector)
         reflectPluginByKey(
-            connectorParameters
-                .getStringOrDefault(
-                    Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                    
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+            
PipeSinkConstant.getConnectorOrSinkNameWithDefault(connectorParameters)
                 // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to 
lowercase for matching in
                 // `PLUGIN_CONSTRUCTORS`
                 .toLowerCase());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 058b17e2f4f..39be064f09a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import com.github.luben.zstd.Zstd;
 
@@ -69,6 +70,27 @@ public class PipeSinkConstant {
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
   public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
 
+  public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = 
"connector.serialize-by-region";
+  public static final String SINK_SERIALIZE_BY_REGION_KEY = 
"sink.serialize-by-region";
+  public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = 
true;
+
+  public static boolean isSerializeByRegionEnabled(final PipeParameters 
parameters) {
+    return parameters.getBooleanOrDefault(
+        Arrays.asList(CONNECTOR_SERIALIZE_BY_REGION_KEY, 
SINK_SERIALIZE_BY_REGION_KEY),
+        CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
+  }
+
+  public static String getConnectorOrSinkNameWithDefault(final PipeParameters 
parameters) {
+    return parameters.getStringOrDefault(
+        Arrays.asList(CONNECTOR_KEY, SINK_KEY), 
getDefaultConnectorOrSinkName(parameters));
+  }
+
+  private static String getDefaultConnectorOrSinkName(final PipeParameters 
parameters) {
+    return isSerializeByRegionEnabled(parameters)
+        ? BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()
+        : BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName();
+  }
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;

Reply via email to