This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9763fdb24265fc3a8a5083202c0cdcd08700cc8a
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 19 16:08:37 2025 +0800

    Subscription: support drop subscription from session & introduce allTopicMessagesHaveBeenConsumed for pull consumer (#15486)
    
    - Introducing a new DropSubscriptionTask and corresponding updates across executor interfaces, cluster executors, and managers.
    - Updating the SQL parser to recognize the DROP SUBSCRIPTION statement.
    - Enhancing client session APIs and internal data models to support subscription deletion.
    
    (cherry picked from commit 7f54522b4790bd29f31f5a6587dd639f42c98883)
---
 .../apache/iotdb/SubscriptionSessionExample.java   |  4 +-
 .../dual/treemodel/IoTDBSubscriptionTopicIT.java   |  2 +-
 .../it/local/IoTDBSubscriptionBasicIT.java         | 97 ++++++++++++++++++++++
 .../org/apache/iotdb/tool/tsfile/ExportTsFile.java |  2 +-
 .../subscription/SubscriptionTableTsFile.java      |  2 +-
 .../subscription/SubscriptionTreeTsFile.java       |  2 +-
 .../subscription/AbstractSubscriptionSession.java  | 19 ++++-
 .../subscription/ISubscriptionTableSession.java    | 22 +++++
 .../subscription/ISubscriptionTreeSession.java     | 22 +++++
 .../subscription/SubscriptionTableSession.java     | 12 +++
 .../subscription/SubscriptionTreeSession.java      | 12 +++
 .../consumer/ISubscriptionTablePullConsumer.java   | 12 +++
 .../consumer/ISubscriptionTreePullConsumer.java    | 12 +++
 .../base/AbstractSubscriptionConsumer.java         |  5 ++
 .../table/SubscriptionTablePullConsumer.java       |  5 ++
 .../tree/SubscriptionTreePullConsumer.java         |  5 ++
 .../session/subscription/model/Subscription.java   | 15 +++-
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |  1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  5 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  4 +
 .../subscription/SubscriptionTableResp.java        |  8 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  9 ++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../subscription/SubscriptionCoordinator.java      | 27 ++++++
 .../persistence/subscription/SubscriptionInfo.java | 59 ++++++++++---
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++
 .../org/apache/iotdb/db/audit/AuditLogger.java     |  1 +
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  7 ++
 .../execution/config/TableConfigTaskVisitor.java   |  9 ++
 .../execution/config/TreeConfigTaskVisitor.java    |  8 ++
 .../config/executor/ClusterConfigTaskExecutor.java | 25 ++++++
 .../config/executor/IConfigTaskExecutor.java       |  4 +
 .../sys/subscription/DropSubscriptionTask.java     | 50 +++++++++++
 .../sys/subscription/ShowSubscriptionsTask.java    | 13 ++-
 .../db/queryengine/plan/parser/ASTVisitor.java     | 17 ++++
 .../relational/analyzer/StatementAnalyzer.java     |  6 ++
 .../plan/relational/sql/ast/AstVisitor.java        |  4 +
 .../plan/relational/sql/ast/DropSubscription.java  | 75 +++++++++++++++++
 .../plan/relational/sql/parser/AstBuilder.java     |  9 ++
 .../plan/relational/sql/util/SqlFormatter.java     | 34 +++++---
 .../queryengine/plan/statement/StatementType.java  |  1 +
 .../plan/statement/StatementVisitor.java           |  5 ++
 .../subscription/DropSubscriptionStatement.java    | 95 +++++++++++++++++++++
 iotdb-core/node-commons/pom.xml                    |  4 +
 .../schema/column/ColumnHeaderConstant.java        |  2 +
 .../meta/consumer/ConsumerGroupMeta.java           | 96 ++++++++++++++++++---
 .../meta/consumer/ConsumerGroupMetaKeeper.java     |  7 ++
 .../meta/subscription/SubscriptionMeta.java        | 76 +++--------------
 .../db/relational/grammar/sql/RelationalSql.g4     |  8 +-
 .../src/main/thrift/confignode.thrift              | 12 ++-
 50 files changed, 828 insertions(+), 112 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 221fd8c1a37..6f4d503bce0 100644
--- 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -272,7 +272,7 @@ public class SubscriptionSessionExample {
                         .buildPushConsumer()) {
                   consumer3.open();
                   consumer3.subscribe(TOPIC_3);
-                  while 
(!consumer3.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                  while (!consumer3.allTopicMessagesHaveBeenConsumed()) {
                     LockSupport.parkNanos(SLEEP_NS); // wait some time
                   }
                 }
@@ -314,7 +314,7 @@ public class SubscriptionSessionExample {
                         .buildPullConsumer()) {
                   consumer4.open();
                   consumer4.subscribe(TOPIC_4);
-                  while 
(!consumer4.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                  while (!consumer4.allTopicMessagesHaveBeenConsumed()) {
                     for (final SubscriptionMessage message : 
consumer4.poll(POLL_TIMEOUT_MS)) {
                       final SubscriptionTsFileHandler handler = 
message.getTsFileHandler();
                       handler.moveFile(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/treemodel/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/treemodel/IoTDBSubscriptionTopicIT.java
index 373e83068bd..ccce4d9f0d2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/treemodel/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/treemodel/IoTDBSubscriptionTopicIT.java
@@ -860,7 +860,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                 consumer.open();
                 consumer.subscribe(topicName);
 
-                while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                while (!consumer.allTopicMessagesHaveBeenConsumed()) {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); 
// poll and ignore
                 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index be478548c31..eab487a0b95 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -19,10 +19,14 @@
 
 package org.apache.iotdb.subscription.it.local;
 
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
 import org.apache.iotdb.session.subscription.consumer.AckStrategy;
@@ -30,6 +34,7 @@ import 
org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
+import org.apache.iotdb.session.subscription.model.Subscription;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
@@ -51,6 +56,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -621,4 +627,95 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testDropSubscriptionBySession() throws Exception {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic8";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+      session.open();
+      session.createTopic(topicName);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionTreePullConsumer consumer =
+                  new SubscriptionTreePullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(true)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+
+                while (!consumer.allTopicMessagesHaveBeenConsumed()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); 
// poll and ignore
+                }
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    // Drop Subscription
+    LockSupport.parkNanos(5_000_000_000L); // wait some time
+    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+      session.open();
+      final Set<Subscription> subscriptions = 
session.getSubscriptions(topicName);
+      Assert.assertEquals(1, subscriptions.size());
+      
session.dropSubscription(subscriptions.iterator().next().getSubscriptionId());
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            // Check empty subscription
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient)
+                    EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      thread.join();
+    }
+  }
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index dac16a00629..3d8fe0baf91 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -47,7 +47,7 @@ public class ExportTsFile {
   public static void main(String[] args) throws Exception {
     Logger logger =
         (Logger) 
LoggerFactory.getLogger("org.apache.iotdb.session.subscription.consumer.base");
-    logger.setLevel(Level.ERROR);
+    logger.setLevel(Level.WARN);
     Options options = OptionsUtil.createSubscriptionTsFileOptions();
     parseParams(args, options);
     if (StringUtils.isEmpty(commonParam.getPath())) {
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
index edadbf0a7de..f869490e7c5 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
@@ -148,7 +148,7 @@ public class SubscriptionTableTsFile extends 
AbstractSubscriptionTsFile {
             @Override
             public void run() {
               final String consumerGroupId = consumer.getConsumerGroupId();
-              while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
+              while (!consumer.allTopicMessagesHaveBeenConsumed()) {
                 try {
                   for (final SubscriptionMessage message :
                       consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
index 2128e431d5a..7f2d302e6e4 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
@@ -144,7 +144,7 @@ public class SubscriptionTreeTsFile extends 
AbstractSubscriptionTsFile {
           new Runnable() {
             @Override
             public void run() {
-              while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
+              while (!consumer.allTopicMessagesHaveBeenConsumed()) {
                 try {
                   for (final SubscriptionMessage message :
                       consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
index 17bae51a460..5ab42f29d70 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
@@ -177,6 +177,20 @@ abstract class AbstractSubscriptionSession {
     }
   }
 
+  protected void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the 
parse result
+    final String sql = String.format("DROP SUBSCRIPTION %s", subscriptionId);
+    session.executeNonQueryStatement(sql);
+  }
+
+  protected void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the 
parse result
+    final String sql = String.format("DROP SUBSCRIPTION IF EXISTS %s", 
subscriptionId);
+    session.executeNonQueryStatement(sql);
+  }
+
   /////////////////////////////// utility ///////////////////////////////
 
   private Set<Topic> convertDataSetToTopics(final SessionDataSet dataSet)
@@ -202,7 +216,7 @@ abstract class AbstractSubscriptionSession {
     while (dataSet.hasNext()) {
       final RowRecord record = dataSet.next();
       final List<Field> fields = record.getFields();
-      if (fields.size() != 3) {
+      if (fields.size() != 4) {
         throw new SubscriptionException(
             String.format(
                 "Unexpected fields %s was obtained during SHOW 
SUBSCRIPTION...",
@@ -212,7 +226,8 @@ abstract class AbstractSubscriptionSession {
           new Subscription(
               fields.get(0).getStringValue(),
               fields.get(1).getStringValue(),
-              fields.get(2).getStringValue()));
+              fields.get(2).getStringValue(),
+              fields.get(3).getStringValue()));
     }
     return subscriptions;
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java
index d12b318e59d..2dac0f5106b 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java
@@ -174,4 +174,26 @@ public interface ISubscriptionTableSession extends 
AutoCloseable {
    */
   Set<Subscription> getSubscriptions(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException;
+
+  /**
+   * Removes the subscription identified by the given subscription ID.
+   *
+   * @param subscriptionId The unique identifier of the subscription to be 
removed.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /**
+   * Removes the subscription identified by the given subscription ID if it 
exists.
+   *
+   * <p>If the subscription does not exist, this method will not throw an 
exception.
+   *
+   * @param subscriptionId The unique identifier of the subscription to be 
removed.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException;
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java
index 82eb9d3de60..799314769e8 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java
@@ -174,4 +174,26 @@ public interface ISubscriptionTreeSession extends 
AutoCloseable {
    */
   Set<Subscription> getSubscriptions(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException;
+
+  /**
+   * Removes the subscription identified by the given subscription ID.
+   *
+   * @param subscriptionId The unique identifier of the subscription to be 
removed.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /**
+   * Removes the subscription identified by the given subscription ID if it 
exists.
+   *
+   * <p>If the subscription does not exist, this method will not throw an 
exception.
+   *
+   * @param subscriptionId The unique identifier of the subscription to be 
removed.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException;
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java
index 2b5d32f77c6..bf21a43b26f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java
@@ -110,4 +110,16 @@ public class SubscriptionTableSession extends 
AbstractSubscriptionSession
       throws IoTDBConnectionException, StatementExecutionException {
     return super.getSubscriptions(topicName);
   }
+
+  @Override
+  public void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    super.dropSubscription(subscriptionId);
+  }
+
+  @Override
+  public void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    super.dropSubscriptionIfExists(subscriptionId);
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java
index db1f753a91e..900a243434d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java
@@ -134,6 +134,18 @@ public class SubscriptionTreeSession extends 
AbstractSubscriptionSession
     return super.getSubscriptions(topicName);
   }
 
+  @Override
+  public void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    super.dropSubscription(subscriptionId);
+  }
+
+  @Override
+  public void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    super.dropSubscriptionIfExists(subscriptionId);
+  }
+
   /////////////////////////////// builder ///////////////////////////////
 
   @Deprecated // keep for forward compatibility
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
index 31b6160ed6e..0168a1ba384 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
@@ -196,4 +196,16 @@ public interface ISubscriptionTablePullConsumer extends 
AutoCloseable {
    * @return the consumer group's identifier
    */
   String getConsumerGroupId();
+
+  /**
+   * Checks whether all topic messages have been consumed.
+   *
+   * <p>This method is used by the pull consumer in a loop that retrieves 
messages to determine if
+   * all messages for the subscription have been processed. It ensures that 
the consumer can
+   * correctly detect the termination signal for the subscription once all 
messages have been
+   * consumed.
+   *
+   * @return true if all topic messages have been consumed, false otherwise.
+   */
+  boolean allTopicMessagesHaveBeenConsumed();
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
index aef104ebb35..803b7c51224 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
@@ -196,4 +196,16 @@ public interface ISubscriptionTreePullConsumer extends 
AutoCloseable {
    * @return the consumer group's identifier
    */
   String getConsumerGroupId();
+
+  /**
+   * Checks whether all topic messages have been consumed.
+   *
+   * <p>This method is used by the pull consumer in a loop that retrieves 
messages to determine if
+   * all messages for the subscription have been processed. It ensures that 
the consumer can
+   * correctly detect the termination signal for the subscription once all 
messages have been
+   * consumed.
+   *
+   * @return true if all topic messages have been consumed, false otherwise.
+   */
+  boolean allTopicMessagesHaveBeenConsumed();
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 88fab21ef4f..a12340e9d76 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -123,10 +123,15 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
   @SuppressWarnings("java:S3077")
   protected volatile Map<String, TopicConfig> subscribedTopics = new 
HashMap<>();
 
+  @Deprecated
   public boolean allSnapshotTopicMessagesHaveBeenConsumed() {
     return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet());
   }
 
+  public boolean allTopicMessagesHaveBeenConsumed() {
+    return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet());
+  }
+
   private boolean allTopicMessagesHaveBeenConsumed(final Collection<String> 
topicNames) {
     // For the topic that needs to be detected, there are two scenarios to 
consider:
     //   1. If configs as live, it cannot be determined whether the topic has 
been fully consumed.
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index e5f05633677..9e51f7438ff 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -159,4 +159,9 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
   public String getConsumerGroupId() {
     return super.getConsumerGroupId();
   }
+
+  @Override
+  public boolean allTopicMessagesHaveBeenConsumed() {
+    return super.allTopicMessagesHaveBeenConsumed();
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 850e36d3230..713dd601e2d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -205,6 +205,11 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
     return super.getConsumerGroupId();
   }
 
+  @Override
+  public boolean allTopicMessagesHaveBeenConsumed() {
+    return super.allTopicMessagesHaveBeenConsumed();
+  }
+
   /////////////////////////////// builder ///////////////////////////////
 
   @Deprecated // keep for forward compatibility
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
index 7d19802832a..01e454cae2c 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
@@ -21,17 +21,26 @@ package org.apache.iotdb.session.subscription.model;
 
 public class Subscription {
 
+  private final String subscriptionId;
   private final String topicName;
   private final String consumerGroupId;
   private final String consumerIds;
 
   public Subscription(
-      final String topicName, final String consumerGroupId, final String 
consumerIds) {
+      final String subscriptionId,
+      final String topicName,
+      final String consumerGroupId,
+      final String consumerIds) {
+    this.subscriptionId = subscriptionId;
     this.topicName = topicName;
     this.consumerGroupId = consumerGroupId;
     this.consumerIds = consumerIds;
   }
 
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
   public String getTopicName() {
     return topicName;
   }
@@ -46,7 +55,9 @@ public class Subscription {
 
   @Override
   public String toString() {
-    return "Subscription{topicName="
+    return "Subscription{subscriptionId="
+        + subscriptionId
+        + ", topicName="
         + topicName
         + ", consumerGroupId="
         + consumerGroupId
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index fca3fb47cff..5761d9308d1 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -225,6 +225,7 @@ keyWords
     | STATELESS
     | STATEMENT
     | STOP
+    | SUBSCRIPTION
     | SUBSCRIPTIONS
     | SUBSTRING
     | SYSTEM
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index b4a7701267f..9faf3c7a66b 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -57,7 +57,7 @@ ddlStatement
     // Pipe Plugin
     | createPipePlugin | dropPipePlugin | showPipePlugins
     // Subscription
-    | createTopic | dropTopic | showTopics | showSubscriptions
+    | createTopic | dropTopic | showTopics | showSubscriptions | 
dropSubscription
     // CQ
     | createContinuousQuery | dropContinuousQuery | showContinuousQueries
     // Cluster
@@ -689,6 +689,9 @@ showSubscriptions
     : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
     ;
 
+// ---- Drop Subscription
+dropSubscription
+    : DROP SUBSCRIPTION (IF EXISTS)? subscriptionId=identifier
+    ;
 
 // AI Model 
=========================================================================================
 // ---- Create Model
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index e31d6718323..7c09422f1c1 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -810,6 +810,10 @@ STOP
     : S T O P
     ;
 
+// Singular form, consumed by the DROP SUBSCRIPTION statement in IoTDBSqlParser.g4.
+SUBSCRIPTION
+    : S U B S C R I P T I O N
+    ;
+
 SUBSCRIPTIONS
     : S U B S C R I P T I O N S
     ;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
index c242dbf2b08..39441139586 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class SubscriptionTableResp implements DataSet {
@@ -66,11 +67,14 @@ public class SubscriptionTableResp implements DataSet {
     final List<TShowSubscriptionInfo> showSubscriptionInfoList = new 
ArrayList<>();
 
     for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
-      showSubscriptionInfoList.add(
+      TShowSubscriptionInfo showSubscriptionInfo =
           new TShowSubscriptionInfo(
               subscriptionMeta.getTopicMeta().getTopicName(),
               subscriptionMeta.getConsumerGroupId(),
-              subscriptionMeta.getConsumerIds()));
+              subscriptionMeta.getConsumerIds());
+      Optional<Long> creationTime = subscriptionMeta.getCreationTime();
+      creationTime.ifPresent(showSubscriptionInfo::setCreationTime);
+      showSubscriptionInfoList.add(showSubscriptionInfo);
     }
     return new 
TShowSubscriptionResp(status).setSubscriptionInfoList(showSubscriptionInfoList);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8e559591e96..efdf213dcec 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -179,6 +179,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -2333,6 +2334,14 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  /**
+   * Drops the subscription identified in {@code req} once the leader check has passed.
+   *
+   * @param req the drop request handed to the subscription coordinator
+   * @return the coordinator's status when {@code confirmLeader()} reports success, otherwise the
+   *     status returned by {@code confirmLeader()} itself
+   */
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) {
+    final TSStatus leaderStatus = confirmLeader();
+    if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return leaderStatus;
+    }
+    return subscriptionManager.getSubscriptionCoordinator().dropSubscription(req);
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3e6abee92e3..0d6295ce6e4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -94,6 +94,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -791,6 +792,8 @@ public interface IManager {
 
   TSStatus dropSubscription(TUnsubscribeReq req);
 
+  /**
+   * Drops a subscription addressed by the id carried in the request.
+   *
+   * @param req the drop-subscription request
+   * @return the resulting status
+   */
+  TSStatus dropSubscriptionById(TDropSubscriptionReq req);
+
   TShowSubscriptionResp showSubscription(TShowSubscriptionReq req);
 
   TGetAllSubscriptionInfoResp getAllSubscriptionInfo();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 194ec1f31f9..42b500f6eb1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
@@ -42,10 +43,12 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class SubscriptionCoordinator {
@@ -243,6 +246,30 @@ public class SubscriptionCoordinator {
     return status;
   }
 
+  /**
+   * Drops the subscription whose id is carried by {@code req}.
+   *
+   * <p>The id is resolved to a (topic name, consumer group id) pair through {@code
+   * subscriptionInfo.parseSubscriptionId}. When it resolves, the drop is delegated to the
+   * procedure manager as an unsubscribe request whose consumer id is {@code null}.
+   *
+   * @param req the request holding the subscription id, the IF EXISTS flag and the model flag
+   * @return the procedure manager's status when the id resolves; otherwise a success status if
+   *     IF EXISTS was set and true, or a {@code TOPIC_NOT_EXIST_ERROR} status if it was not
+   */
+  public TSStatus dropSubscription(TDropSubscriptionReq req) {
+    final String id = req.getSubsciptionId();
+    final boolean ifExists = req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    final Optional<Pair<String, String>> resolved =
+        subscriptionInfo.parseSubscriptionId(id, req.isTableModel);
+
+    if (resolved.isPresent()) {
+      // left = topic name, right = consumer group id
+      final Pair<String, String> topicAndGroup = resolved.get();
+      return configManager
+          .getProcedureManager()
+          .dropSubscription(
+              new TUnsubscribeReq()
+                  .setConsumerId(null)
+                  .setConsumerGroupId(topicAndGroup.right)
+                  .setTopicNames(Collections.singleton(topicAndGroup.left)));
+    }
+
+    if (ifExists) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+    return RpcUtils.getStatus(
+        TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+        String.format("Failed to drop subscription %s. Failures: %s does not exist.", id, id));
+  }
+
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     try {
       return ((SubscriptionTableResp)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 2329c05a30b..ea4ac3b69fa 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
+import org.apache.thrift.annotation.Nullable;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,9 +56,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -628,15 +632,18 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
   private void checkBeforeUnsubscribeInternal(TUnsubscribeReq unsubscribeReq)
       throws SubscriptionException {
     // 1. Check if the consumer exists
-    if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), 
unsubscribeReq.getConsumerId())) {
-      // There is no consumer with the same consumerId and consumerGroupId,
-      // we should end the procedure
-      final String exceptionMessage =
-          String.format(
-              "Failed to unsubscribe because the consumer %s in consumer group 
%s does not exist",
-              unsubscribeReq.getConsumerId(), 
unsubscribeReq.getConsumerGroupId());
-      LOGGER.warn(exceptionMessage);
-      throw new SubscriptionException(exceptionMessage);
+    // NOTE: consumer id may be null if drop subscription by session
+    if (Objects.nonNull(unsubscribeReq.getConsumerId())) {
+      if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), 
unsubscribeReq.getConsumerId())) {
+        // There is no consumer with the same consumerId and consumerGroupId,
+        // we should end the procedure
+        final String exceptionMessage =
+            String.format(
+                "Failed to unsubscribe because the consumer %s in consumer 
group %s does not exist",
+                unsubscribeReq.getConsumerId(), 
unsubscribeReq.getConsumerGroupId());
+        LOGGER.warn(exceptionMessage);
+        throw new SubscriptionException(exceptionMessage);
+      }
     }
 
     // 2. Check if all topics exist. No need to check if already subscribed.
@@ -663,16 +670,28 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
   }
 
   private List<SubscriptionMeta> getAllSubscriptionMeta() {
+    return getAllSubscriptionMetaInternal(null);
+  }
+
+  private List<SubscriptionMeta> getAllSubscriptionMetaInternal(
+      @Nullable Predicate<TopicMeta> predicate) {
     List<SubscriptionMeta> allSubscriptions = new ArrayList<>();
     for (TopicMeta topicMeta : topicMetaKeeper.getAllTopicMeta()) {
+      if (Objects.nonNull(predicate) && !predicate.test(topicMeta)) {
+        continue;
+      }
       for (String consumerGroupId :
           
consumerGroupMetaKeeper.getSubscribedConsumerGroupIds(topicMeta.getTopicName()))
 {
         Set<String> subscribedConsumerIDs =
             consumerGroupMetaKeeper.getConsumersSubscribingTopic(
                 consumerGroupId, topicMeta.getTopicName());
+        Optional<Long> creationTime =
+            consumerGroupMetaKeeper.getSubscriptionCreationTime(
+                consumerGroupId, topicMeta.getTopicName());
         if (!subscribedConsumerIDs.isEmpty()) {
           allSubscriptions.add(
-              new SubscriptionMeta(topicMeta, consumerGroupId, 
subscribedConsumerIDs));
+              new SubscriptionMeta(
+                  topicMeta, consumerGroupId, subscribedConsumerIDs, 
creationTime.orElse(null)));
         }
       }
     }
@@ -685,6 +704,26 @@ public class SubscriptionInfo implements SnapshotProcessor 
{
         .collect(Collectors.toList());
   }
 
+  /**
+   * Resolves a subscription id to the topic name and consumer group id of the matching
+   * subscription. Only topics for which {@code visibleUnder(isTableModel)} holds are searched,
+   * and the lookup runs between {@code acquireReadLock()} and {@code releaseReadLock()}.
+   *
+   * @param subscriptionId the id to look up
+   * @param isTableModel the model flag passed to {@code TopicMeta.visibleUnder}
+   * @return (topic name, consumer group id) of the first match, or empty when nothing matches
+   */
+  public Optional<Pair<String, String>> parseSubscriptionId(
+      String subscriptionId, boolean isTableModel) {
+    acquireReadLock();
+    try {
+      return getAllSubscriptionMetaInternal(topicMeta -> topicMeta.visibleUnder(isTableModel))
+          .stream()
+          .filter(meta -> Objects.equals(subscriptionId, meta.getSubscriptionId()))
+          .findFirst()
+          .map(
+              meta ->
+                  new Pair<>(meta.getTopicMeta().getTopicName(), meta.getConsumerGroupId()));
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   /////////////////////////////////  Snapshot  
/////////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 81d04f35ab9..c50b66ff85f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -141,6 +141,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -1294,6 +1295,11 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.dropSubscription(req);
   }
 
+  /** Delegates the drop-subscription-by-id request to the config manager. */
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) {
+    return configManager.dropSubscriptionById(req);
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     return configManager.showSubscription(req);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index bb5f557ec0e..ed3efee9a1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -210,6 +210,7 @@ public class AuditLogger {
       case RENAME_LOGICAL_VIEW:
       case CREATE_TOPIC:
       case DROP_TOPIC:
+      case DROP_SUBSCRIPTION:
         return AuditLogOperation.DDL;
       case LOAD_DATA:
       case INSERT:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 53361b8022f..dbe0bc30997 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -100,6 +100,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -1172,6 +1173,12 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.dropSubscription(req), status -> 
!updateConfigNodeLeader(status));
   }
 
+  /**
+   * Sends the drop-subscription-by-id request through {@code executeRemoteCallWithRetry}, passing
+   * a status predicate built from {@code updateConfigNodeLeader}.
+   */
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) throws 
TException {
+    return executeRemoteCallWithRetry(
+        () -> client.dropSubscriptionById(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) 
throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 8a343173545..e404fd6dbb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -101,6 +101,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTa
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StartPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StopPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
@@ -132,6 +133,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropSubscription;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -1136,6 +1138,13 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
     return new ShowSubscriptionsTask(node);
   }
 
+  /**
+   * Builds the config task for a DROP SUBSCRIPTION AST node.
+   *
+   * <p>The query is first marked as a write, then the session user is passed to the admin check,
+   * and only after that is the task created.
+   */
+  @Override
+  protected IConfigTask visitDropSubscription(DropSubscription node, MPPQueryContext context) {
+    context.setQueryType(QueryType.WRITE);
+    final String userName = context.getSession().getUserName();
+    accessControl.checkUserIsAdmin(userName);
+    return new DropSubscriptionTask(node);
+  }
+
   @Override
   protected IConfigTask visitShowCurrentUser(ShowCurrentUser node, 
MPPQueryContext context) {
     context.setQueryType(QueryType.READ);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 564776c7534..282839ec560 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -104,6 +104,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.SetThrott
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
@@ -158,6 +159,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -621,6 +623,12 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     return new ShowSubscriptionsTask(showSubscriptionsStatement);
   }
 
+  /**
+   * Wraps the DROP SUBSCRIPTION statement in a {@code DropSubscriptionTask}; the query context is
+   * not used here.
+   */
+  @Override
+  public IConfigTask visitDropSubscription(
+      DropSubscriptionStatement dropSubscriptionStatement, MPPQueryContext 
context) {
+    return new DropSubscriptionTask(dropSubscriptionStatement);
+  }
+
   @Override
   public IConfigTask visitDeleteTimeSeries(
       DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d3b6a139455..4f0b0c4ee1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -105,6 +105,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -256,6 +257,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesSta
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -2378,6 +2380,29 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  /**
+   * Sends a drop-subscription-by-id request to the ConfigNode for the given statement.
+   *
+   * @param dropSubscriptionStatement supplies the subscription id, IF EXISTS flag and model flag
+   * @return a future holding a success result when the returned status is a success; otherwise
+   *     a future failed with an {@code IoTDBException} built from that status, or with the
+   *     exception caught while borrowing or calling the client
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> dropSubscription(
+      final DropSubscriptionStatement dropSubscriptionStatement) {
+    final SettableFuture<ConfigTaskResult> result = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      // Built inside the try so a failure while reading the statement also fails the future.
+      final TDropSubscriptionReq request =
+          new TDropSubscriptionReq()
+              .setSubsciptionId(dropSubscriptionStatement.getSubscriptionId())
+              .setIfExistsCondition(dropSubscriptionStatement.hasIfExistsCondition())
+              .setIsTableModel(dropSubscriptionStatement.isTableModel());
+      final TSStatus status = client.dropSubscriptionById(request);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
+        result.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } else {
+        result.setException(new IoTDBException(status.message, status.code));
+      }
+    } catch (Exception e) {
+      result.setException(e);
+    }
+    return result;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> createTopic(
       final CreateTopicStatement createTopicStatement) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 6e844322a9e..fe0b79fa962 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -73,6 +73,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesSta
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -223,6 +224,9 @@ public interface IConfigTaskExecutor {
   SettableFuture<ConfigTaskResult> showSubscriptions(
       ShowSubscriptionsStatement showSubscriptionsStatement);
 
+  /**
+   * Drops the subscription described by the given statement.
+   *
+   * @param dropSubscriptionStatement the DROP SUBSCRIPTION statement to execute
+   * @return a future carrying the task result
+   */
+  SettableFuture<ConfigTaskResult> dropSubscription(
+      DropSubscriptionStatement dropSubscriptionStatement);
+
   SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement 
createTopicStatement);
 
   SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement 
dropTopicStatement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
new file mode 100644
index 00000000000..bdf076a720e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropSubscription;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Config task that asks the executor to drop the subscription named by its statement. */
+public class DropSubscriptionTask implements IConfigTask {
+
+  private final DropSubscriptionStatement dropSubscriptionStatement;
+
+  /** Creates a task around an already-built statement, which is stored as-is. */
+  public DropSubscriptionTask(final DropSubscriptionStatement dropSubscriptionStatement) {
+    this.dropSubscriptionStatement = dropSubscriptionStatement;
+  }
+
+  /**
+   * Creates a task from a relational AST node by copying its subscription id and IF EXISTS flag
+   * into a fresh statement that is flagged as table model.
+   */
+  public DropSubscriptionTask(final DropSubscription dropSubscription) {
+    this(toTableModelStatement(dropSubscription));
+  }
+
+  /** Converts the AST node into an equivalent statement with the table-model flag set. */
+  private static DropSubscriptionStatement toTableModelStatement(final DropSubscription node) {
+    final DropSubscriptionStatement statement = new DropSubscriptionStatement();
+    statement.setSubscriptionId(node.getSubscriptionId());
+    statement.setIfExists(node.hasIfExistsCondition());
+    statement.setTableModel(true);
+    return statement;
+  }
+
+  /** Hands the stored statement to {@code configTaskExecutor.dropSubscription}. */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.dropSubscription(dropSubscriptionStatement);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index c59a5d7b902..87e07063629 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -70,15 +70,24 @@ public class ShowSubscriptionsTask implements IConfigTask {
 
     for (final TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) 
{
       builder.getTimeColumnBuilder().writeLong(0L);
+      final StringBuilder subscriptionId =
+          new StringBuilder(
+              tSubscriptionInfo.getTopicName() + "_" + 
tSubscriptionInfo.getConsumerGroupId());
+      if (tSubscriptionInfo.getCreationTime() != 0) {
+        subscriptionId.append("_").append(tSubscriptionInfo.getCreationTime());
+      }
       builder
           .getColumnBuilder(0)
-          .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), 
TSFileConfig.STRING_CHARSET));
+          .writeBinary(new Binary(subscriptionId.toString(), 
TSFileConfig.STRING_CHARSET));
       builder
           .getColumnBuilder(1)
+          .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), 
TSFileConfig.STRING_CHARSET));
+      builder
+          .getColumnBuilder(2)
           .writeBinary(
               new Binary(tSubscriptionInfo.getConsumerGroupId(), 
TSFileConfig.STRING_CHARSET));
       builder
-          .getColumnBuilder(2)
+          .getColumnBuilder(3)
           .writeBinary(
               new Binary(
                   tSubscriptionInfo.getConsumerIds().toString(), 
TSFileConfig.STRING_CHARSET));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 386eb4255d2..ea1a4bb5fc5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -190,6 +190,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -4108,6 +4109,22 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     return showSubscriptionsStatement;
   }
 
+  /**
+   * Translates a parsed {@code DROP SUBSCRIPTION} clause into a
+   * {@link DropSubscriptionStatement}.
+   *
+   * @param ctx parse-tree node of the DROP SUBSCRIPTION clause
+   * @return the statement carrying the subscription id and the IF EXISTS flag
+   * @throws SemanticException if the clause carries no subscription id
+   */
+  @Override
+  public Statement visitDropSubscription(IoTDBSqlParser.DropSubscriptionContext ctx) {
+    // Reject the clause up front when there is nothing to drop.
+    if (ctx.subscriptionId == null) {
+      throw new SemanticException(
+          "Not support for this sql in DROP SUBSCRIPTION, please enter subscriptionId.");
+    }
+
+    final DropSubscriptionStatement statement = new DropSubscriptionStatement();
+    statement.setSubscriptionId(parseIdentifier(ctx.subscriptionId.getText()));
+
+    // IF EXISTS counts only when both keywords are present in the parse tree.
+    final boolean ifExists = ctx.IF() != null && ctx.EXISTS() != null;
+    statement.setIfExists(ifExists);
+
+    return statement;
+  }
+
   @Override
   public Statement visitGetRegionId(IoTDBSqlParser.GetRegionIdContext ctx) {
     TConsensusGroupType type =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index c8c95d1cc7e..4716dd5f156 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -84,6 +84,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropSubscription;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
@@ -4055,6 +4056,11 @@ public class StatementAnalyzer {
       return createAndAssignScope(node, context);
     }
 
+    // DROP SUBSCRIPTION: no statement-specific analysis is done here; the node is only
+    // given a scope through createAndAssignScope.
+    @Override
+    protected Scope visitDropSubscription(DropSubscription node, 
Optional<Scope> context) {
+      return createAndAssignScope(node, context);
+    }
+
     @Override
     public Scope visitTableFunctionInvocation(TableFunctionInvocation node, 
Optional<Scope> scope) {
       TableFunction function = 
metadata.getTableFunction(node.getName().toString());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 7175853fb5b..0075a979384 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -613,6 +613,10 @@ public abstract class AstVisitor<R, C> {
     return visitStatement(node, context);
   }
 
+  // Default hook for DROP SUBSCRIPTION nodes: forwards to visitStatement.
+  protected R visitDropSubscription(DropSubscription node, C context) {
+    return visitStatement(node, context);
+  }
+
   protected R visitShowVersion(ShowVersion node, C context) {
     return visitStatement(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropSubscription.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropSubscription.java
new file mode 100644
index 00000000000..24aba397013
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropSubscription.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Relational SQL AST node for {@code DROP SUBSCRIPTION [IF EXISTS] <subscriptionId>}.
+ *
+ * <p>Both fields are final and assigned once in the constructor.
+ */
+public class DropSubscription extends SubscriptionStatement {
+
+  private final String subscriptionId;
+  private final boolean ifExistsCondition;
+
+  /**
+   * @param subscriptionId id of the subscription to drop; must not be null
+   * @param ifExistsCondition whether the statement carries the {@code IF EXISTS} clause
+   * @throws NullPointerException if {@code subscriptionId} is null
+   */
+  public DropSubscription(final String subscriptionId, final boolean ifExistsCondition) {
+    this.subscriptionId = requireNonNull(subscriptionId, "subscription id can not be null");
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
+  /** @return the id of the subscription to drop, never null */
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  /** @return true if the statement was written with {@code IF EXISTS} */
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
+  @Override
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+    return visitor.visitDropSubscription(this, context);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(subscriptionId, ifExistsCondition);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final DropSubscription that = (DropSubscription) obj;
+    // Primitive booleans are compared with ==; Objects.equals would box both operands.
+    return Objects.equals(this.subscriptionId, that.subscriptionId)
+        && this.ifExistsCondition == that.ifExistsCondition;
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("subscriptionId", subscriptionId)
+        .add("ifExistsCondition", ifExistsCondition)
+        .toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 259c9fe360d..d77c0e48841 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -78,6 +78,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropSubscription;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
@@ -1067,6 +1068,14 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
     return new ShowSubscriptions(topicName);
   }
 
+  /**
+   * Builds the {@link DropSubscription} AST node from a parsed DROP SUBSCRIPTION statement.
+   *
+   * @param ctx parse-tree node of the statement
+   * @return a {@link DropSubscription} holding the identifier value and the IF EXISTS flag
+   */
+  @Override
+  public Node visitDropSubscriptionStatement(
+      RelationalSqlParser.DropSubscriptionStatementContext ctx) {
+    final Identifier parsedId = (Identifier) visit(ctx.identifier());
+    final String id = parsedId.getValue();
+    // IF EXISTS counts only when both keywords are present in the parse tree.
+    final boolean ifExists = ctx.IF() != null && ctx.EXISTS() != null;
+    return new DropSubscription(id, ifExists);
+  }
+
   @Override
   public Node visitShowDevicesStatement(final 
RelationalSqlParser.ShowDevicesStatementContext ctx) {
     final QualifiedName name = getQualifiedName(ctx.tableName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
index 8c042dc5116..d744eae54e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropSubscription;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
@@ -1207,6 +1208,28 @@ public final class SqlFormatter {
       return null;
     }
 
+    /** Formats SHOW SUBSCRIPTIONS, adding the ON clause only when a topic name is set. */
+    @Override
+    protected Void visitShowSubscriptions(ShowSubscriptions node, Integer context) {
+      if (Objects.nonNull(node.getTopicName())) {
+        builder.append("SHOW SUBSCRIPTIONS ON ").append(node.getTopicName());
+        return null;
+      }
+
+      builder.append("SHOW SUBSCRIPTIONS");
+      return null;
+    }
+
+    /** Formats DROP SUBSCRIPTION, with IF EXISTS when present, followed by the subscription id. */
+    @Override
+    protected Void visitDropSubscription(DropSubscription node, Integer context) {
+      final String ifExistsClause = node.hasIfExistsCondition() ? "IF EXISTS " : "";
+      builder.append("DROP SUBSCRIPTION ").append(ifExistsClause).append(node.getSubscriptionId());
+
+      return null;
+    }
+
     @Override
     protected Void visitRelationalAuthorPlan(RelationalAuthorStatement node, 
Integer context) {
       switch (node.getAuthorType()) {
@@ -1429,17 +1452,6 @@ public final class SqlFormatter {
       return null;
     }
 
-    @Override
-    protected Void visitShowSubscriptions(ShowSubscriptions node, Integer 
context) {
-      if (Objects.isNull(node.getTopicName())) {
-        builder.append("SHOW SUBSCRIPTIONS");
-      } else {
-        builder.append("SHOW SUBSCRIPTIONS ON ").append(node.getTopicName());
-      }
-
-      return null;
-    }
-
     private void appendBeginLabel(Optional<Identifier> label) {
       label.ifPresent(value -> builder.append(formatName(value)).append(": "));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 296fd1137ba..4d5fe5dbf3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -183,6 +183,7 @@ public enum StatementType {
   DROP_TOPIC,
   SHOW_TOPICS,
   SHOW_SUBSCRIPTIONS,
+  DROP_SUBSCRIPTION,
 
   SET_CONFIGURATION
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 4ecfcb65079..d775af00133 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -94,6 +94,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -564,6 +565,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showSubscriptionsStatement, context);
   }
 
+  // Default hook for DROP SUBSCRIPTION statements: forwards to visitStatement.
+  public R visitDropSubscription(DropSubscriptionStatement 
dropSubscriptionStatement, C context) {
+    return visitStatement(dropSubscriptionStatement, context);
+  }
+
   public R visitGetRegionId(GetRegionIdStatement getRegionIdStatement, C 
context) {
     return visitStatement(getRegionIdStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
new file mode 100644
index 00000000000..53ecb6164fa
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Config statement for {@code DROP SUBSCRIPTION}.
+ *
+ * <p>Holds the subscription id, the IF EXISTS flag and a table-model flag. It is reported as a
+ * {@link QueryType#WRITE} statement that touches no paths.
+ */
+public class DropSubscriptionStatement extends Statement implements IConfigStatement {
+
+  private String subscriptionId;
+  private boolean ifExistsCondition;
+  private boolean isTableModel;
+
+  public DropSubscriptionStatement() {
+    super();
+    statementType = StatementType.DROP_SUBSCRIPTION;
+  }
+
+  // ---- subscription id ----
+
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  public void setSubscriptionId(String subscriptionId) {
+    this.subscriptionId = subscriptionId;
+  }
+
+  // ---- IF EXISTS flag ----
+
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
+  // ---- table-model flag ----
+
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /** This statement is not bound to any path, so the list is always empty. */
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitDropSubscription(this, context);
+  }
+
+  /**
+   * The super user always passes; any other user needs the {@code USE_PIPE} system privilege.
+   *
+   * @param userName name of the user issuing the statement
+   * @return a success status, or the status produced by the USE_PIPE permission check
+   */
+  @Override
+  public TSStatus checkPermissionBeforeProcess(String userName) {
+    if (!AuthorityChecker.SUPER_USER.equals(userName)) {
+      return AuthorityChecker.getTSStatus(
+          AuthorityChecker.checkSystemPermission(userName, PrivilegeType.USE_PIPE),
+          PrivilegeType.USE_PIPE);
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+}
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 0a43dbfbd92..9751fb3797d 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -171,6 +171,10 @@
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.checkerframework</groupId>
+            <artifactId>checker-qual</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index eb1fbebe194..9bb6c8cdaa6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -129,6 +129,7 @@ public class ColumnHeaderConstant {
   // column names for show subscriptions statement
   public static final String CONSUMER_GROUP_NAME = "ConsumerGroupName";
   public static final String SUBSCRIBED_CONSUMERS = "SubscribedConsumers";
+  public static final String SUBSCRIPTION_ID = "SubscriptionID";
 
   // show cluster status
   public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
@@ -515,6 +516,7 @@ public class ColumnHeaderConstant {
 
   public static final List<ColumnHeader> showSubscriptionColumnHeaders =
       ImmutableList.of(
+          new ColumnHeader(SUBSCRIPTION_ID, TSDataType.TEXT),
           new ColumnHeader(TOPIC_NAME, TSDataType.TEXT),
           new ColumnHeader(CONSUMER_GROUP_NAME, TSDataType.TEXT),
           new ColumnHeader(SUBSCRIBED_CONSUMERS, TSDataType.TEXT));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index 13d010a636a..728f584d92b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.subscription.meta.consumer;
 
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
+import org.apache.thrift.annotation.Nullable;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +37,10 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class ConsumerGroupMeta {
 
@@ -46,10 +50,12 @@ public class ConsumerGroupMeta {
   private long creationTime;
   private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet;
   private Map<String, ConsumerMeta> consumerIdToConsumerMeta;
+  private Map<String, Long> topicNameToSubscriptionCreationTime; // used when 
creationTime < 0
 
   public ConsumerGroupMeta() {
     this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
     this.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
+    this.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>();
   }
 
   public ConsumerGroupMeta(
@@ -57,7 +63,7 @@ public class ConsumerGroupMeta {
     this();
 
     this.consumerGroupId = consumerGroupId;
-    this.creationTime = creationTime;
+    this.creationTime = -creationTime;
 
     consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(), 
firstConsumerMeta);
   }
@@ -69,6 +75,8 @@ public class ConsumerGroupMeta {
     copied.topicNameToSubscribedConsumerIdSet =
         new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
     copied.consumerIdToConsumerMeta = new 
ConcurrentHashMap<>(consumerIdToConsumerMeta);
+    copied.topicNameToSubscriptionCreationTime =
+        new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime);
     return copied;
   }
 
@@ -77,7 +85,11 @@ public class ConsumerGroupMeta {
   }
 
   public long getCreationTime() {
-    return creationTime;
+    return Math.abs(creationTime);
+  }
+
+  /**
+   * Tells whether per-topic subscription creation times are tracked for this group.
+   *
+   * <p>The flag is encoded in the sign of the raw {@code creationTime} field: the constructor
+   * that takes a creation time stores it negated, and {@link #getCreationTime()} hides the sign
+   * by returning the absolute value. A negative raw value therefore means "record".
+   */
+  private boolean shouldRecordSubscriptionCreationTime() {
+    return creationTime < 0;
   }
 
   public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
@@ -127,7 +139,6 @@ public class ConsumerGroupMeta {
       LOGGER.warn(exceptionMessage);
       throw new SubscriptionException(exceptionMessage);
     }
-    return;
   }
 
   public void addConsumer(final ConsumerMeta consumerMeta) {
@@ -172,6 +183,12 @@ public class ConsumerGroupMeta {
     return topicNameToSubscribedConsumerIdSet.getOrDefault(topic, 
Collections.emptySet());
   }
 
+  /**
+   * Looks up the recorded creation time of this group's subscription to {@code topic}.
+   *
+   * @param topic topic name
+   * @return the recorded time, or empty when this group does not record subscription creation
+   *     times or has no entry for the topic
+   */
+  public Optional<Long> getSubscriptionTime(final String topic) {
+    if (!shouldRecordSubscriptionCreationTime()) {
+      return Optional.empty();
+    }
+    return Optional.ofNullable(topicNameToSubscriptionCreationTime.get(topic));
+  }
+
   public Set<String> getTopicsSubscribedByConsumer(final String consumerId) {
     final Set<String> topics = new HashSet<>();
     for (final Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
@@ -218,15 +235,35 @@ public class ConsumerGroupMeta {
 
     for (final String topic : topics) {
       topicNameToSubscribedConsumerIdSet
-          .computeIfAbsent(topic, k -> new HashSet<>())
+          .computeIfAbsent(
+              topic,
+              k -> {
+                if (shouldRecordSubscriptionCreationTime()) {
+                  topicNameToSubscriptionCreationTime.put(topic, 
System.currentTimeMillis());
+                }
+                return new HashSet<>();
+              })
           .add(consumerId);
     }
   }
 
   /**
    * @return topics subscribed by no consumers in this group after this 
removal.
+   * @param consumerId if null, remove subscriptions of topics for all 
consumers
    */
-  public Set<String> removeSubscription(final String consumerId, final 
Set<String> topics) {
+  public Set<String> removeSubscription(
+      @Nullable final String consumerId, final Set<String> topics) {
+    if (Objects.nonNull(consumerId)) {
+      return removeSubscriptionInternal(consumerId, topics);
+    }
+
+    // No consumer given: drop the topics for every consumer of the group and merge the
+    // per-consumer results into one set.
+    final Set<String> topicsLeftUnsubscribed = new HashSet<>();
+    for (final String id : consumerIdToConsumerMeta.keySet()) {
+      topicsLeftUnsubscribed.addAll(removeSubscriptionInternal(id, topics));
+    }
+    return topicsLeftUnsubscribed;
+  }
+
+  private Set<String> removeSubscriptionInternal(
+      @NonNull final String consumerId, final Set<String> topics) {
     if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
       throw new SubscriptionException(
           String.format(
@@ -239,8 +276,12 @@ public class ConsumerGroupMeta {
       if (topicNameToSubscribedConsumerIdSet.containsKey(topic)) {
         topicNameToSubscribedConsumerIdSet.get(topic).remove(consumerId);
         if (topicNameToSubscribedConsumerIdSet.get(topic).isEmpty()) {
+          // remove subscription for consumer group
           noSubscriptionTopicAfterRemoval.add(topic);
           topicNameToSubscribedConsumerIdSet.remove(topic);
+          if (shouldRecordSubscriptionCreationTime()) {
+            topicNameToSubscriptionCreationTime.remove(topic);
+          }
         }
       }
     }
@@ -275,6 +316,14 @@ public class ConsumerGroupMeta {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       entry.getValue().serialize(outputStream);
     }
+
+    if (shouldRecordSubscriptionCreationTime()) {
+      ReadWriteIOUtils.write(topicNameToSubscriptionCreationTime.size(), 
outputStream);
+      for (final Map.Entry<String, Long> entry : 
topicNameToSubscriptionCreationTime.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+    }
   }
 
   public static ConsumerGroupMeta deserialize(final InputStream inputStream) 
throws IOException {
@@ -305,6 +354,16 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.consumerIdToConsumerMeta.put(key, value);
     }
 
+    consumerGroupMeta.topicNameToSubscriptionCreationTime = new 
ConcurrentHashMap<>();
+    if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) {
+      size = ReadWriteIOUtils.readInt(inputStream);
+      for (int i = 0; i < size; ++i) {
+        final String key = ReadWriteIOUtils.readString(inputStream);
+        final long value = ReadWriteIOUtils.readLong(inputStream);
+        consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value);
+      }
+    }
+
     return consumerGroupMeta;
   }
 
@@ -336,6 +395,16 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.consumerIdToConsumerMeta.put(key, value);
     }
 
+    consumerGroupMeta.topicNameToSubscriptionCreationTime = new 
ConcurrentHashMap<>();
+    if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) {
+      size = ReadWriteIOUtils.readInt(byteBuffer);
+      for (int i = 0; i < size; ++i) {
+        final String key = ReadWriteIOUtils.readString(byteBuffer);
+        final long value = ReadWriteIOUtils.readLong(byteBuffer);
+        consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value);
+      }
+    }
+
     return consumerGroupMeta;
   }
 
@@ -350,11 +419,13 @@ public class ConsumerGroupMeta {
       return false;
     }
     final ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
-    return Objects.equals(consumerGroupId, that.consumerGroupId)
-        && creationTime == that.creationTime
+    return Objects.equals(this.consumerGroupId, that.consumerGroupId)
+        && this.creationTime == that.creationTime
+        && Objects.equals(
+            this.topicNameToSubscribedConsumerIdSet, 
that.topicNameToSubscribedConsumerIdSet)
+        && Objects.equals(this.consumerIdToConsumerMeta, 
that.consumerIdToConsumerMeta)
         && Objects.equals(
-            topicNameToSubscribedConsumerIdSet, 
that.topicNameToSubscribedConsumerIdSet)
-        && Objects.equals(consumerIdToConsumerMeta, 
that.consumerIdToConsumerMeta);
+            this.topicNameToSubscriptionCreationTime, 
that.topicNameToSubscriptionCreationTime);
   }
 
   @Override
@@ -363,7 +434,8 @@ public class ConsumerGroupMeta {
         consumerGroupId,
         creationTime,
         topicNameToSubscribedConsumerIdSet,
-        consumerIdToConsumerMeta);
+        consumerIdToConsumerMeta,
+        topicNameToSubscriptionCreationTime);
   }
 
   @Override
@@ -372,11 +444,13 @@ public class ConsumerGroupMeta {
         + "consumerGroupId='"
         + consumerGroupId
         + "', creationTime="
-        + creationTime
+        + getCreationTime()
         + ", topicNameToSubscribedConsumerIdSet="
         + topicNameToSubscribedConsumerIdSet
         + ", consumerIdToConsumerMeta="
         + consumerIdToConsumerMeta
+        + ", topicNameToSubscriptionCreationTime="
+        + topicNameToSubscriptionCreationTime
         + "}";
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index c2775b204a9..34dbbbd761d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -91,6 +92,12 @@ public class ConsumerGroupMetaKeeper {
         : Collections.emptySet();
   }
 
+  /**
+   * Returns the creation time recorded for the subscription of {@code consumerGroupId} to
+   * {@code topic}, or empty when the group is unknown or its meta reports none.
+   */
+  public Optional<Long> getSubscriptionCreationTime(String consumerGroupId, String topic) {
+    if (!consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)) {
+      return Optional.empty();
+    }
+    return consumerGroupIdToConsumerGroupMetaMap.get(consumerGroupId).getSubscriptionTime(topic);
+  }
+
   public Set<String> getTopicsSubscribedByConsumer(String consumerGroupId, 
String consumerId) {
     return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
         ? consumerGroupIdToConsumerGroupMetaMap
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
index 36935053c92..8f644e1aa20 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
@@ -21,15 +21,8 @@ package 
org.apache.iotdb.commons.subscription.meta.subscription;
 
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 
-import org.apache.tsfile.utils.PublicBAOS;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 /** SubscriptionMeta is created for show subscription and is not stored in 
meta keeper. */
@@ -38,15 +31,18 @@ public class SubscriptionMeta {
   private TopicMeta topicMeta;
   private String consumerGroupId;
   private Set<String> consumerIds;
+  private Long creationTime;
 
   private SubscriptionMeta() {
     // Empty constructor
   }
 
-  public SubscriptionMeta(TopicMeta topicMeta, String consumerGroupId, 
Set<String> consumerIds) {
+  /**
+   * @param topicMeta meta of the subscribed topic
+   * @param consumerGroupId id of the subscribing consumer group
+   * @param consumerIds ids of the consumers in the subscription
+   * @param creationTime creation time of the subscription; may be null, in which case
+   *     {@link #getCreationTime()} returns an empty Optional
+   */
+  public SubscriptionMeta(
+      TopicMeta topicMeta, String consumerGroupId, Set<String> consumerIds, 
Long creationTime) {
     this.topicMeta = topicMeta;
     this.consumerGroupId = consumerGroupId;
     this.consumerIds = consumerIds;
+    this.creationTime = creationTime;
   }
 
   public TopicMeta getTopicMeta() {
@@ -61,60 +57,14 @@ public class SubscriptionMeta {
     return consumerIds;
   }
 
-  public ByteBuffer serialize() throws IOException {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS();
-    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
-    serialize(outputStream);
-    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-  }
-
-  public void serialize(DataOutputStream outputStream) throws IOException {
-    topicMeta.serialize(outputStream);
-    ReadWriteIOUtils.write(consumerGroupId, outputStream);
-
-    ReadWriteIOUtils.write(consumerIds.size(), outputStream);
-    for (String consumerId : consumerIds) {
-      ReadWriteIOUtils.write(consumerId, outputStream);
-    }
-  }
-
-  public void serialize(FileOutputStream outputStream) throws IOException {
-    topicMeta.serialize(outputStream);
-    ReadWriteIOUtils.write(consumerGroupId, outputStream);
-
-    ReadWriteIOUtils.write(consumerIds.size(), outputStream);
-    for (String consumerId : consumerIds) {
-      ReadWriteIOUtils.write(consumerId, outputStream);
-    }
+  public Optional<Long> getCreationTime() {
+    return Objects.nonNull(creationTime) ? Optional.of(creationTime) : 
Optional.empty();
   }
 
-  public static SubscriptionMeta deserialize(InputStream inputStream) throws 
IOException {
-    final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
-
-    subscriptionMeta.topicMeta = TopicMeta.deserialize(inputStream);
-    subscriptionMeta.consumerGroupId = 
ReadWriteIOUtils.readString(inputStream);
-    subscriptionMeta.consumerIds = new HashSet<>();
-
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < size; i++) {
-      
subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(inputStream));
-    }
-
-    return subscriptionMeta;
-  }
-
-  public static SubscriptionMeta deserialize(ByteBuffer byteBuffer) {
-    final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
-
-    subscriptionMeta.topicMeta = TopicMeta.deserialize(byteBuffer);
-    subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
-    subscriptionMeta.consumerIds = new HashSet<>();
-
-    int size = ReadWriteIOUtils.readInt(byteBuffer);
-    for (int i = 0; i < size; i++) {
-      
subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(byteBuffer));
-    }
-
-    return subscriptionMeta;
+  public String getSubscriptionId() {
+    final StringBuilder subscriptionId =
+        new StringBuilder(topicMeta.getTopicName() + "_" + consumerGroupId);
+    getCreationTime().ifPresent(creationTime -> 
subscriptionId.append("_").append(creationTime));
+    return subscriptionId.toString();
   }
 }
diff --git 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 829a7880754..42fedcb9707 100644
--- 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -92,6 +92,7 @@ statement
     | dropTopicStatement
     | showTopicsStatement
     | showSubscriptionsStatement
+    | dropSubscriptionStatement
 
     // Show Statement
     | showDevicesStatement
@@ -435,6 +436,10 @@ showSubscriptionsStatement
     : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
     ;
 
+dropSubscriptionStatement
+    : DROP SUBSCRIPTION (IF EXISTS)? subscriptionId=identifier
+    ;
+
 
 // -------------------------------------------- Show Statement 
---------------------------------------------------------
 showDevicesStatement
@@ -1181,7 +1186,7 @@ nonReserved
     | QUERIES | QUERY | QUOTES
     | RANGE | READ | READONLY | RECONSTRUCT | REFRESH | REGION | REGIONID | 
REGIONS | REMOVE | RENAME | REPAIR | REPEAT  | REPEATABLE | REPLACE | RESET | 
RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | 
ROLLBACK | ROW | ROWS | RUNNING
     | SERIESSLOTID | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | 
SERIALIZABLE | SESSION | SET | SETS
-    | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTIONS | 
SUBSET | SUBSTRING | SYSTEM
+    | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTION | 
SUBSCRIPTIONS | SUBSET | SUBSTRING | SYSTEM
     | TABLES | TABLESAMPLE | TAG | TEXT | TEXT_STRING | TIES | TIME | 
TIMEPARTITION | TIMER | TIMER_XL | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | 
TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE
     | UNBOUNDED | UNCOMMITTED | UNCONDITIONAL | UNIQUE | UNKNOWN | UNMATCHED | 
UNTIL | UPDATE | URI | USE | USED | USER | UTF16 | UTF32 | UTF8
     | VALIDATE | VALUE | VARIABLES | VARIATION | VERBOSE | VERSION | VIEW
@@ -1506,6 +1511,7 @@ SQL_DIALECT: 'SQL_DIALECT';
 START: 'START';
 STATS: 'STATS';
 STOP: 'STOP';
+SUBSCRIPTION: 'SUBSCRIPTION';
 SUBSCRIPTIONS: 'SUBSCRIPTIONS';
 SUBSET: 'SUBSET';
 SUBSTRING: 'SUBSTRING';
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b2b201db7c8..ca75e28f823 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -959,6 +959,13 @@ struct TShowSubscriptionInfo {
     1: required string topicName
     2: required string consumerGroupId
     3: required set<string> consumerIds
+    4: optional i64 creationTime
+}
+
+struct TDropSubscriptionReq {
+    1: required string subsciptionId
+    2: optional bool ifExistsCondition
+    3: optional bool isTableModel
 }
 
 struct TGetAllSubscriptionInfoResp {
@@ -1860,9 +1867,12 @@ service IConfigNodeRPCService {
   /** Create subscription */
   common.TSStatus createSubscription(TSubscribeReq req)
 
-  /** Close subscription */
+  /** Close subscription by consumer */
   common.TSStatus dropSubscription(TUnsubscribeReq req)
 
+  /** Close subscription by session */
+  common.TSStatus dropSubscriptionById(TDropSubscriptionReq req)
+
   /** Show Subscription on topic name, if name is empty, show all 
subscriptions */
   TShowSubscriptionResp showSubscription(TShowSubscriptionReq req)
 

Reply via email to