This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch try-separate-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/try-separate-lock by this push:
     new 090aefece97 ams
090aefece97 is described below

commit 090aefece971a88a7e6d099c4167a78feea46bad
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 29 15:24:04 2026 +0800

    ams
---
 .../agent/plugin/PipeConfigNodePluginAgent.java    |  2 +-
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |  4 +-
 .../dataregion/PipeDataRegionPluginAgent.java      | 14 +++----
 .../task/builder/PipeDataNodeTaskBuilder.java      |  2 +-
 .../pipe/agent/task/stage/PipeTaskSinkStage.java   | 12 +++---
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  6 +--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  2 +-
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 10 ++---
 .../task/stage/SubscriptionTaskSinkStage.java      | 16 ++++----
 .../subtask/SubscriptionSinkSubtaskManager.java    | 37 +++++++++---------
 .../commons/pipe/agent/plugin/PipePluginAgent.java | 44 +++++++++++-----------
 11 files changed, 72 insertions(+), 77 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index 8ddbb73398c..f1441c4c6e4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -32,7 +32,7 @@ public class PipeConfigNodePluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionSourceConstructor();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 5ce03f02598..36bd1cb1a5e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -192,9 +192,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink 
{
           true);
     }
 
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Successfully transferred config event {}.", 
pipeConfigRegionWritePlanEvent);
-    }
+    LOGGER.info("Successfully transferred config event {}.", 
pipeConfigRegionWritePlanEvent);
   }
 
   private void doTransferWrapper(final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index ae27a7eff33..ec6d4f2fa98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -63,18 +63,18 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
   @Override
   public void validate(
       String pipeName,
-      Map<String, String> extractorAttributes,
+      Map<String, String> sourceAttributes,
       Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      Map<String, String> sinkAttributes)
       throws Exception {
-    PipeExtractor temporaryExtractor = validateExtractor(extractorAttributes);
+    PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
     PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
-    PipeConnector temporaryConnector = validateConnector(pipeName, 
connectorAttributes);
+    PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes);
 
     // validate visibility
     // TODO: validate visibility for schema region and config region
     Visibility pipeVisibility =
-        VisibilityUtils.calculateFromExtractorParameters(new 
PipeParameters(extractorAttributes));
+        VisibilityUtils.calculateFromExtractorParameters(new 
PipeParameters(sourceAttributes));
     Visibility extractorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
     Visibility processorVisibility =
@@ -88,13 +88,13 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
               "The visibility of the pipe (%s, %s) is not compatible with the 
visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector 
(%s, %s, %s).",
               pipeName,
               pipeVisibility,
-              extractorAttributes,
+              sourceAttributes,
               temporaryExtractor.getClass().getName(),
               extractorVisibility,
               processorAttributes,
               temporaryProcessor.getClass().getName(),
               processorVisibility,
-              connectorAttributes,
+              sinkAttributes,
               temporaryConnector.getClass().getName(),
               connectorVisibility));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 038420527fd..f9609869b45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -127,7 +127,7 @@ public class PipeDataNodeTaskBuilder {
             
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
             regionId,
             sourceStage.getEventSupplier(),
-            sinkStage.getPipeConnectorPendingQueue(),
+            sinkStage.getPipeSinkPendingQueue(),
             PROCESSOR_EXECUTOR,
             pipeTaskMeta,
             pipeStaticMeta
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index c24db53e610..a22fbb536d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -34,7 +34,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
 
   protected final String pipeName;
   protected final long creationTime;
-  protected final PipeParameters pipeConnectorParameters;
+  protected final PipeParameters pipeSinkParameters;
   protected final int regionId;
   protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
 
@@ -43,12 +43,12 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   public PipeTaskSinkStage(
       String pipeName,
       long creationTime,
-      PipeParameters pipeConnectorParameters,
+      PipeParameters pipeSinkParameters,
       int regionId,
       Supplier<? extends PipeSinkSubtaskExecutor> executor) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
-    this.pipeConnectorParameters = pipeConnectorParameters;
+    this.pipeSinkParameters = pipeSinkParameters;
     this.regionId = regionId;
     this.executor = executor;
 
@@ -60,7 +60,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
         PipeSinkSubtaskManager.instance()
             .register(
                 executor,
-                pipeConnectorParameters,
+                pipeSinkParameters,
                 new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
regionId));
   }
 
