This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch try-separate-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/try-separate-lock by this push:
     new 250c591eda3 y
250c591eda3 is described below

commit 250c591eda3bd317926c633e5da2032d845af23f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 29 15:25:40 2026 +0800

    y
---
 .../agent/plugin/PipeConfigNodePluginAgent.java    |  2 +-
 .../dataregion/PipeDataRegionPluginAgent.java      |  4 ++--
 .../schemaregion/PipeSchemaRegionPluginAgent.java  |  4 ++--
 .../commons/pipe/agent/plugin/PipePluginAgent.java | 26 +++++++++++-----------
 4 files changed, 18 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index f1441c4c6e4..44e5675402c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -44,7 +44,7 @@ public class PipeConfigNodePluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionSinkConstructor();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index ec6d4f2fa98..a8d002fb27c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -42,7 +42,7 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper) 
pipePluginMetaKeeper);
   }
@@ -55,7 +55,7 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeDataRegionSinkConstructor((DataNodePipePluginMetaKeeper) 
pipePluginMetaKeeper);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 549030073b1..cd4293c6ac0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -32,7 +32,7 @@ public class PipeSchemaRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionSourceConstructor();
   }
@@ -44,7 +44,7 @@ public class PipeSchemaRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionSinkConstructor();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 1649660425f..519d263449b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -55,18 +55,18 @@ public abstract class PipePluginAgent {
 
   protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
     this.pipePluginMetaKeeper = pipePluginMetaKeeper;
-    pipeExtractorConstructor = 
createPipeExtractorConstructor(pipePluginMetaKeeper);
+    pipeExtractorConstructor = 
createPipeSourceConstructor(pipePluginMetaKeeper);
     pipeProcessorConstructor = 
createPipeProcessorConstructor(pipePluginMetaKeeper);
-    pipeSinkConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
+    pipeSinkConstructor = createPipeSinkConstructor(pipePluginMetaKeeper);
   }
 
-  protected abstract PipeSourceConstructor createPipeExtractorConstructor(
+  protected abstract PipeSourceConstructor createPipeSourceConstructor(
       final PipePluginMetaKeeper pipePluginMetaKeeper);
 
   protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
       final PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  protected abstract PipeSinkConstructor createPipeConnectorConstructor(
+  protected abstract PipeSinkConstructor createPipeSinkConstructor(
       final PipePluginMetaKeeper pipePluginMetaKeeper);
 
   public final PipeExtractor reflectSource(final PipeParameters 
sourceParameters) {
@@ -124,24 +124,24 @@ public abstract class PipePluginAgent {
     return temporaryProcessor;
   }
 
-  protected PipeConnector validateSink(String pipeName, Map<String, String> 
connectorAttributes)
+  protected PipeConnector validateSink(String pipeName, Map<String, String> 
sinkAttributes)
       throws Exception {
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
-    final PipeConnector temporaryConnector = reflectSink(connectorParameters);
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+    final PipeConnector temporarySink = reflectSink(sinkParameters);
     try {
-      temporaryConnector.validate(new 
PipeParameterValidator(connectorParameters));
-      temporaryConnector.customize(
-          connectorParameters,
+      temporarySink.validate(new PipeParameterValidator(sinkParameters));
+      temporarySink.customize(
+          sinkParameters,
           new PipeTaskRuntimeConfiguration(new 
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
-      temporaryConnector.handshake();
+      temporarySink.handshake();
     } finally {
       try {
-        temporaryConnector.close();
+        temporarySink.close();
       } catch (Exception e) {
         LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), 
e);
       }
     }
-    return temporaryConnector;
+    return temporarySink;
   }
 
   /**

Reply via email to