This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3fd8c2eb95018d186c71c85e5494647ded1c6b00
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Aug 5 12:54:29 2024 +0800

    PipePlugin/Subscription: The Drop PipePlugin operation adds a check to see 
if there is a Topic that uses PipePlugin as a processor (#13048)
    
    (cherry picked from commit 97f9ef529421c2eb2aec1ccee92e047622e1c0b4)
---
 .../persistence/subscription/SubscriptionInfo.java | 31 ++++++++++++++++++++++
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  6 +++++
 2 files changed, 37 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 6b64331422b..d1071c511af 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -207,6 +207,37 @@ public class SubscriptionInfo implements SnapshotProcessor 
{
     throw new SubscriptionException(exceptionMessage);
   }
 
+  public void validatePipePluginUsageByTopic(String pipePluginName) throws 
SubscriptionException {
+    acquireReadLock();
+    try {
+      validatePipePluginUsageByTopicInternal(pipePluginName);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  public void validatePipePluginUsageByTopicInternal(String pipePluginName)
+      throws SubscriptionException {
+    acquireReadLock();
+    try {
+      topicMetaKeeper
+          .getAllTopicMeta()
+          .forEach(
+              meta -> {
+                if 
(pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
+                  final String exceptionMessage =
+                      String.format(
+                          "PipePlugin '%s' is already used by Topic '%s' as a 
processor.",
+                          pipePluginName, meta.getTopicName());
+                  LOGGER.warn(exceptionMessage);
+                  throw new SubscriptionException(exceptionMessage);
+                }
+              });
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws 
SubscriptionException {
     acquireReadLock();
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index dc9d4ce4f87..9bb99f8f64c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -22,7 +22,9 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import 
org.apache.iotdb.confignode.manager.subscription.SubscriptionCoordinator;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -119,9 +121,12 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
         env.getConfigManager().getPipeManager().getPipeTaskCoordinator();
     final PipePluginCoordinator pipePluginCoordinator =
         env.getConfigManager().getPipeManager().getPipePluginCoordinator();
+    final SubscriptionCoordinator subscriptionCoordinator =
+        
env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
 
     final AtomicReference<PipeTaskInfo> pipeTaskInfo = 
pipeTaskCoordinator.lock();
     pipePluginCoordinator.lock();
+    SubscriptionInfo subscriptionInfo = 
subscriptionCoordinator.getSubscriptionInfo();
 
     try {
       if (pipePluginCoordinator
@@ -137,6 +142,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
       }
 
       pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName);
+      subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
     } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
       LOGGER.warn(e.getMessage());

Reply via email to