@@ -85,7 +85,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
-    return 
PipeSinkSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
+    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 0df3a773b9c..35f7983075d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -39,9 +39,9 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
   protected int registeredTaskCount;
 
   public PipeSinkSubtaskLifeCycle(
-      PipeSinkSubtaskExecutor executor,
-      PipeSinkSubtask subtask,
-      UnboundedBlockingPendingQueue<Event> pendingQueue) {
+      final PipeSinkSubtaskExecutor executor,
+      final PipeSinkSubtask subtask,
+      final UnboundedBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index b249ed7b1e9..3a461fe0395 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -243,7 +243,7 @@ public class PipeSinkSubtaskManager {
     }
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index f54e8bcfccb..38cfb579622 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
 
   // Calculate from schema region extractors directly for it requires less 
computation
-  private final Set<IoTDBSchemaRegionSource> schemaRegionExtractors =
+  private final Set<IoTDBSchemaRegionSource> schemaRegionSources =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
@@ -105,7 +105,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
         tsfileEventCount.get()
             + rawTabletEventCount.get()
             + insertNodeEventCount.get()
-            + schemaRegionExtractors.stream()
+            + schemaRegionSources.stream()
                 .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
                 .reduce(Long::sum)
                 .orElse(0L);
@@ -157,7 +157,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
     }
 
     final long totalSchemaRegionWriteEventCount =
-        schemaRegionExtractors.stream()
+        schemaRegionSources.stream()
             .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
             .reduce(Long::sum)
             .orElse(0L);
@@ -192,8 +192,8 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
 
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
-  void register(final IoTDBSchemaRegionSource extractor) {
-    schemaRegionExtractors.add(extractor);
+  void register(final IoTDBSchemaRegionSource source) {
+    schemaRegionSources.add(source);
   }
 
   //////////////////////////// Rate ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 233164b4d1d..73fca57a1fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -31,12 +31,12 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
 
   public SubscriptionTaskSinkStage(
-      String pipeName,
-      long creationTime,
-      PipeParameters pipeConnectorParameters,
-      int regionId,
-      PipeSinkSubtaskExecutor executor) {
-    super(pipeName, creationTime, pipeConnectorParameters, regionId, () -> 
executor);
+      final String pipeName,
+      final long creationTime,
+      final PipeParameters pipeSinkParameters,
+      final int regionId,
+      final PipeSinkSubtaskExecutor executor) {
+    super(pipeName, creationTime, pipeSinkParameters, regionId, () -> 
executor);
   }
 
   @Override
@@ -45,7 +45,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
         SubscriptionSinkSubtaskManager.instance()
             .register(
                 executor.get(),
-                pipeConnectorParameters,
+                pipeSinkParameters,
                 new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
regionId));
   }
 
