This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch 1.3-sub
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/1.3-sub by this push:
     new 312e0477dc6 Optimized the hint for subscription (#17115)
312e0477dc6 is described below

commit 312e0477dc632a6ec5c026364a7cdbba7672875d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 30 13:58:29 2026 +0800

    Optimized the hint for subscription (#17115)
    
    * ams
    
    * y
    
    * fix
---
 .../agent/plugin/PipeConfigNodePluginAgent.java    |  4 +-
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |  4 +-
 .../pipe/agent/plugin/PipeDataNodePluginAgent.java | 13 ++---
 .../dataregion/PipeDataRegionPluginAgent.java      |  4 +-
 .../schemaregion/PipeSchemaRegionPluginAgent.java  |  4 +-
 .../task/builder/PipeDataNodeTaskBuilder.java      |  2 +-
 .../pipe/agent/task/stage/PipeTaskSinkStage.java   | 12 ++--
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  6 +-
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  2 +-
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 10 ++--
 .../config/executor/ClusterConfigTaskExecutor.java | 13 ++---
 .../task/stage/SubscriptionTaskSinkStage.java      | 16 +++---
 .../subtask/SubscriptionSinkSubtaskManager.java    | 37 ++++++------
 .../commons/pipe/agent/plugin/PipePluginAgent.java | 66 +++++++++++-----------
 14 files changed, 94 insertions(+), 99 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
index 8ddbb73398c..44e5675402c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java
@@ -32,7 +32,7 @@ public class PipeConfigNodePluginAgent extends PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionSourceConstructor();
   }
@@ -44,7 +44,7 @@ public class PipeConfigNodePluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeConfigRegionSinkConstructor();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 1b508eb2f5c..5ee983d7945 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -183,9 +183,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
           true);
     }
 
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Successfully transferred config event {}.", 
pipeConfigRegionWritePlanEvent);
-    }
+    LOGGER.info("Successfully transferred config event {}.", 
pipeConfigRegionWritePlanEvent);
   }
 
   private void doTransferWrapper(final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 31f404de9d2..437b197b081 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -212,9 +212,9 @@ public class PipeDataNodePluginAgent {
         pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
   }
 
-  public void validateExtractor(Map<String, String> extractorAttributes) 
throws Exception {
-    dataRegionAgent.validateExtractor(extractorAttributes);
-    schemaRegionAgent.validateExtractor(extractorAttributes);
+  public void validateSource(Map<String, String> sourceAttributes) throws 
Exception {
+    dataRegionAgent.validateSource(sourceAttributes);
+    schemaRegionAgent.validateSource(sourceAttributes);
   }
 
   public void validateProcessor(Map<String, String> processorAttributes) 
throws Exception {
@@ -222,9 +222,8 @@ public class PipeDataNodePluginAgent {
     schemaRegionAgent.validateProcessor(processorAttributes);
   }
 
-  public void validateConnector(String pipeName, Map<String, String> 
connectorAttributes)
-      throws Exception {
-    dataRegionAgent.validateConnector(pipeName, connectorAttributes);
-    schemaRegionAgent.validateConnector(pipeName, connectorAttributes);
+  public void validateSource(String pipeName, Map<String, String> 
sinkAttributes) throws Exception {
+    dataRegionAgent.validateSink(pipeName, sinkAttributes);
+    schemaRegionAgent.validateSink(pipeName, sinkAttributes);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index a8583559394..9f41abe28cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -33,7 +33,7 @@ public class PipeDataRegionPluginAgent extends PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper) 
pipePluginMetaKeeper);
   }
@@ -46,7 +46,7 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeDataRegionSinkConstructor((DataNodePipePluginMetaKeeper) 
pipePluginMetaKeeper);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
index 549030073b1..cd4293c6ac0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java
@@ -32,7 +32,7 @@ public class PipeSchemaRegionPluginAgent extends PipePluginAgent {
   }
 
   @Override
-  protected PipeSourceConstructor createPipeExtractorConstructor(
+  protected PipeSourceConstructor createPipeSourceConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionSourceConstructor();
   }
@@ -44,7 +44,7 @@ public class PipeSchemaRegionPluginAgent extends 
PipePluginAgent {
   }
 
   @Override
-  protected PipeSinkConstructor createPipeConnectorConstructor(
+  protected PipeSinkConstructor createPipeSinkConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper) {
     return new PipeSchemaRegionSinkConstructor();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 2edc5b943f6..e767731c1ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -124,7 +124,7 @@ public class PipeDataNodeTaskBuilder {
             
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
             regionId,
             sourceStage.getEventSupplier(),
-            sinkStage.getPipeConnectorPendingQueue(),
+            sinkStage.getPipeSinkPendingQueue(),
             PROCESSOR_EXECUTOR,
             pipeTaskMeta,
             pipeStaticMeta
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index c24db53e610..a22fbb536d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -34,7 +34,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
 
   protected final String pipeName;
   protected final long creationTime;
-  protected final PipeParameters pipeConnectorParameters;
+  protected final PipeParameters pipeSinkParameters;
   protected final int regionId;
   protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
 
@@ -43,12 +43,12 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   public PipeTaskSinkStage(
       String pipeName,
       long creationTime,
-      PipeParameters pipeConnectorParameters,
+      PipeParameters pipeSinkParameters,
       int regionId,
       Supplier<? extends PipeSinkSubtaskExecutor> executor) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
-    this.pipeConnectorParameters = pipeConnectorParameters;
+    this.pipeSinkParameters = pipeSinkParameters;
     this.regionId = regionId;
     this.executor = executor;
 
@@ -60,7 +60,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
         PipeSinkSubtaskManager.instance()
             .register(
                 executor,
-                pipeConnectorParameters,
+                pipeSinkParameters,
                 new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
regionId));
   }
 
@@ -85,7 +85,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
-    return 
PipeSinkSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
+    return 
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 0df3a773b9c..35f7983075d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -39,9 +39,9 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable {
   protected int registeredTaskCount;
 
   public PipeSinkSubtaskLifeCycle(
-      PipeSinkSubtaskExecutor executor,
-      PipeSinkSubtask subtask,
-      UnboundedBlockingPendingQueue<Event> pendingQueue) {
+      final PipeSinkSubtaskExecutor executor,
+      final PipeSinkSubtask subtask,
+      final UnboundedBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 97859938828..905e3b535b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -240,7 +240,7 @@ public class PipeSinkSubtaskManager {
     }
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index f54e8bcfccb..38cfb579622 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
 
   // Calculate from schema region extractors directly for it requires less 
computation
-  private final Set<IoTDBSchemaRegionSource> schemaRegionExtractors =
+  private final Set<IoTDBSchemaRegionSource> schemaRegionSources =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
@@ -105,7 +105,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
         tsfileEventCount.get()
             + rawTabletEventCount.get()
             + insertNodeEventCount.get()
-            + schemaRegionExtractors.stream()
+            + schemaRegionSources.stream()
                 .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
                 .reduce(Long::sum)
                 .orElse(0L);
@@ -157,7 +157,7 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
     }
 
     final long totalSchemaRegionWriteEventCount =
-        schemaRegionExtractors.stream()
+        schemaRegionSources.stream()
             .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
             .reduce(Long::sum)
             .orElse(0L);
@@ -192,8 +192,8 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
 
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
-  void register(final IoTDBSchemaRegionSource extractor) {
-    schemaRegionExtractors.add(extractor);
+  void register(final IoTDBSchemaRegionSource source) {
+    schemaRegionSources.add(source);
   }
 
   //////////////////////////// Rate ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b711b652829..d4ba79d2c5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -323,7 +323,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
     SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
     SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
         new IoTDBException(
-            "Subscription not enabled, please set config 
`subscription_enabled` to true.",
+            "Subscription is not enabled.",
             TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
   }
 
@@ -1977,7 +1977,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     try {
       if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
         if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
-          
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
+          
PipeDataNodeAgent.plugin().validateSource(alterPipeStatement.getExtractorAttributes());
         } else {
           pipeMetaFromCoordinator
               .getStaticMeta()
@@ -1985,7 +1985,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               .addOrReplaceEquivalentAttributes(
                   new 
PipeParameters(alterPipeStatement.getExtractorAttributes()));
           PipeDataNodeAgent.plugin()
-              .validateExtractor(
+              .validateSource(
                   
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute());
         }
       }
@@ -2008,7 +2008,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       if (!alterPipeStatement.getConnectorAttributes().isEmpty()) {
         if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
           PipeDataNodeAgent.plugin()
-              .validateConnector(pipeName, 
alterPipeStatement.getConnectorAttributes());
+              .validateSource(pipeName, 
alterPipeStatement.getConnectorAttributes());
         } else {
           pipeMetaFromCoordinator
               .getStaticMeta()
@@ -2016,7 +2016,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               .addOrReplaceEquivalentAttributes(
                   new 
PipeParameters(alterPipeStatement.getConnectorAttributes()));
           PipeDataNodeAgent.plugin()
-              .validateConnector(
+              .validateSource(
                   pipeName,
                   
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute());
         }
@@ -2274,8 +2274,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     final TopicMeta temporaryTopicMeta =
         new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
     try {
-      PipeDataNodeAgent.plugin()
-          .validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
+      
PipeDataNodeAgent.plugin().validateSource(temporaryTopicMeta.generateExtractorAttributes());
       PipeDataNodeAgent.plugin()
           .validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
     } catch (Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 233164b4d1d..73fca57a1fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -31,12 +31,12 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {
 
   public SubscriptionTaskSinkStage(
-      String pipeName,
-      long creationTime,
-      PipeParameters pipeConnectorParameters,
-      int regionId,
-      PipeSinkSubtaskExecutor executor) {
-    super(pipeName, creationTime, pipeConnectorParameters, regionId, () -> 
executor);
+      final String pipeName,
+      final long creationTime,
+      final PipeParameters pipeSinkParameters,
+      final int regionId,
+      final PipeSinkSubtaskExecutor executor) {
+    super(pipeName, creationTime, pipeSinkParameters, regionId, () -> 
executor);
   }
 
   @Override
@@ -45,7 +45,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
         SubscriptionSinkSubtaskManager.instance()
             .register(
                 executor.get(),
-                pipeConnectorParameters,
+                pipeSinkParameters,
                 new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
regionId));
   }
 
@@ -70,7 +70,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
     return SubscriptionSinkSubtaskManager.instance()
         .getPipeConnectorPendingQueue(connectorSubtaskId);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 959870898a2..07def3ff4d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class SubscriptionSinkSubtaskManager {
 
   public synchronized String register(
       final PipeSinkSubtaskExecutor executor,
-      final PipeParameters pipeConnectorParameters,
+      final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
     final String connectorKey =
-        pipeConnectorParameters
+        pipeSinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -86,13 +86,13 @@ public class SubscriptionSinkSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    boolean realTimeFirst =
-        pipeConnectorParameters.getBooleanOrDefault(
+    final boolean realTimeFirst =
+        pipeSinkParameters.getBooleanOrDefault(
             Arrays.asList(
                 PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                 PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
+    String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
     attributeSortedString = "__subscription_" + attributeSortedString;
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
@@ -101,36 +101,33 @@ public class SubscriptionSinkSubtaskManager {
               ? new PipeRealtimePriorityBlockingQueue()
               : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
-      final PipeConnector pipeConnector =
-          
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
+      final PipeConnector pipeSink =
+          
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters);
       // 1. Construct, validate and customize PipeConnector, and then 
handshake (create connection)
       // with the target
       try {
-        pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-        pipeConnector.customize(
-            pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
-        pipeConnector.handshake();
+        pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+        pipeSink.customize(pipeSinkParameters, new 
PipeTaskRuntimeConfiguration(environment));
+        pipeSink.handshake();
       } catch (final Exception e) {
         try {
-          pipeConnector.close();
+          pipeSink.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
-              "Failed to close connector after failed to initialize connector. 
"
-                  + "Ignore this exception.",
+              "Failed to close sink after failed to initialize sink. " + 
"Ignore this exception.",
               closeException);
         }
-        throw new PipeException(
-            "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
+        throw new PipeException("Failed to construct PipeSink, because of " + 
e.getMessage(), e);
       }
 
       // 2. Fetch topic and consumer group id from connector parameters
-      final String topicName = 
pipeConnectorParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
+      final String topicName = 
pipeSinkParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
       final String consumerGroupId =
-          
pipeConnectorParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
+          
pipeSinkParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
       if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
         throw new SubscriptionException(
             String.format(
-                "Failed to construct subscription connector, because of %s or 
%s does not exist in pipe connector parameters",
+                "Failed to construct subscription sink, because of %s or %s 
does not exist in pipe connector parameters",
                 PipeSinkConstant.SINK_TOPIC_KEY, 
PipeSinkConstant.SINK_CONSUMER_GROUP_KEY));
       }
 
@@ -142,7 +139,7 @@ public class SubscriptionSinkSubtaskManager {
               attributeSortedString,
               0,
               pendingQueue,
-              pipeConnector,
+              pipeSink,
               topicName,
               consumerGroupId);
       final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index aca42727082..caffb9ef859 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -53,57 +53,58 @@ public abstract class PipePluginAgent {
   private final PipeProcessorConstructor pipeProcessorConstructor;
   private final PipeSinkConstructor pipeSinkConstructor;
 
-  protected PipePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
+  protected PipePluginAgent(final PipePluginMetaKeeper pipePluginMetaKeeper) {
     this.pipePluginMetaKeeper = pipePluginMetaKeeper;
-    pipeSourceConstructor = 
createPipeExtractorConstructor(pipePluginMetaKeeper);
+    pipeSourceConstructor = createPipeSourceConstructor(pipePluginMetaKeeper);
     pipeProcessorConstructor = 
createPipeProcessorConstructor(pipePluginMetaKeeper);
-    pipeSinkConstructor = createPipeConnectorConstructor(pipePluginMetaKeeper);
+    pipeSinkConstructor = createPipeSinkConstructor(pipePluginMetaKeeper);
   }
 
-  protected abstract PipeSourceConstructor createPipeExtractorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+  protected abstract PipeSourceConstructor createPipeSourceConstructor(
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
   protected abstract PipeProcessorConstructor createPipeProcessorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  protected abstract PipeSinkConstructor createPipeConnectorConstructor(
-      PipePluginMetaKeeper pipePluginMetaKeeper);
+  protected abstract PipeSinkConstructor createPipeSinkConstructor(
+      final PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  public final PipeExtractor reflectExtractor(PipeParameters 
extractorParameters) {
-    return pipeSourceConstructor.reflectPlugin(extractorParameters);
+  public final PipeExtractor reflectSource(final PipeParameters 
sourceParameters) {
+    return pipeSourceConstructor.reflectPlugin(sourceParameters);
   }
 
-  public final PipeProcessor reflectProcessor(PipeParameters 
processorParameters) {
+  public final PipeProcessor reflectProcessor(final PipeParameters 
processorParameters) {
     return pipeProcessorConstructor.reflectPlugin(processorParameters);
   }
 
-  public final PipeConnector reflectConnector(PipeParameters 
connectorParameters) {
-    return pipeSinkConstructor.reflectPlugin(connectorParameters);
+  public final PipeConnector reflectSink(final PipeParameters sinkParameters) {
+    return pipeSinkConstructor.reflectPlugin(sinkParameters);
   }
 
   public void validate(
-      String pipeName,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      final String pipeName,
+      final Map<String, String> sourceAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> sinkAttributes)
       throws Exception {
-    validateExtractor(extractorAttributes);
+    validateSource(sourceAttributes);
     validateProcessor(processorAttributes);
-    validateConnector(pipeName, connectorAttributes);
+    validateSink(pipeName, sinkAttributes);
   }
 
-  public void validateExtractor(Map<String, String> extractorAttributes) 
throws Exception {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
-    final PipeExtractor temporaryExtractor = 
reflectExtractor(extractorParameters);
+  public PipeExtractor validateSource(final Map<String, String> 
sourceAttributes) throws Exception {
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
+    final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
     try {
-      temporaryExtractor.validate(new 
PipeParameterValidator(extractorParameters));
+      temporaryExtractor.validate(new 
PipeParameterValidator(sourceParameters));
     } finally {
       try {
         temporaryExtractor.close();
       } catch (Exception e) {
-        LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(), 
e);
+        LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
       }
     }
+    return temporaryExtractor;
   }
 
   public void validateProcessor(Map<String, String> processorAttributes) 
throws Exception {
@@ -120,23 +121,24 @@ public abstract class PipePluginAgent {
     }
   }
 
-  public void validateConnector(String pipeName, Map<String, String> 
connectorAttributes)
+  public PipeConnector validateSink(String pipeName, Map<String, String> 
sinkAttributes)
       throws Exception {
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
-    final PipeConnector temporaryConnector = 
reflectConnector(connectorParameters);
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+    final PipeConnector temporarySink = reflectSink(sinkParameters);
     try {
-      temporaryConnector.validate(new 
PipeParameterValidator(connectorParameters));
-      temporaryConnector.customize(
-          connectorParameters,
+      temporarySink.validate(new PipeParameterValidator(sinkParameters));
+      temporarySink.customize(
+          sinkParameters,
           new PipeTaskRuntimeConfiguration(new 
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
-      temporaryConnector.handshake();
+      temporarySink.handshake();
     } finally {
       try {
-        temporaryConnector.close();
+        temporarySink.close();
       } catch (Exception e) {
         LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), 
e);
       }
     }
+    return temporarySink;
   }
 
   /**

Reply via email to