This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2d5f2b344367006167b459c80e9dedb0f6f14d3f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 1 14:33:44 2024 +0800

    Pipe/Subscription: Add 'Create If Not Exists' and 'Drop If Exists' Support for Pipes, Plugins, and Topics (#12969)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit 43e562a2ac1f032d3d3dd9de3d8acd95742e79b6)
---
 .../IoTDBPipeConditionalOperationsIT.java          | 228 +++++++++++++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  14 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   9 +
 .../iotdb/confignode/manager/ConfigManager.java    |  15 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  11 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  13 +-
 .../coordinator/plugin/PipePluginCoordinator.java  |  12 +-
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  11 +-
 .../subscription/SubscriptionCoordinator.java      |  18 +-
 .../persistence/pipe/PipePluginInfo.java           |  26 ++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  26 ++-
 .../persistence/subscription/SubscriptionInfo.java |  12 +-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |   6 +-
 .../pipe/plugin/CreatePipePluginProcedure.java     |  28 ++-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  22 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |   2 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |   6 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  15 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   2 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   4 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   2 +-
 .../AbstractOperateSubscriptionProcedure.java      |  15 +-
 .../consumer/AlterConsumerGroupProcedure.java      |   3 +-
 .../runtime/ConsumerGroupMetaSyncProcedure.java    |   3 +-
 .../subscription/CreateSubscriptionProcedure.java  |   3 +-
 .../subscription/DropSubscriptionProcedure.java    |   3 +-
 .../subscription/topic/AlterTopicProcedure.java    |   4 +-
 .../subscription/topic/CreateTopicProcedure.java   |   7 +-
 .../subscription/topic/DropTopicProcedure.java     |   3 +-
 .../topic/runtime/TopicMetaSyncProcedure.java      |   3 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  20 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   2 +-
 .../pipe/plugin/CreatePipePluginProcedureTest.java |   2 +-
 .../pipe/plugin/DropPipePluginProcedureTest.java   |   2 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  14 ++
 .../config/executor/ClusterConfigTaskExecutor.java |  37 +++-
 .../config/executor/IConfigTaskExecutor.java       |   3 +-
 .../config/metadata/DropPipePluginTask.java        |   6 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |  20 +-
 .../metadata/pipe/AlterPipeStatement.java          |   9 +
 .../metadata/pipe/CreatePipePluginStatement.java   |   9 +-
 .../metadata/pipe/CreatePipeStatement.java         |   9 +
 .../metadata/pipe/DropPipePluginStatement.java     |  18 +-
 .../statement/metadata/pipe/DropPipeStatement.java |   9 +
 .../subscription/CreateTopicStatement.java         |  10 +-
 .../metadata/subscription/DropTopicStatement.java  |   9 +
 .../src/main/thrift/confignode.thrift              |  21 ++
 50 files changed, 632 insertions(+), 99 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
new file mode 100644
index 00000000000..e6dfd37a307
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeConditionalOperationsIT extends AbstractPipeDualAutoIT {
+
+  @Test
+  public void testBasicCreatePipeIfNotExists() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Create pipe
+    String sql =
+        String.format(
+            "create pipe If Not Exists a2b with source 
('source'='iotdb-source', 'source.pattern'='root.test1', 
'source.realtime.mode'='stream') with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    long creationTime;
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // Check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Record last creation time
+      creationTime = showPipeResult.get(0).creationTime;
+    }
+
+    // Create pipe If Not Exists
+    sql =
+        String.format(
+            "create pipe If Not Exists a2b with source 
('source'='iotdb-source', 'source.path'='root.test2.**') with sink 
('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // Check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test2.**"));
+      Assert.assertEquals(creationTime, showPipeResult.get(0).creationTime);
+    }
+  }
+
+  @Test
+  public void testBasicDropPipeIfExists() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Create pipe
+    final String sql =
+        String.format(
+            "create pipe If Not Exists a2b with source 
('source'='iotdb-source', 'source.path'='root.test1.**') with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Drop pipe If Exists
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("Drop pipe If Exists a2b");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(0, showPipeResult.size());
+    }
+
+    // Drop pipe If Exists
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("Drop pipe If Exists a2b");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  public void testBasicAlterPipeIfExists() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Alter pipe If Exists
+    String sql =
+        String.format(
+            "Alter pipe If Exists a2b replace sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(0, showPipeResult.size());
+    }
+
+    // Create pipe
+    sql =
+        String.format(
+            "create pipe If Not Exists a2b with source 
('source'='iotdb-source', 'source.pattern'='root.test1', 
'source.realtime.mode'='stream') with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Alter pipe If Exists
+    sql =
+        String.format(
+            "Alter pipe If Exists a2b replace source () replace processor () 
replace sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // Check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // Check configurations
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+      Assert.assertFalse(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+      Assert.assertFalse(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+    }
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index abbe1425a22..0f7ab2f3f3d 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -263,6 +263,7 @@ public enum TSStatusCode {
   ALTER_TOPIC_ERROR(2002),
   SHOW_TOPIC_ERROR(2003),
   TOPIC_PUSH_META_ERROR(2004),
+  TOPIC_NOT_EXIST_ERROR(2005),
 
   // Consumer
   CREATE_CONSUMER_ERROR(2100),
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index e66c875c9a7..8986fce18e1 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -535,7 +535,7 @@ verifyConnection
 
 // Pipe Task =========================================================================================
 createPipe
-    : CREATE PIPE pipeName=identifier
+    : CREATE PIPE  (IF NOT EXISTS)? pipeName=identifier
         extractorAttributesClause?
         processorAttributesClause?
         connectorAttributesClause
@@ -575,7 +575,7 @@ connectorAttributeClause
     ;
 
 alterPipe
-    : ALTER PIPE pipeName=identifier
+    : ALTER PIPE (IF EXISTS)? pipeName=identifier
         alterExtractorAttributesClause?
         alterProcessorAttributesClause?
         alterConnectorAttributesClause?
@@ -603,7 +603,7 @@ alterConnectorAttributesClause
     ;
 
 dropPipe
-    : DROP PIPE pipeName=identifier
+    : DROP PIPE (IF EXISTS)? pipeName=identifier
     ;
 
 startPipe
@@ -620,11 +620,11 @@ showPipes
 
 // Pipe Plugin =========================================================================================
 createPipePlugin
-    : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL 
uriClause
+    : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS 
className=STRING_LITERAL uriClause
     ;
 
 dropPipePlugin
-    : DROP PIPEPLUGIN pluginName=identifier
+    : DROP PIPEPLUGIN (IF EXISTS)? pluginName=identifier
     ;
 
 showPipePlugins
@@ -633,7 +633,7 @@ showPipePlugins
 
 // Topic =========================================================================================
 createTopic
-    : CREATE TOPIC topicName=identifier topicAttributesClause?
+    : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
     ;
 
 topicAttributesClause
@@ -645,7 +645,7 @@ topicAttributeClause
     ;
 
 dropTopic
-    : DROP TOPIC topicName=identifier
+    : DROP TOPIC (IF EXISTS)? topicName=identifier
     ;
 
 showTopics
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index d1865a1e204..dc9c12c7c8c 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -286,6 +286,10 @@ EVERY
     : E V E R Y
     ;
 
+EXISTS
+    : E X I S T S
+    ;
+
 EXPLAIN
     : E X P L A I N
     ;
@@ -942,10 +946,15 @@ ELSE
     : E L S E
     ;
 
+IF
+    : I F
+    ;
+
 INF
     : I N F
     ;
 
+
 // Privileges Keywords
 
 PRIVILEGE_VALUE
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4a7a6144b5d..68e605976b8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -150,6 +150,9 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -1495,10 +1498,10 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus dropPipePlugin(String pipePluginName) {
+  public TSStatus dropPipePlugin(TDropPipePluginReq req) {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
+        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(req)
         : status;
   }
 
@@ -2080,10 +2083,10 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus dropPipe(String pipeName) {
+  public TSStatus dropPipe(TDropPipeReq req) {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
+        ? pipeManager.getPipeTaskCoordinator().dropPipe(req)
         : status;
   }
 
@@ -2112,10 +2115,10 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus dropTopic(String topicName) {
+  public TSStatus dropTopic(TDropTopicReq req) {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? subscriptionManager.getSubscriptionCoordinator().dropTopic(topicName)
+        ? subscriptionManager.getSubscriptionCoordinator().dropTopic(req)
         : status;
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 5e9dd2f57c2..5f3d66044b5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -79,6 +79,9 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -484,7 +487,7 @@ public interface IManager {
   TSStatus createPipePlugin(TCreatePipePluginReq req);
 
   /** Drop pipe plugin. */
-  TSStatus dropPipePlugin(String pluginName);
+  TSStatus dropPipePlugin(TDropPipePluginReq req);
 
   /** Show pipe plugins. */
   TGetPipePluginTableResp getPipePluginTable();
@@ -651,11 +654,11 @@ public interface IManager {
   /**
    * Drop Pipe.
    *
-   * @param pipeName name of Pipe
+   * @param req Info about Pipe
    * @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe successfully, {@link
    *     TSStatusCode#PIPE_ERROR} if encountered failure.
    */
-  TSStatus dropPipe(String pipeName);
+  TSStatus dropPipe(TDropPipeReq req);
 
   /**
    * Get Pipe by name. If pipeName is empty, get all Pipe.
@@ -690,7 +693,7 @@ public interface IManager {
   TSStatus createTopic(TCreateTopicReq topic);
 
   /** Drop Topic. */
-  TSStatus dropTopic(String topicName);
+  TSStatus dropTopic(TDropTopicReq req);
 
   /** Show Topic. */
   TShowTopicResp showTopic(TShowTopicReq req);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 9f0f7765f87..71f363facc0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -112,6 +112,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
@@ -856,9 +857,10 @@ public class ProcedureManager {
     return statusList.get(0);
   }
 
-  public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] 
jarFile) {
+  public TSStatus createPipePlugin(
+      PipePluginMeta pipePluginMeta, byte[] jarFile, boolean 
isSetIfNotExistsCondition) {
     final CreatePipePluginProcedure createPipePluginProcedure =
-        new CreatePipePluginProcedure(pipePluginMeta, jarFile);
+        new CreatePipePluginProcedure(pipePluginMeta, jarFile, 
isSetIfNotExistsCondition);
     try {
       if (jarFile != null
           && new 
UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
@@ -886,8 +888,11 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus dropPipePlugin(String pluginName) {
-    final long procedureId = executor.submitProcedure(new 
DropPipePluginProcedure(pluginName));
+  public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+    final long procedureId =
+        executor.submitProcedure(
+            new DropPipePluginProcedure(
+                req.getPluginName(), req.isSetIfExistsCondition() && 
req.isIfExistsCondition()));
     final List<TSStatus> statusList = new ArrayList<>();
     final boolean isSucceed =
         waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
index 5c0ad6492e6..07259f842fb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
@@ -72,11 +73,16 @@ public class PipePluginCoordinator {
     final PipePluginMeta pipePluginMeta =
         new PipePluginMeta(pluginName, className, false, jarName, jarMD5);
 
-    return 
configManager.getProcedureManager().createPipePlugin(pipePluginMeta, 
req.getJarFile());
+    return configManager
+        .getProcedureManager()
+        .createPipePlugin(
+            pipePluginMeta,
+            req.getJarFile(),
+            req.isSetIfNotExistsCondition() && req.isIfNotExistsCondition());
   }
 
-  public TSStatus dropPipePlugin(String pluginName) {
-    return configManager.getProcedureManager().dropPipePlugin(pluginName);
+  public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+    return configManager.getProcedureManager().dropPipePlugin(req);
   }
 
   public TGetPipePluginTableResp getPipePluginTable() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index b07203e43ed..100da334d35 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -160,13 +161,19 @@ public class PipeTaskCoordinator {
   }
 
   /** Caller should ensure that the method is called in the lock {@link #lock()}. */
-  public TSStatus dropPipe(String pipeName) {
+  public TSStatus dropPipe(TDropPipeReq req) {
+    final String pipeName = req.getPipeName();
     final boolean isPipeExistedBeforeDrop = 
pipeTaskInfo.isPipeExisted(pipeName);
     final TSStatus status = 
configManager.getProcedureManager().dropPipe(pipeName);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, 
status);
     }
-    return isPipeExistedBeforeDrop
+
+    final boolean isSetIfExistsCondition =
+        req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    // If the `IF EXISTS` condition is not set and the pipe does not exist before the delete
+    // operation, return an error status indicating that the pipe does not exist.
+    return isPipeExistedBeforeDrop || isSetIfExistsCondition
         ? status
         : RpcUtils.getStatus(
             TSStatusCode.PIPE_NOT_EXIST_ERROR,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index c48f3e0f078..20dfce44bf2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -38,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -149,12 +151,24 @@ public class SubscriptionCoordinator {
     return status;
   }
 
-  public TSStatus dropTopic(String topicName) {
+  public TSStatus dropTopic(TDropTopicReq req) {
+    final String topicName = req.getTopicName();
+    final boolean isTopicExistedBeforeDrop = 
subscriptionInfo.isTopicExisted(topicName);
     final TSStatus status = 
configManager.getProcedureManager().dropTopic(topicName);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, 
status);
     }
-    return status;
+
+    // If the `IF EXISTS` condition is not set and the topic does not exist before the drop
+    // operation, return an error status indicating that the topic does not exist.
+    final boolean isIfExistedConditionSet =
+        req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    return isTopicExistedBeforeDrop || isIfExistedConditionSet
+        ? status
+        : RpcUtils.getStatus(
+            TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+            String.format(
+                "Failed to drop topic %s. Failures: %s does not exist.", 
topicName, topicName));
   }
 
   public TShowTopicResp showTopic(TShowTopicReq req) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 1e4a81cfe44..64d235e7f23 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -93,19 +93,38 @@ public class PipePluginInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void validateBeforeCreatingPipePlugin(
-      final String pluginName, final String jarName, final String jarMD5) {
+  /**
+   * @return true if the pipe plugin is already created and the 
isSetIfNotExistsCondition is true,
+   *     false otherwise
+   * @throws PipeException if the pipe plugin is already created and the 
isSetIfNotExistsCondition
+   *     is false
+   */
+  public boolean validateBeforeCreatingPipePlugin(
+      final String pluginName, final boolean isSetIfNotExistsCondition) {
     // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      if (isSetIfNotExistsCondition) {
+        return true;
+      }
       throw new PipeException(
           String.format(
               "Failed to create PipePlugin [%s], the same name PipePlugin has 
been created",
               pluginName));
     }
+    return false;
   }
 
-  public void validateBeforeDroppingPipePlugin(final String pluginName) {
+  /**
+   * @return true if the pipe plugin is not created and the 
isSetIfExistsCondition is true, false
+   *     otherwise
+   * @throws PipeException if the pipe plugin is not created and the 
isSetIfExistsCondition is false
+   */
+  public boolean validateBeforeDroppingPipePlugin(
+      final String pluginName, final boolean isSetIfExistsCondition) {
     if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      if (isSetIfExistsCondition) {
+        return true;
+      }
       throw new PipeException(
           String.format(
               "Failed to drop PipePlugin [%s], this PipePlugin has not been 
created", pluginName));
@@ -116,6 +135,7 @@ public class PipePluginInfo implements SnapshotProcessor {
               "Failed to drop PipePlugin [%s], the PipePlugin is a built-in 
PipePlugin",
               pluginName));
     }
+    return false;
   }
 
   public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String 
jarName) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c5a264c047a..ddc6ae6ac60 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -155,19 +155,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) 
throws PipeException {
+  public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
+      throws PipeException {
     acquireReadLock();
     try {
-      checkBeforeCreatePipeInternal(createPipeRequest);
+      return checkBeforeCreatePipeInternal(createPipeRequest);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeCreatePipeInternal(final TCreatePipeReq 
createPipeRequest)
+  private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq 
createPipeRequest)
       throws PipeException {
     if (!isPipeExisted(createPipeRequest.getPipeName())) {
-      return;
+      return true;
+    }
+
+    if (createPipeRequest.isSetIfNotExistsCondition()
+        && createPipeRequest.isIfNotExistsCondition()) {
+      return false;
     }
 
     final String exceptionMessage =
@@ -178,19 +184,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq 
alterPipeRequest)
+  public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq 
alterPipeRequest)
       throws PipeException {
     acquireReadLock();
     try {
-      checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
+      return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkAndUpdateRequestBeforeAlterPipeInternal(final 
TAlterPipeReq alterPipeRequest)
+  private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final 
TAlterPipeReq alterPipeRequest)
       throws PipeException {
     if (!isPipeExisted(alterPipeRequest.getPipeName())) {
+      if (alterPipeRequest.isSetIfExistsCondition() && 
alterPipeRequest.isIfExistsCondition()) {
+        return false;
+      }
+
       final String exceptionMessage =
           String.format(
               "Failed to alter pipe %s, the pipe does not exist", 
alterPipeRequest.getPipeName());
@@ -254,6 +264,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
                 .getAttribute());
       }
     }
+
+    return true;
   }
 
   public void checkBeforeStartPipe(final String pipeName) throws PipeException 
{
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index b020ffc045d..6b64331422b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -142,20 +142,24 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
 
   /////////////////////////////// Topic ///////////////////////////////
 
-  public void validateBeforeCreatingTopic(TCreateTopicReq createTopicReq)
+  public boolean validateBeforeCreatingTopic(TCreateTopicReq createTopicReq)
       throws SubscriptionException {
     acquireReadLock();
     try {
-      checkBeforeCreateTopicInternal(createTopicReq);
+      return checkBeforeCreateTopicInternal(createTopicReq);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq)
+  private boolean checkBeforeCreateTopicInternal(TCreateTopicReq 
createTopicReq)
       throws SubscriptionException {
     if (!isTopicExisted(createTopicReq.getTopicName())) {
-      return;
+      return true;
+    }
+
+    if (createTopicReq.isSetIfNotExistsCondition() && 
createTopicReq.isIfNotExistsCondition()) {
+      return false;
     }
 
     final String exceptionMessage =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 8decdd0b4da..325832a3869 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -185,7 +185,7 @@ public abstract class AbstractOperatePipeProcedureV2
   /**
    * Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}.
    *
-   * @return true if this procedure can skip subsequent stages (start RUNNING 
pipe or stop STOPPED
+   * @return false if this procedure can skip subsequent stages (start RUNNING 
pipe or stop STOPPED
    *     pipe without runtime exception)
    * @throws PipeException if validation for pipe parameters failed
    */
@@ -224,8 +224,8 @@ public abstract class AbstractOperatePipeProcedureV2
     try {
       switch (state) {
         case VALIDATE_TASK:
-          if (executeFromValidateTask(env)) {
-            LOGGER.warn("ProcedureId {}: {}", getProcId(), 
SKIP_PIPE_PROCEDURE_MESSAGE);
+          if (!executeFromValidateTask(env)) {
+            LOGGER.info("ProcedureId {}: {}", getProcId(), 
SKIP_PIPE_PROCEDURE_MESSAGE);
             // On client side, the message returned after the successful 
execution of the pipe
             // command corresponding to this procedure is "Msg: The statement 
is executed
             // successfully."
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 53fc302c663..fd485079890 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -66,14 +66,21 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
   private PipePluginMeta pipePluginMeta;
   private byte[] jarFile;
 
+  // This field will not be serialized. It may cause some problems
+  // when the procedure fails on one node and recovers on another node.
+  // Though it is not a good practice, it is acceptable here.
+  private boolean isSetIfNotExistsCondition;
+
   public CreatePipePluginProcedure() {
     super();
   }
 
-  public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[] 
jarFile) {
+  public CreatePipePluginProcedure(
+      PipePluginMeta pipePluginMeta, byte[] jarFile, boolean 
isSetIfNotExistsCondition) {
     super();
     this.pipePluginMeta = pipePluginMeta;
     this.jarFile = jarFile;
+    this.isSetIfNotExistsCondition = isSetIfNotExistsCondition;
   }
 
   @Override
@@ -125,20 +132,25 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
         env.getConfigManager().getPipeManager().getPipePluginCoordinator();
 
     pipePluginCoordinator.lock();
+    final String pluginName = pipePluginMeta.getPluginName();
 
     try {
-      pipePluginCoordinator
+      if (pipePluginCoordinator
           .getPipePluginInfo()
-          .validateBeforeCreatingPipePlugin(
-              pipePluginMeta.getPluginName(),
-              pipePluginMeta.getJarName(),
-              pipePluginMeta.getJarMD5());
+          .validateBeforeCreatingPipePlugin(pluginName, 
isSetIfNotExistsCondition)) {
+        LOGGER.info(
+            "Pipe plugin {} is already created and isSetIfNotExistsCondition 
is true, end the CreatePipePluginProcedure({})",
+            pluginName,
+            pluginName);
+        pipePluginCoordinator.unlock();
+        return Flow.NO_MORE_STATE;
+      }
     } catch (PipeException e) {
       // The pipe plugin has already created, we should end the procedure
       LOGGER.warn(
           "Pipe plugin {} is already created, end the 
CreatePipePluginProcedure({})",
-          pipePluginMeta.getPluginName(),
-          pipePluginMeta.getPluginName());
+          pluginName,
+          pluginName);
       setFailure(new ProcedureException(e.getMessage()));
       pipePluginCoordinator.unlock();
       return Flow.NO_MORE_STATE;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 2502a56ab47..dc9d4ce4f87 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -62,13 +62,20 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
 
   private String pluginName;
 
+  // If the plugin does not exist and the If Exists condition is met, the 
process ends.
+  // This field will not be serialized. It may cause some problems
+  // when the procedure fails on one node and recovers on another node.
+  // Though it is not a good practice, it is acceptable here.
+  private boolean isSetIfExistsCondition;
+
   public DropPipePluginProcedure() {
     super();
   }
 
-  public DropPipePluginProcedure(String pluginName) {
+  public DropPipePluginProcedure(String pluginName, boolean 
isSetIfExistsCondition) {
     super();
     this.pluginName = pluginName;
+    this.isSetIfExistsCondition = isSetIfExistsCondition;
   }
 
   @Override
@@ -117,7 +124,18 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     pipePluginCoordinator.lock();
 
     try {
-      
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+      if (pipePluginCoordinator
+          .getPipePluginInfo()
+          .validateBeforeDroppingPipePlugin(pluginName, 
isSetIfExistsCondition)) {
+        LOGGER.info(
+            "Pipe plugin {} is not exist, end the DropPipePluginProcedure({})",
+            pluginName,
+            pluginName);
+        pipePluginCoordinator.unlock();
+        pipeTaskCoordinator.unlock();
+        return Flow.NO_MORE_STATE;
+      }
+
       pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName);
     } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 0803aa29ca8..f18737e8b7a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -71,7 +71,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
 
     // Nothing needs to be checked
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 0919894f0d5..e91a6ba974d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -81,7 +81,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
     LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
 
     // Do nothing
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 8b1991aeb88..c4907d1f9e7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -94,7 +94,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
     LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
 
     LAST_EXECUTION_TIME.set(System.currentTimeMillis());
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index bfd35a3bc96..59a04f9a806 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -96,7 +96,9 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     // We should execute checkBeforeAlterPipe before checking the pipe plugin. 
This method will
     // update the alterPipeRequest based on the alterPipeRequest and existing 
pipe metadata.
-    pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
+    if 
(!pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest)) {
+      return false;
+    }
 
     final PipeManager pipeManager = env.getConfigManager().getPipeManager();
     pipeManager
@@ -107,7 +109,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             alterPipeRequest.getProcessorAttributes(),
             alterPipeRequest.getConnectorAttributes());
 
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 68075b5911d..cd581d01332 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -110,6 +111,17 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in Pipe. If there is an error, 
throw {@link
+   * PipeException}. If there is a Pipe with the same name and there is no 
IfNotExists condition in
+   * {@link #createPipeRequest}, throw {@link PipeException}. If such a Pipe 
exists and the
+   * IfNotExists condition is set, return {@code false}. If none exists, return 
{@code true}.
+   *
+   * @param env the environment for the procedure
+   * @return {@code true} if no Pipe with the same name exists; {@code false} 
if the Pipe already
+   *     exists and the IfNotExists condition is set
+   * @throws PipeException if validation fails or the Pipe exists without an IfNotExists condition
+   */
   @Override
   public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws 
PipeException {
     LOGGER.info(
@@ -123,9 +135,8 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             createPipeRequest.getExtractorAttributes(),
             createPipeRequest.getProcessorAttributes(),
             createPipeRequest.getConnectorAttributes());
-    pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
 
-    return false;
+    return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 60d851a66e3..0c7042caf3f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -78,7 +78,7 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     pipeTaskInfo.get().checkBeforeDropPipe(pipeName);
 
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 628935a2116..58254dc1ab0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -65,8 +65,8 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
 
-    return pipeTaskInfo.get().isPipeRunning(pipeName)
-        && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
+    return !pipeTaskInfo.get().isPipeRunning(pipeName)
+        || pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 81a70733363..a817e16c7ae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -65,7 +65,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
 
-    return pipeTaskInfo.get().isPipeStoppedByUser(pipeName);
+    return !pipeTaskInfo.get().isPipeStoppedByUser(pipeName);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 7b0ca8a3d25..4566738cd8d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,9 @@ public abstract class AbstractOperateSubscriptionProcedure
   private static final Logger LOGGER =
       LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
 
+  private static final String SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE =
+      "Skip subscription-related operations and do nothing";
+
   private static final int RETRY_THRESHOLD = 1;
 
   protected AtomicReference<SubscriptionInfo> subscriptionInfo;
@@ -156,7 +160,7 @@ public abstract class AbstractOperateSubscriptionProcedure
 
   protected abstract SubscriptionOperation getOperation();
 
-  protected abstract void executeFromValidate(ConfigNodeProcedureEnv env)
+  protected abstract boolean executeFromValidate(ConfigNodeProcedureEnv env)
       throws SubscriptionException;
 
   protected abstract void 
executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
@@ -179,7 +183,14 @@ public abstract class AbstractOperateSubscriptionProcedure
     try {
       switch (state) {
         case VALIDATE:
-          executeFromValidate(env);
+          if (!executeFromValidate(env)) {
+            LOGGER.info("ProcedureId {}: {}", getProcId(), 
SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE);
+            // On client side, the message returned after the successful 
execution of the
+            // subscription command corresponding to this procedure is "Msg: 
The statement is
+            // executed successfully."
+            
this.setResult(SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
+            return Flow.NO_MORE_STATE;
+          }
           setNextState(OperateSubscriptionState.OPERATE_ON_CONFIG_NODES);
           break;
         case OPERATE_ON_CONFIG_NODES:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
index 981e0513d04..6e6031b8102 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
@@ -75,10 +75,11 @@ public class AlterConsumerGroupProcedure extends 
AbstractOperateSubscriptionProc
   }
 
   @Override
-  public void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
     LOGGER.info("AlterConsumerGroupProcedure: executeFromValidate, try to 
validate");
 
     validateAndGetOldAndNewMeta(env);
+    return true;
   }
 
   protected void validateAndGetOldAndNewMeta(ConfigNodeProcedureEnv env) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 6e3a5387479..bb010eaca40 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -91,10 +91,11 @@ public class ConsumerGroupMetaSyncProcedure extends 
AbstractOperateSubscriptionP
   }
 
   @Override
-  public void executeFromValidate(ConfigNodeProcedureEnv env) {
+  public boolean executeFromValidate(ConfigNodeProcedureEnv env) {
     LOGGER.info("ConsumerGroupMetaSyncProcedure: executeFromValidate");
 
     LAST_EXECUTION_TIME.set(System.currentTimeMillis());
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 0894feb62ad..166f7b3da5e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -86,7 +86,7 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
   }
 
   @Override
-  protected void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
     LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
 
     subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
@@ -131,6 +131,7 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
       createPipeProcedure.executeFromValidateTask(env);
       createPipeProcedure.executeFromCalculateInfoForTask(env);
     }
+    return true;
   }
 
   // TODO: check periodically if the subscription is still valid but no 
working pipe?
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index 89da284ac0d..7d0c1e09cbd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -87,7 +87,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @Override
-  protected void executeFromValidate(final ConfigNodeProcedureEnv env)
+  protected boolean executeFromValidate(final ConfigNodeProcedureEnv env)
       throws SubscriptionException {
     LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
 
@@ -133,6 +133,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
 
     // Validate AlterConsumerGroupProcedure
     alterConsumerGroupProcedure.executeFromValidate(env);
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
index 93380ed10be..f8fbdab72e0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
@@ -84,12 +84,14 @@ public class AlterTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
   }
 
   @Override
-  public void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
     LOGGER.info("AlterTopicProcedure: executeFromValidate");
 
     subscriptionInfo.get().validateBeforeAlteringTopic(updatedTopicMeta);
 
     existedTopicMeta = 
subscriptionInfo.get().getTopicMeta(updatedTopicMeta.getTopicName());
+
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index ed3d59bd3d4..b712861cb54 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -66,11 +66,13 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
   }
 
   @Override
-  protected void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
     LOGGER.info("CreateTopicProcedure: executeFromValidate");
 
     // 1. check if the topic exists
-    subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq);
+    if (!subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq)) {
+      return false;
+    }
 
     // 2. create the topic meta
     topicMeta =
@@ -78,6 +80,7 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
             createTopicReq.getTopicName(),
             System.currentTimeMillis(),
             createTopicReq.getTopicAttributes());
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
index 54c6e3ed469..f1cfbb59d10 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
@@ -61,10 +61,11 @@ public class DropTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
   }
 
   @Override
-  protected void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
     LOGGER.info("DropTopicProcedure: executeFromValidate({})", topicName);
 
     subscriptionInfo.get().validateBeforeDroppingTopic(topicName);
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 3d49a766102..40920e43936 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -90,10 +90,11 @@ public class TopicMetaSyncProcedure extends 
AbstractOperateSubscriptionProcedure
   }
 
   @Override
-  public void executeFromValidate(ConfigNodeProcedureEnv env) {
+  public boolean executeFromValidate(ConfigNodeProcedureEnv env) {
     LOGGER.info("TopicMetaSyncProcedure: executeFromValidate");
 
     LAST_EXECUTION_TIME.set(System.currentTimeMillis());
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index fd7db9ce3ca..54c1a55e212 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -117,6 +117,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -824,7 +826,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus dropPipePlugin(TDropPipePluginReq req) {
-    return configManager.dropPipePlugin(req.getPluginName());
+    return configManager.dropPipePlugin(req);
   }
 
   @Override
@@ -1054,7 +1056,13 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus dropPipe(String pipeName) {
-    return configManager.dropPipe(pipeName);
+    return configManager.dropPipe(
+        new TDropPipeReq().setPipeName(pipeName).setIfExistsCondition(false));
+  }
+
+  @Override
+  public TSStatus dropPipeExtended(TDropPipeReq req) {
+    return configManager.dropPipe(req);
   }
 
   @Override
@@ -1084,7 +1092,13 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus dropTopic(String topicName) {
-    return configManager.dropTopic(topicName);
+    return configManager.dropTopic(
+        new 
TDropTopicReq().setTopicName(topicName).setIfExistsCondition(false));
+  }
+
+  @Override
+  public TSStatus dropTopicExtended(TDropTopicReq req) throws TException {
+    return configManager.dropTopic(req);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a697adcdec8..f3571bed57e 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -152,7 +152,7 @@ public class PipeInfoTest {
     pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
 
     // Drop pipe plugin test plugin
-    pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+    pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, 
false);
     DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
     pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
   }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index 8405f2d4675..93b95a3f336 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -41,7 +41,7 @@ public class CreatePipePluginProcedureTest {
     PipePluginMeta pipePluginMeta =
         new PipePluginMeta("test", "test.class", false, "test.jar", 
"testMD5test");
     CreatePipePluginProcedure proc =
-        new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+        new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3}, 
false);
 
     try {
       proc.serialize(outputStream);
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
index c29f5bd7fb5..232a5fbee59 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
@@ -36,7 +36,7 @@ public class DropPipePluginProcedureTest {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
 
-    DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+    DropPipePluginProcedure proc = new DropPipePluginProcedure("test", false);
 
     try {
       proc.serialize(outputStream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1fb3e907b05..04057ef3def 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -85,6 +85,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -994,6 +996,12 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.dropPipe(pipeName), status -> 
!updateConfigNodeLeader(status));
   }
 
+  @Override
+  public TSStatus dropPipeExtended(TDropPipeReq req) throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.dropPipeExtended(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
     return executeRemoteCallWithRetry(
@@ -1018,6 +1026,12 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.dropTopic(topicName), status -> 
!updateConfigNodeLeader(status));
   }
 
+  @Override
+  public TSStatus dropTopicExtended(TDropTopicReq req) throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.dropTopicExtended(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TShowTopicResp showTopic(TShowTopicReq req) throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 0bba6836171..1bc8667cc94 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -81,6 +81,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
@@ -201,6 +203,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
@@ -881,6 +884,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           client.createPipePlugin(
               new TCreatePipePluginReq()
                   .setPluginName(pluginName)
+                  
.setIfNotExistsCondition(createPipePluginStatement.hasIfNotExistsCondition())
                   .setClassName(className)
                   .setJarFile(jarFile)
                   .setJarMD5(jarMd5)
@@ -907,13 +911,21 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) {
+  public SettableFuture<ConfigTaskResult> dropPipePlugin(
+      DropPipePluginStatement dropPipePluginStatement) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (final ConfigNodeClient client =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus executionStatus = client.dropPipePlugin(new 
TDropPipePluginReq(pluginName));
+      final TSStatus executionStatus =
+          client.dropPipePlugin(
+              new TDropPipePluginReq()
+                  .setPluginName(dropPipePluginStatement.getPluginName())
+                  
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
executionStatus.getCode()) {
-        LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, 
pluginName);
+        LOGGER.warn(
+            "[{}] Failed to drop pipe plugin {}.",
+            executionStatus,
+            dropPipePluginStatement.getPluginName());
         future.setException(new IoTDBException(executionStatus.message, 
executionStatus.code));
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
@@ -1777,6 +1789,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       TCreatePipeReq req =
           new TCreatePipeReq()
               .setPipeName(createPipeStatement.getPipeName())
+              
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
               
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
               
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
               
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
@@ -1846,6 +1859,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               alterPipeStatement.isReplaceAllConnectorAttributes());
       req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
       
req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
+      req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition());
       final TSStatus tsStatus = configNodeClient.alterPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", 
pipeName, tsStatus);
@@ -1909,7 +1923,11 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus tsStatus = 
configNodeClient.dropPipe(dropPipeStatement.getPipeName());
+      final TSStatus tsStatus =
+          configNodeClient.dropPipeExtended(
+              new TDropPipeReq()
+                  .setPipeName(dropPipeStatement.getPipeName())
+                  
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn(
             "Failed to drop pipe {}, status is {}.", 
dropPipeStatement.getPipeName(), tsStatus);
@@ -2057,7 +2075,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       final TCreateTopicReq req =
-          new 
TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes);
+          new TCreateTopicReq()
+              .setTopicName(topicName)
+              
.setIfNotExistsCondition(createTopicStatement.hasIfNotExistsCondition())
+              .setTopicAttributes(topicAttributes);
       final TSStatus tsStatus = configNodeClient.createTopic(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn("Failed to create topic {} in config node, status is {}.", 
topicName, tsStatus);
@@ -2076,7 +2097,11 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus tsStatus = 
configNodeClient.dropTopic(dropTopicStatement.getTopicName());
+      final TSStatus tsStatus =
+          configNodeClient.dropTopicExtended(
+              new TDropTopicReq()
+                  
.setIfExistsCondition(dropTopicStatement.hasIfExistsCondition())
+                  .setTopicName(dropTopicStatement.getTopicName()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn(
             "Failed to drop topic {}, status is {}.", 
dropTopicStatement.getTopicName(), tsStatus);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 954f74f3897..8a1e1f416c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
@@ -118,7 +119,7 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> createPipePlugin(CreatePipePluginStatement 
createPipeStatement);
 
-  SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName);
+  SettableFuture<ConfigTaskResult> dropPipePlugin(DropPipePluginStatement 
dropPipePluginStatement);
 
   SettableFuture<ConfigTaskResult> showPipePlugins();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
index 82f9a325706..eb57f958430 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
@@ -28,15 +28,15 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 public class DropPipePluginTask implements IConfigTask {
 
-  private final String pluginName;
+  private final DropPipePluginStatement dropPipePluginStatement;
 
   public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) {
-    this.pluginName = dropPipePluginStatement.getPluginName();
+    this.dropPipePluginStatement = dropPipePluginStatement;
   }
 
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.dropPipePlugin(pluginName);
+    return configTaskExecutor.dropPipePlugin(dropPipePluginStatement);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 60302379b0c..6115218d125 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -960,6 +960,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
   public Statement 
visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) {
     return new CreatePipePluginStatement(
         parseIdentifier(ctx.pluginName.getText()),
+        ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null,
         parseStringLiteral(ctx.className.getText()),
         parseAndValidateURI(ctx.uriClause()));
   }
@@ -967,7 +968,10 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
   // Drop PipePlugin 
=====================================================================
   @Override
   public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext 
ctx) {
-    return new 
DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText()));
+    final DropPipePluginStatement dropPipePluginStatement = new 
DropPipePluginStatement();
+    
dropPipePluginStatement.setPluginName(parseIdentifier(ctx.pluginName.getText()));
+    dropPipePluginStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != 
null);
+    return dropPipePluginStatement;
   }
 
   // Show PipePlugins 
=====================================================================
@@ -3719,6 +3723,10 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       throw new SemanticException(
           "Not support for this sql in CREATE PIPE, please enter pipe name.");
     }
+
+    createPipeStatement.setIfNotExists(
+        ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
+
     if (ctx.extractorAttributesClause() != null) {
       createPipeStatement.setExtractorAttributes(
           parseExtractorAttributesClause(
@@ -3749,6 +3757,8 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
           "Not support for this sql in ALTER PIPE, please enter pipe name.");
     }
 
+    alterPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
     if (ctx.alterExtractorAttributesClause() != null) {
       alterPipeStatement.setExtractorAttributes(
           parseExtractorAttributesClause(
@@ -3781,6 +3791,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       alterPipeStatement.setConnectorAttributes(new HashMap<>());
       alterPipeStatement.setReplaceAllConnectorAttributes(false);
     }
+
     return alterPipeStatement;
   }
 
@@ -3827,6 +3838,8 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       throw new SemanticException("Not support for this sql in DROP PIPE, 
please enter pipename.");
     }
 
+    dropPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
     return dropPipeStatement;
   }
 
@@ -3879,6 +3892,9 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
           "Not support for this sql in CREATE TOPIC, please enter topicName.");
     }
 
+    createTopicStatement.setIfNotExists(
+        ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
+
     if (ctx.topicAttributesClause() != null) {
       createTopicStatement.setTopicAttributes(
           
parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause()));
@@ -3911,6 +3927,8 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
           "Not support for this sql in DROP TOPIC, please enter topicName.");
     }
 
+    dropTopicStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
     return dropTopicStatement;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 0c0bef6c274..de5dc3c1d59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
 public class AlterPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private boolean ifExistsCondition;
   private Map<String, String> extractorAttributes;
   private Map<String, String> processorAttributes;
   private Map<String, String> connectorAttributes;
@@ -52,6 +53,10 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
     return pipeName;
   }
 
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
   public Map<String, String> getExtractorAttributes() {
     return extractorAttributes;
   }
@@ -80,6 +85,10 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
     this.pipeName = pipeName;
   }
 
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
   public void setExtractorAttributes(Map<String, String> extractorAttributes) {
     this.extractorAttributes = extractorAttributes;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
index f954cc347dc..f0975b56012 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
@@ -36,13 +36,16 @@ import java.util.List;
 public class CreatePipePluginStatement extends Statement implements 
IConfigStatement {
 
   private final String pluginName;
+  private final boolean ifNotExistsCondition;
   private final String className;
   private final String uriString;
 
-  public CreatePipePluginStatement(String pluginName, String className, String 
uriString) {
+  public CreatePipePluginStatement(
+      String pluginName, boolean ifNotExistsCondition, String className, 
String uriString) {
     super();
     statementType = StatementType.CREATE_PIPEPLUGIN;
     this.pluginName = pluginName;
+    this.ifNotExistsCondition = ifNotExistsCondition;
     this.className = className;
     this.uriString = uriString;
   }
@@ -51,6 +54,10 @@ public class CreatePipePluginStatement extends Statement 
implements IConfigState
     return pluginName;
   }
 
+  public boolean hasIfNotExistsCondition() {
+    return ifNotExistsCondition;
+  }
+
   public String getClassName() {
     return className;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index 45a1a4664cb..a7b7471ffd0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
 public class CreatePipeStatement extends Statement implements IConfigStatement 
{
 
   private String pipeName;
+  private boolean ifNotExistsCondition;
   private Map<String, String> extractorAttributes;
   private Map<String, String> processorAttributes;
   private Map<String, String> connectorAttributes;
@@ -49,6 +50,10 @@ public class CreatePipeStatement extends Statement 
implements IConfigStatement {
     return pipeName;
   }
 
+  public boolean hasIfNotExistsCondition() {
+    return ifNotExistsCondition;
+  }
+
   public Map<String, String> getExtractorAttributes() {
     return extractorAttributes;
   }
@@ -65,6 +70,10 @@ public class CreatePipeStatement extends Statement 
implements IConfigStatement {
     this.pipeName = pipeName;
   }
 
+  public void setIfNotExists(boolean ifNotExistsCondition) {
+    this.ifNotExistsCondition = ifNotExistsCondition;
+  }
+
   public void setExtractorAttributes(Map<String, String> extractorAttributes) {
     this.extractorAttributes = extractorAttributes;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
index 49807d21a24..c7f4ebfd5df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
@@ -35,18 +35,30 @@ import java.util.List;
 
 public class DropPipePluginStatement extends Statement implements 
IConfigStatement {
 
-  private final String pluginName;
+  private String pluginName;
+  private boolean ifExistsCondition;
 
-  public DropPipePluginStatement(String pluginName) {
+  public DropPipePluginStatement() {
     super();
     statementType = StatementType.DROP_PIPEPLUGIN;
-    this.pluginName = pluginName;
   }
 
   public String getPluginName() {
     return pluginName;
   }
 
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
+  public void setPluginName(String pluginName) {
+    this.pluginName = pluginName;
+  }
+
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index a7d54a3086f..a3403e00e68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -36,11 +36,16 @@ import java.util.List;
 public class DropPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private boolean ifExistsCondition;
 
   public DropPipeStatement(StatementType dropPipeStatement) {
     this.statementType = dropPipeStatement;
   }
 
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
   public String getPipeName() {
     return pipeName;
   }
@@ -49,6 +54,10 @@ public class DropPipeStatement extends Statement implements 
IConfigStatement {
     this.pipeName = pipeName;
   }
 
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
index 6482975b038..a98a1d5d273 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
@@ -37,7 +37,7 @@ import java.util.Map;
 public class CreateTopicStatement extends Statement implements 
IConfigStatement {
 
   private String topicName;
-
+  private boolean ifNotExistsCondition;
   private Map<String, String> topicAttributes;
 
   public CreateTopicStatement() {
@@ -49,6 +49,10 @@ public class CreateTopicStatement extends Statement 
implements IConfigStatement
     return topicName;
   }
 
+  public boolean hasIfNotExistsCondition() {
+    return ifNotExistsCondition;
+  }
+
   public Map<String, String> getTopicAttributes() {
     return topicAttributes;
   }
@@ -57,6 +61,10 @@ public class CreateTopicStatement extends Statement 
implements IConfigStatement
     this.topicName = topicName;
   }
 
+  public void setIfNotExists(boolean ifNotExistsCondition) {
+    this.ifNotExistsCondition = ifNotExistsCondition;
+  }
+
   public void setTopicAttributes(Map<String, String> topicAttributes) {
     this.topicAttributes = topicAttributes;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index e366b1c5eb3..36525b1846e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -35,6 +35,7 @@ import java.util.List;
 
 public class DropTopicStatement extends Statement implements IConfigStatement {
   private String topicName;
+  private boolean ifExistsCondition;
 
   public DropTopicStatement() {
     super();
@@ -45,10 +46,18 @@ public class DropTopicStatement extends Statement 
implements IConfigStatement {
     return topicName;
   }
 
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
   public void setTopicName(String topicName) {
     this.topicName = topicName;
   }
 
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 17618206b43..47e8cf553bf 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -674,10 +674,12 @@ struct TCreatePipePluginReq {
   3: required string jarName
   4: required binary jarFile
   5: required string jarMD5
+  6: optional bool ifNotExistsCondition
 }
 
 struct TDropPipePluginReq {
   1: required string pluginName
+  2: optional bool ifExistsCondition
 }
 
 // Get PipePlugin table from config node
@@ -710,6 +712,7 @@ struct TCreatePipeReq {
     2: optional map<string, string> extractorAttributes
     3: optional map<string, string> processorAttributes
     4: required map<string, string> connectorAttributes
+    5: optional bool ifNotExistsCondition
 }
 
 struct TAlterPipeReq {
@@ -720,6 +723,12 @@ struct TAlterPipeReq {
     5: required bool isReplaceAllConnectorAttributes
     6: optional map<string, string> extractorAttributes
     7: optional bool isReplaceAllExtractorAttributes
+    8: optional bool ifExistsCondition
+}
+
+struct TDropPipeReq {
+    1: required string pipeName
+    2: optional bool ifExistsCondition
 }
 
 // Deprecated, restored for compatibility
@@ -774,6 +783,12 @@ struct TAlterLogicalViewReq {
 struct TCreateTopicReq {
     1: required string topicName
     2: optional map<string, string> topicAttributes
+    3: optional bool ifNotExistsCondition
+}
+
+struct TDropTopicReq {
+    1: required string topicName
+    2: optional bool ifExistsCondition
 }
 
 struct TShowTopicReq {
@@ -1519,6 +1534,9 @@ service IConfigNodeRPCService {
   /** Drop Pipe */
   common.TSStatus dropPipe(string pipeName)
 
+  /** Drop Pipe via TDropPipeReq, which also carries the optional ifExistsCondition flag */
+  common.TSStatus dropPipeExtended(TDropPipeReq req)
+
   /** Show Pipe by name, if name is empty, show all Pipe */
   TShowPipeResp showPipe(TShowPipeReq req)
 
@@ -1540,6 +1558,9 @@ service IConfigNodeRPCService {
   /** Drop Topic */
   common.TSStatus dropTopic(string topicName)
 
+  /** Drop Topic via TDropTopicReq, which also carries the optional ifExistsCondition flag */
+  common.TSStatus dropTopicExtended(TDropTopicReq req)
+
   /** Show Topic by name, if name is empty, show all Topic */
   TShowTopicResp showTopic(TShowTopicReq req)
 

Reply via email to