@@ -70,7 +70,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
     return SubscriptionSinkSubtaskManager.instance()
         .getPipeConnectorPendingQueue(connectorSubtaskId);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index edff8257c65..07def3ff4d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class SubscriptionSinkSubtaskManager {
 
   public synchronized String register(
       final PipeSinkSubtaskExecutor executor,
-      final PipeParameters pipeConnectorParameters,
+      final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
     final String connectorKey =
-        pipeConnectorParameters
+        pipeSinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -86,13 +86,13 @@ public class SubscriptionSinkSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    boolean realTimeFirst =
-        pipeConnectorParameters.getBooleanOrDefault(
+    final boolean realTimeFirst =
+        pipeSinkParameters.getBooleanOrDefault(
             Arrays.asList(
                 PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                 PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
+    String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
     attributeSortedString = "__subscription_" + attributeSortedString;
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
@@ -101,36 +101,33 @@ public class SubscriptionSinkSubtaskManager {
               ? new PipeRealtimePriorityBlockingQueue()
               : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
-      final PipeConnector pipeConnector =
-          
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters);
+      final PipeConnector pipeSink =
+          
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters);
       // 1. Construct, validate and customize PipeConnector, and then 
handshake (create connection)
       // with the target
       try {
-        pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-        pipeConnector.customize(
-            pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
-        pipeConnector.handshake();
+        pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+        pipeSink.customize(pipeSinkParameters, new 
PipeTaskRuntimeConfiguration(environment));
+        pipeSink.handshake();
       } catch (final Exception e) {
         try {
-          pipeConnector.close();
+          pipeSink.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
-              "Failed to close connector after failed to initialize connector. 
"
-                  + "Ignore this exception.",
+              "Failed to close sink after failed to initialize sink. " + 
"Ignore this exception.",
               closeException);
         }
-        throw new PipeException(
-            "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
+        throw new PipeException("Failed to construct PipeSink, because of " + 
e.getMessage(), e);
       }
 
       // 2. Fetch topic and consumer group id from connector parameters
-      final String topicName = 
pipeConnectorParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
+      final String topicName = 
pipeSinkParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
       final String consumerGroupId =
-          
pipeConnectorParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
+          
pipeSinkParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
       if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
         throw new SubscriptionException(
             String.format(
-                "Failed to construct subscription connector, because of %s or 
%s does not exist in pipe connector parameters",
+                "Failed to construct subscription sink, because of %s or %s 
does not exist in pipe connector parameters",
                 PipeSinkConstant.SINK_TOPIC_KEY, 
PipeSinkConstant.SINK_CONSUMER_GROUP_KEY));
       }
 
@@ -142,7 +139,7 @@ public class SubscriptionSinkSubtaskManager {
               attributeSortedString,
               0,
               pendingQueue,
-              pipeConnector,
+              pipeSink,
               topicName,
               consumerGroupId);
       final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 33acd4beb63..1649660425f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -53,7 +53,7 @@ public abstract class PipePluginAgent {
   private final PipeProcessorConstructor pipeProcessorConstructor;
   private final PipeSinkConstructor pipeSinkConstructor;
 
-  protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+  protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
     this.pipePluginMetaKeeper = pipePluginMetaKeeper;
     pipeExtractorConstructor = 
createPipeExtractorConstructor(pipePluginMetaKeeper);
     pipeProcessorConstructor = 
createPipeProcessorConstructor(pipePluginMetaKeeper);
@@ -61,48 +61,48 @@ public abstract class PipePluginAgent {
   }
 
   protected abstract PipeSourceConstructor createPipeExtractorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
   protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
   protected abstract PipeSinkConstructor createPipeConnectorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  public final PipeExtractor reflectSource(PipeParameters extractorParameters) 
{
-    return pipeExtractorConstructor.reflectPlugin(extractorParameters);
+  public final PipeExtractor reflectSource(final PipeParameters 
sourceParameters) {
+    return pipeExtractorConstructor.reflectPlugin(sourceParameters);
   }
 
-  public final PipeProcessor reflectProcessor(PipeParameters 
processorParameters) {
+  public final PipeProcessor reflectProcessor(final PipeParameters 
processorParameters) {
     return pipeProcessorConstructor.reflectPlugin(processorParameters);
   }
 
-  public final PipeConnector reflectSink(PipeParameters connectorParameters) {
-    return pipeSinkConstructor.reflectPlugin(connectorParameters);
+  public final PipeConnector reflectSink(final PipeParameters sinkParameters) {
+    return pipeSinkConstructor.reflectPlugin(sinkParameters);
   }
 
   public void validate(
-      String pipeName,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      final String pipeName,
+      final Map<String, String> sourceAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> sinkAttributes)
       throws Exception {
-    validateExtractor(extractorAttributes);
+    validateSource(sourceAttributes);
     validateProcessor(processorAttributes);
-    validateConnector(pipeName, connectorAttributes);
+    validateSink(pipeName, sinkAttributes);
   }
 
-  protected PipeExtractor validateExtractor(Map<String, String> 
extractorAttributes)
+  protected PipeExtractor validateSource(final Map<String, String> 
sourceAttributes)
       throws Exception {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
-    final PipeExtractor temporaryExtractor = 
reflectSource(extractorParameters);
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
+    final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
     try {
-      temporaryExtractor.validate(new 
PipeParameterValidator(extractorParameters));
+      temporaryExtractor.validate(new 
PipeParameterValidator(sourceParameters));
     } finally {
       try {
         temporaryExtractor.close();
       } catch (Exception e) {
-        LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(), 
e);
+        LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
       }
     }
     return temporaryExtractor;
@@ -124,8 +124,8 @@ public abstract class PipePluginAgent {
     return temporaryProcessor;
   }
 
-  protected PipeConnector validateConnector(
-      String pipeName, Map<String, String> connectorAttributes) throws 
Exception {
+  protected PipeConnector validateSink(String pipeName, Map<String, String> 
connectorAttributes)
+      throws Exception {
     final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
     final PipeConnector temporaryConnector = reflectSink(connectorParameters);
     try {

Reply via email to