This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 4e06a94d06b Pipe: serialize sink transfers by region (#17946) (#17970)
4e06a94d06b is described below

commit 4e06a94d06bd9bfd296295f947b8da51291c7cf4
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:04:33 2026 +0800

    Pipe: serialize sink transfers by region (#17946) (#17970)
    
    (cherry picked from commit 5e6f1c20cc5e86086c7a45466612ca7f08deff18)
---
 .../it/autocreate/IoTDBPipeSinkParallelIT.java     |  1 +
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 37 +++++++++++-----------
 .../agent/plugin/PipeDataNodePluginAgentTest.java  | 16 ++++++++++
 .../plugin/constructor/PipeSinkConstructor.java    |  8 +----
 .../pipe/config/constant/PipeSinkConstant.java     | 22 +++++++++++++
 5 files changed, 58 insertions(+), 26 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java
index 8138b350864..e1c9ec5472c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java
@@ -60,6 +60,7 @@ public class IoTDBPipeSinkParallelIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector.batch.enable", "false");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.serialize-by-region", "false");
       connectorAttributes.put("connector.parallel.tasks", "3");
 
       final TSStatus status =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 817471c785a..4385cc59504 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
@@ -65,11 +64,10 @@ public class PipeSinkSubtaskManager {
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
+    final String connectorName =
+        PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
     final String connectorKey =
-        pipeSinkParameters
-            .getStringOrDefault(
-                Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+        connectorName
             // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
             // for matching in `CONNECTOR_CONSTRUCTORS`
             .toLowerCase();
@@ -87,29 +85,30 @@ public class PipeSinkSubtaskManager {
 
     final int sinkNum;
     boolean realTimeFirst = false;
+    boolean serializeByRegion = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
     if (isDataSinkConnector) {
+      serializeByRegion = 
PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
       sinkNum =
-          pipeSinkParameters.getIntOrDefault(
-              Arrays.asList(
-                  PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
-                  PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
-                      pipeSinkParameters
-                          .getStringOrDefault(
-                              Arrays.asList(
-                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
-                          .toLowerCase())
-                  ? 1
-                  : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+          serializeByRegion
+              ? 1
+              : pipeSinkParameters.getIntOrDefault(
+                  Arrays.asList(
+                      PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+                      PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
+                  
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
+                      ? 1
+                      : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
           pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-      attributeSortedString = "data_" + attributeSortedString;
+      attributeSortedString =
+          serializeByRegion
+              ? "data_region_" + environment.getRegionId() + "_" + 
attributeSortedString
+              : "data_" + attributeSortedString;
     } else {
       // Do not allow parallel tasks for schema region connectors
       // to avoid the potential disorder of the schema region data transfer
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index 73ad4fcf68a..ce898fab64a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -132,5 +133,20 @@ public class PipeDataNodePluginAgentTest {
                       }
                     }))
             .getClass());
+    Assert.assertEquals(
+        IoTDBDataRegionSyncSink.class,
+        agent.dataRegion().reflectSink(new PipeParameters(new 
HashMap<>())).getClass());
+    Assert.assertEquals(
+        IoTDBDataRegionAsyncSink.class,
+        agent
+            .dataRegion()
+            .reflectSink(
+                new PipeParameters(
+                    new HashMap<String, String>() {
+                      {
+                        
put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false");
+                      }
+                    }))
+            .getClass());
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
index 16f5ad49b14..95aa29f14c9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java
@@ -19,14 +19,11 @@
 
 package org.apache.iotdb.commons.pipe.agent.plugin.constructor;
 
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
-import java.util.Arrays;
-
 public abstract class PipeSinkConstructor extends PipePluginConstructor {
 
   protected PipeSinkConstructor(PipePluginMetaKeeper pipePluginMetaKeeper) {
@@ -41,10 +38,7 @@ public abstract class PipeSinkConstructor extends 
PipePluginConstructor {
   public final PipeConnector reflectPlugin(PipeParameters connectorParameters) 
{
     return (PipeConnector)
         reflectPluginByKey(
-            connectorParameters
-                .getStringOrDefault(
-                    Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
-                    
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+            
PipeSinkConstant.getConnectorOrSinkNameWithDefault(connectorParameters)
                 // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to 
lowercase for matching in
                 // `PLUGIN_CONSTRUCTORS`
                 .toLowerCase());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index e04f2578daa..d19d6d2dc16 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.config.constant;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import com.github.luben.zstd.Zstd;
 
@@ -68,6 +69,27 @@ public class PipeSinkConstant {
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
   public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
 
+  public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = 
"connector.serialize-by-region";
+  public static final String SINK_SERIALIZE_BY_REGION_KEY = 
"sink.serialize-by-region";
+  public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = 
true;
+
+  public static boolean isSerializeByRegionEnabled(final PipeParameters 
parameters) {
+    return parameters.getBooleanOrDefault(
+        Arrays.asList(CONNECTOR_SERIALIZE_BY_REGION_KEY, 
SINK_SERIALIZE_BY_REGION_KEY),
+        CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
+  }
+
+  public static String getConnectorOrSinkNameWithDefault(final PipeParameters 
parameters) {
+    return parameters.getStringOrDefault(
+        Arrays.asList(CONNECTOR_KEY, SINK_KEY), 
getDefaultConnectorOrSinkName(parameters));
+  }
+
+  private static String getDefaultConnectorOrSinkName(final PipeParameters 
parameters) {
+    return isSerializeByRegionEnabled(parameters)
+        ? BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()
+        : BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName();
+  }
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;

Reply via email to