This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dcdbbe524e9 Subscription: improve parsing logic when using JAVA SDK client & refactor subscription IT & intro `getSubscribedTopicNames` API (#12721)
dcdbbe524e9 is described below

commit dcdbbe524e934c7d330e476198faea4b443b42de
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 12 19:29:35 2024 +0800

    Subscription: improve parsing logic when using JAVA SDK client & refactor subscription IT & intro `getSubscribedTopicNames` API (#12721)
---
 .../apache/iotdb/SubscriptionSessionExample.java   |  48 +++++---
 ...tionDualIT.java => AbstractSubscriptionIT.java} |  36 +-----
 .../it/IoTDBSubscriptionITConstant.java            |  19 ++-
 .../IoTDBSubscriptionRestartIT.java                |  53 ++++-----
 .../it/dual/AbstractSubscriptionDualIT.java        |  24 ++--
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |  31 ++---
 .../it/dual/IoTDBSubscriptionTimePrecisionIT.java  |  33 +++---
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 131 +++++++++------------
 .../AbstractSubscriptionLocalIT.java}              |  29 +++--
 .../it/local/IoTDBSubscriptionBasicIT.java         | 117 ++++++------------
 .../it/local/IoTDBSubscriptionIdempotentIT.java    |  14 +--
 .../SubscriptionIdentifierSemanticException.java   |  26 ++--
 .../response/PipeSubscribeSubscribeResp.java       |  51 +++++++-
 .../response/PipeSubscribeUnsubscribeResp.java     |  51 +++++++-
 .../consumer/SubscriptionConsumer.java             | 102 ++++++++++++----
 .../consumer/SubscriptionProvider.java             |  12 +-
 .../consumer/SubscriptionPullConsumer.java         |   9 +-
 .../consumer/SubscriptionPushConsumer.java         |   8 +-
 .../payload/SubscriptionFileHandler.java           |  20 ++--
 .../session/subscription/util/IdentifierUtils.java |  51 ++++++++
 .../agent/SubscriptionAgentLauncher.java           |   8 +-
 .../agent/SubscriptionBrokerAgent.java             |  11 +-
 .../agent/SubscriptionConsumerAgent.java           |  22 ++--
 .../agent/SubscriptionReceiverAgent.java           |   6 +-
 .../subscription/agent/SubscriptionTopicAgent.java |  20 ++--
 .../db/subscription/broker/SubscriptionBroker.java |  33 +++++-
 .../broker/SubscriptionPrefetchingQueue.java       |  28 ++++-
 .../SubscriptionPrefetchingTabletsQueue.java       |   9 ++
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  24 ++++
 .../db/subscription/event/SubscriptionEvent.java   |   6 +
 .../receiver/SubscriptionReceiverV1.java           |  36 +++---
 .../task/subtask/SubscriptionConnectorSubtask.java |  18 +--
 .../SubscriptionConnectorSubtaskLifeCycle.java     |  54 ++-------
 33 files changed, 673 insertions(+), 467 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 62c869e02bf..3aea4e0a762 100644
--- 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -30,9 +30,13 @@ import 
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 
 import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -46,11 +50,12 @@ public class SubscriptionSessionExample {
   private static final int PORT = 6667;
 
   private static final String TOPIC_1 = "topic1";
-  private static final String TOPIC_2 = "`topic2`";
+  private static final String TOPIC_2 = "`'topic2'`";
 
-  public static final long SLEEP_NS = 1_000_000_000L;
-  public static final long POLL_TIMEOUT_MS = 10_000L;
+  private static final long SLEEP_NS = 1_000_000_000L;
+  private static final long POLL_TIMEOUT_MS = 10_000L;
   private static final int MAX_RETRY_TIMES = 3;
+  private static final int PARALLELISM = 8;
 
   private static void prepareData() throws Exception {
     // Open session
@@ -66,13 +71,13 @@ public class SubscriptionSessionExample {
 
     // Insert some historical data
     final long currentTime = System.currentTimeMillis();
-    for (int i = 0; i < 10000; ++i) {
+    for (int i = 0; i < 100; ++i) {
       session.executeNonQueryStatement(
           String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 
2)", i));
       session.executeNonQueryStatement(
-          String.format("insert into root.db.d2(time, s3, s4) values (%s, 3, 
4)", currentTime + i));
+          String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 
4)", currentTime + i));
       session.executeNonQueryStatement(
-          String.format("insert into root.sg.d3(time, s5) values (%s, 5)", 
currentTime + 2 * i));
+          String.format("insert into root.sg.d3(time, s1) values (%s, 5)", 
currentTime + 2 * i));
     }
     session.executeNonQueryStatement("flush");
 
@@ -104,11 +109,16 @@ public class SubscriptionSessionExample {
     session = null;
   }
 
-  private static void subscriptionExample1() throws Exception {
+  /** single pull consumer subscribe topic with path and time range */
+  private static void dataSubscription1() throws Exception {
     // Create topics
     try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
       subscriptionSession.open();
-      subscriptionSession.createTopic(TOPIC_1);
+      final Properties config = new Properties();
+      config.put(TopicConstant.PATH_KEY, "root.db.d1.s1");
+      config.put(TopicConstant.START_TIME_KEY, 25);
+      config.put(TopicConstant.END_TIME_KEY, 75);
+      subscriptionSession.createTopic(TOPIC_1, config);
     }
 
     int retryCount = 0;
@@ -151,7 +161,8 @@ public class SubscriptionSessionExample {
     consumer1.close();
   }
 
-  private static void subscriptionExample2() throws Exception {
+  /** multi pull consumer subscribe topic with tsfile format */
+  private static void dataSubscription2() throws Exception {
     try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
       subscriptionSession.open();
       final Properties config = new Properties();
@@ -160,7 +171,7 @@ public class SubscriptionSessionExample {
     }
 
     final List<Thread> threads = new ArrayList<>();
-    for (int i = 0; i < 8; ++i) {
+    for (int i = 0; i < PARALLELISM; ++i) {
       final int idx = i;
       final Thread thread =
           new Thread(
@@ -187,7 +198,16 @@ public class SubscriptionSessionExample {
                     }
                     for (final SubscriptionMessage message : messages) {
                       try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
-                        // do something...
+                        final QueryDataSet dataSet =
+                            reader.query(
+                                QueryExpression.create(
+                                    Arrays.asList(
+                                        new Path("root.db.d2", "s2", true),
+                                        new Path("root.db.d3", "s1", true)),
+                                    null));
+                        while (dataSet.hasNext()) {
+                          System.out.println(dataSet.next());
+                        }
                       }
                     }
                     consumer2.commitSync(messages);
@@ -208,8 +228,8 @@ public class SubscriptionSessionExample {
 
   public static void main(final String[] args) throws Exception {
     prepareData();
-    dataQuery();
-    subscriptionExample1();
-    subscriptionExample2();
+    // dataQuery();
+    // dataSubscription1();
+    dataSubscription2();
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
similarity index 59%
copy from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
copy to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
index 3d8eb45a7fd..a168dbc78ba 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
@@ -17,24 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it.dual;
+package org.apache.iotdb.subscription.it;
 
-import org.apache.iotdb.it.env.MultiEnvFactory;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import 
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
 
-abstract class AbstractSubscriptionDualIT {
-
-  protected BaseEnv senderEnv;
-  protected BaseEnv receiverEnv;
+public abstract class AbstractSubscriptionIT {
 
   @Rule public TestName testName = new TestName();
 
+  @Rule public final TestRule skipOnSetUpFailure = new 
SkipOnSetUpFailure("setUp");
+
   @Before
   public void setUp() {
     // set thread name
@@ -44,30 +42,8 @@ abstract class AbstractSubscriptionDualIT {
     SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
     
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
     
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
-
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    setUpConfig();
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  void setUpConfig() {
-    // enable auto create schema
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
   }
 
   @After
-  public final void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
+  public void tearDown() {}
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 7d16110cd1f..840abb4ccb5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,11 +19,24 @@
 
 package org.apache.iotdb.subscription.it;
 
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import java.util.concurrent.TimeUnit;
+
 public class IoTDBSubscriptionITConstant {
 
-  public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
-  public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
-  public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+  private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
+  private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
+  private static final long AWAITILITY_AT_MOST_SECOND = 600L;
+
+  public static final ConditionFactory AWAIT =
+      Awaitility.await()
+          .pollInSameThread()
+          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
+          .pollInterval(
+              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
+          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS);
 
   public static final long SLEEP_NS = 1_000_000_000L;
   public static final long POLL_TIMEOUT_MS = 10_000L;
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
similarity index 90%
rename from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index d3680219578..41dd654383d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it.local;
+package org.apache.iotdb.subscription.it.cluster;
 
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -36,17 +36,14 @@ import 
org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
-import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;
 
-import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,22 +51,23 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
-public class IoTDBSubscriptionRestartIT {
+public class IoTDBSubscriptionRestartIT extends AbstractSubscriptionIT {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
 
-  @Rule public final TestRule skipOnSetUpFailure = new 
SkipOnSetUpFailure("setUp");
-
+  @Override
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
+    super.setUp();
+
     EnvFactory.getEnv()
         .getConfig()
         .getCommonConfig()
@@ -82,8 +80,11 @@ public class IoTDBSubscriptionRestartIT {
     EnvFactory.getEnv().initClusterEnvironment(3, 3);
   }
 
+  @Override
   @After
-  public void tearDown() throws Exception {
+  public void tearDown() {
+    super.tearDown();
+
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
@@ -195,18 +196,14 @@ public class IoTDBSubscriptionRestartIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - %s", testName.getMethodName(), consumer));
     thread.start();
 
     // Check timestamps size
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -312,7 +309,8 @@ public class IoTDBSubscriptionRestartIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - %s", testName.getMethodName(), consumer));
     thread.start();
 
     // Start DN 1 & DN 2
@@ -343,12 +341,7 @@ public class IoTDBSubscriptionRestartIT {
     // Check timestamps size
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -443,7 +436,8 @@ public class IoTDBSubscriptionRestartIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - %s", testName.getMethodName(), consumer));
     thread.start();
 
     // Shutdown leader CN
@@ -486,12 +480,7 @@ public class IoTDBSubscriptionRestartIT {
     // Check timestamps size
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 3d8eb45a7fd..fb25cdc3a6c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -21,29 +21,20 @@ package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.itbase.env.BaseEnv;
-import 
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TestName;
 
-abstract class AbstractSubscriptionDualIT {
+abstract class AbstractSubscriptionDualIT extends AbstractSubscriptionIT {
 
   protected BaseEnv senderEnv;
   protected BaseEnv receiverEnv;
 
-  @Rule public TestName testName = new TestName();
-
+  @Override
   @Before
   public void setUp() {
-    // set thread name
-    Thread.currentThread().setName(String.format("%s - main", 
testName.getMethodName()));
-
-    // set thread pools core size
-    SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
-    
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
-    
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
+    super.setUp();
 
     MultiEnvFactory.createEnv(2);
     senderEnv = MultiEnvFactory.getEnv(0);
@@ -55,7 +46,7 @@ abstract class AbstractSubscriptionDualIT {
     receiverEnv.initClusterEnvironment();
   }
 
-  void setUpConfig() {
+  protected void setUpConfig() {
     // enable auto create schema
     senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
@@ -65,8 +56,11 @@ abstract class AbstractSubscriptionDualIT {
     receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
   }
 
+  @Override
   @After
-  public final void tearDown() {
+  public void tearDown() {
+    super.tearDown();
+
     senderEnv.cleanClusterEnvironment();
     receiverEnv.cleanClusterEnvironment();
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index db7b9e7ca3a..650f861f498 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -42,7 +42,6 @@ import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.tsfile.utils.Pair;
-import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,11 +61,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -109,7 +108,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   }
 
   @Override
-  void setUpConfig() {
+  protected void setUpConfig() {
     super.setUpConfig();
 
     // Enable air gap receiver
@@ -1015,22 +1014,16 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            .pollInSameThread()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  if (receiverCrashed.get()) {
-                    LOGGER.info("detect receiver crashed, skipping this 
test...");
-                    return;
-                  }
-                  TestUtils.assertSingleResultSetEqual(
-                      TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
-                      expectedHeaderWithResult);
-                });
+        AWAIT.untilAsserted(
+            () -> {
+              if (receiverCrashed.get()) {
+                LOGGER.info("detect receiver crashed, skipping this test...");
+                return;
+              }
+              TestUtils.assertSingleResultSetEqual(
+                  TestUtils.executeQueryWithRetry(statement, "select count(*) 
from root.**"),
+                  expectedHeaderWithResult);
+            });
       }
     } catch (final Exception e) {
       e.printStackTrace();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
index 2b1cc407b7f..39a9f2225f6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
 import org.apache.tsfile.write.record.Tablet;
-import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -43,10 +42,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -57,7 +56,7 @@ public class IoTDBSubscriptionTimePrecisionIT extends 
AbstractSubscriptionDualIT
       LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);
 
   @Override
-  void setUpConfig() {
+  protected void setUpConfig() {
     super.setUpConfig();
 
     // Set timestamp precision to nanosecond
@@ -160,7 +159,8 @@ public class IoTDBSubscriptionTimePrecisionIT extends 
AbstractSubscriptionDualIT
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check data on receiver
@@ -168,21 +168,16 @@ public class IoTDBSubscriptionTimePrecisionIT extends 
AbstractSubscriptionDualIT
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertSingleResultSetEqual(
-                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
-                        new HashMap<String, String>() {
-                          {
-                            put("count(root.db.d1.s2)", "100");
-                            put("count(root.db.d2.s1)", "100");
-                          }
-                        }));
+        AWAIT.untilAsserted(
+            () ->
+                TestUtils.assertSingleResultSetEqual(
+                    TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                    new HashMap<String, String>() {
+                      {
+                        put("count(root.db.d1.s2)", "100");
+                        put("count(root.db.d2.s1)", "100");
+                      }
+                    }));
       }
     } catch (final Exception e) {
       e.printStackTrace();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 0cd6bc0d0a1..ac7e8333984 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
 import org.apache.tsfile.write.record.Tablet;
-import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,11 +50,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -136,7 +135,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check data on receiver
@@ -144,21 +144,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertSingleResultSetEqual(
-                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
-                        new HashMap<String, String>() {
-                          {
-                            put("count(root.db.d1.s)", "100");
-                            put("count(root.db.d2.s)", "100");
-                          }
-                        }));
+        AWAIT.untilAsserted(
+            () ->
+                TestUtils.assertSingleResultSetEqual(
+                    TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                    new HashMap<String, String>() {
+                      {
+                        put("count(root.db.d1.s)", "100");
+                        put("count(root.db.d2.s)", "100");
+                      }
+                    }));
       }
     } catch (final Exception e) {
       e.printStackTrace();
@@ -238,7 +233,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check data on receiver
@@ -246,20 +242,15 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertSingleResultSetEqual(
-                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
-                        new HashMap<String, String>() {
-                          {
-                            put("count(root.db.d2.s)", "100");
-                          }
-                        }));
+        AWAIT.untilAsserted(
+            () ->
+                TestUtils.assertSingleResultSetEqual(
+                    TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                    new HashMap<String, String>() {
+                      {
+                        put("count(root.db.d2.s)", "100");
+                      }
+                    }));
       }
     } catch (final Exception e) {
       e.printStackTrace();
@@ -336,7 +327,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check data on receiver
@@ -348,17 +340,12 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertResultSetEqual(
-                        TestUtils.executeQueryWithRetry(statement, "select * 
from root.**"),
-                        "Time,root.db.d1.at1,",
-                        expectedResSet));
+        AWAIT.untilAsserted(
+            () ->
+                TestUtils.assertResultSetEqual(
+                    TestUtils.executeQueryWithRetry(statement, "select * from 
root.**"),
+                    "Time,root.db.d1.at1,",
+                    expectedResSet));
       }
     } catch (final Exception e) {
       e.printStackTrace();
@@ -392,9 +379,9 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic on sender
-    final String topic1 = "`topic1`";
-    final String topic2 = "`'topic2'`";
-    final String topic3 = "`\"topic3\"`";
+    final String topic1 = "`topic4`";
+    final String topic2 = "`'topic5'`";
+    final String topic3 = "`\"topic6\"`";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
@@ -464,7 +451,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check data on receiver
@@ -472,20 +460,15 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
-        Awaitility.await()
-            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-            .pollInterval(
-                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertSingleResultSetEqual(
-                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
-                        new HashMap<String, String>() {
-                          {
-                            put("count(root.db.d1.s)", "300");
-                          }
-                        }));
+        AWAIT.untilAsserted(
+            () ->
+                TestUtils.assertSingleResultSetEqual(
+                    TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                    new HashMap<String, String>() {
+                      {
+                        put("count(root.db.d1.s)", "300");
+                      }
+                    }));
       }
     } catch (final Exception e) {
       e.printStackTrace();
@@ -507,7 +490,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       final Properties properties = new Properties();
       properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
       properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE);
-      session.createTopic("topic1", properties);
+      session.createTopic("topic7", properties);
       fail();
     } catch (final Exception ignored) {
     }
@@ -519,7 +502,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       final Properties properties = new Properties();
       properties.put(TopicConstant.START_TIME_KEY, "2001.01.01T08:00:00");
       properties.put(TopicConstant.END_TIME_KEY, "2000.01.01T08:00:00");
-      session.createTopic("topic2", properties);
+      session.createTopic("topic8", properties);
       fail();
     } catch (final Exception ignored) {
     }
@@ -532,7 +515,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     final Properties config = new Properties();
     config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
     config.put(TopicConstant.PATH_KEY, "root.db.*.s");
-    testTopicInvalidRuntimeConfigTemplate("topic3", config);
+    testTopicInvalidRuntimeConfigTemplate("topic9", config);
   }
 
   @Test
@@ -543,7 +526,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     config.put("processor", "tumbling-time-sampling-processor");
     config.put("processor.tumbling-time.interval-seconds", "1");
     config.put("processor.down-sampling.split-file", "true");
-    testTopicInvalidRuntimeConfigTemplate("topic4", config);
+    testTopicInvalidRuntimeConfigTemplate("topic10", config);
   }
 
   private void testTopicInvalidRuntimeConfigTemplate(
@@ -600,7 +583,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            }));
+            },
+            String.format("%s - consumer", testName.getMethodName())));
 
     // Insert some realtime data on sender
     threads.add(
@@ -628,18 +612,15 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                 fail(e.getMessage());
               }
               dataPrepared.set(true);
-            }));
+            },
+            String.format("%s - data inserter", testName.getMethodName())));
 
     for (final Thread thread : threads) {
       thread.start();
     }
 
-    Awaitility.await()
-        .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-        
.pollInterval(IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-        .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-        // The expected SubscriptionRuntimeCriticalException was not thrown if 
result is false
-        .untilTrue(result);
+    // The expected SubscriptionRuntimeCriticalException was not thrown if 
result is false
+    AWAIT.untilTrue(result);
 
     for (final Thread thread : threads) {
       thread.join();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
similarity index 60%
copy from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
copy to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 7d16110cd1f..bb248f745d4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -17,14 +17,29 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.local;
 
-public class IoTDBSubscriptionITConstant {
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
 
-  public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
-  public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
-  public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+import org.junit.After;
+import org.junit.Before;
 
-  public static final long SLEEP_NS = 1_000_000_000L;
-  public static final long POLL_TIMEOUT_MS = 10_000L;
+abstract class AbstractSubscriptionLocalIT extends AbstractSubscriptionIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    super.tearDown();
+
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 13a5e8abc26..fccbee6d95d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -40,10 +40,7 @@ import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.awaitility.Awaitility;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -57,31 +54,21 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionBasicIT {
+public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
 
-  @Before
-  public void setUp() throws Exception {
-    EnvFactory.getEnv().initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
-  }
-
   @Test
   public void testBasicSubscribeTablets() throws Exception {
     // Insert some historical data
@@ -146,18 +133,14 @@ public class IoTDBSubscriptionBasicIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -242,18 +225,14 @@ public class IoTDBSubscriptionBasicIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -379,18 +358,14 @@ public class IoTDBSubscriptionBasicIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
       Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
       Assert.assertEquals(0, commitFailureCount.get());
     } catch (final Exception e) {
@@ -416,12 +391,7 @@ public class IoTDBSubscriptionBasicIT {
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
+      AWAIT.untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
       Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
       Assert.assertEquals(0, commitFailureCount.get());
     } catch (final Exception e) {
@@ -486,16 +456,11 @@ public class IoTDBSubscriptionBasicIT {
       consumer.subscribe(topicName);
 
       // The push consumer should automatically poll 10 rows of data by 1 
onReceive()
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                Assert.assertEquals(10, rowCount.get());
-                Assert.assertTrue(onReceiveCount.get() > 
lastOnReceiveCount.get());
-              });
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(10, rowCount.get());
+            Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+          });
 
       lastOnReceiveCount.set(onReceiveCount.get());
 
@@ -508,16 +473,11 @@ public class IoTDBSubscriptionBasicIT {
         session.executeNonQueryStatement("flush");
       }
 
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                Assert.assertEquals(20, rowCount.get());
-                Assert.assertTrue(onReceiveCount.get() > 
lastOnReceiveCount.get());
-              });
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(20, rowCount.get());
+            Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+          });
 
       lastOnReceiveCount.set(onReceiveCount.get());
 
@@ -529,16 +489,11 @@ public class IoTDBSubscriptionBasicIT {
         session.executeNonQueryStatement("flush");
       }
 
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                Assert.assertEquals(30, rowCount.get());
-                Assert.assertTrue(onReceiveCount.get() > 
lastOnReceiveCount.get());
-              });
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(30, rowCount.get());
+            Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+          });
 
       consumer.unsubscribe(topicName);
     } catch (final Exception e) {
@@ -624,22 +579,18 @@ public class IoTDBSubscriptionBasicIT {
               } finally {
                 LOGGER.info("consumer exiting...");
               }
-            });
+            },
+            String.format("%s - consumer", testName.getMethodName()));
     thread.start();
 
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      Awaitility.await()
-          .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
-          .pollInterval(
-              IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
-          .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                Assert.assertEquals(100, rowCount.get());
-                Assert.assertEquals((100 + 199) * 100 / 2, timestampSum.get());
-              });
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(100, rowCount.get());
+            Assert.assertEquals((100 + 199) * 100 / 2, timestampSum.get());
+          });
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index b671acd6709..85fff437f18 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -38,20 +36,10 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionIdempotentIT {
+public class IoTDBSubscriptionIdempotentIT extends AbstractSubscriptionLocalIT 
{
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionIdempotentIT.class);
 
-  @Before
-  public void setUp() throws Exception {
-    EnvFactory.getEnv().initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
-  }
-
   @Test
   public void testSubscribeOrUnsubscribeNonExistedTopicTest() {
     final String host = EnvFactory.getEnv().getIP();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
similarity index 52%
copy from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
copy to 
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
index 7d16110cd1f..6eb9ba6d734 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
@@ -17,14 +17,26 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.rpc.subscription.exception;
 
-public class IoTDBSubscriptionITConstant {
+import java.util.Objects;
 
-  public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
-  public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
-  public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+public class SubscriptionIdentifierSemanticException extends 
SubscriptionException {
 
-  public static final long SLEEP_NS = 1_000_000_000L;
-  public static final long POLL_TIMEOUT_MS = 10_000L;
+  public SubscriptionIdentifierSemanticException(final String message) {
+    super(message);
+  }
+
+  public SubscriptionIdentifierSemanticException(final String message, final 
Throwable cause) {
+    super(message, cause);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    return obj instanceof SubscriptionIdentifierSemanticException
+        && Objects.equals(
+            getMessage(), ((SubscriptionIdentifierSemanticException) 
obj).getMessage())
+        && Objects.equals(
+            getTimeStamp(), ((SubscriptionIdentifierSemanticException) 
obj).getTimeStamp());
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
index a5540150898..1d1571b0122 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
@@ -22,10 +22,25 @@ package org.apache.iotdb.rpc.subscription.payload.response;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Objects;
+import java.util.Set;
 
 public class PipeSubscribeSubscribeResp extends TPipeSubscribeResp {
 
+  private transient Set<String> topicNames = new HashSet<>(); // subscribed 
topic names
+
+  public Set<String> getTopicNames() {
+    return topicNames;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   /**
@@ -34,11 +49,32 @@ public class PipeSubscribeSubscribeResp extends 
TPipeSubscribeResp {
    */
   public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(final TSStatus 
status) {
     final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
+    resp.status = status;
+    resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    resp.type = PipeSubscribeResponseType.ACK.getType();
+    return resp;
+  }
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeSubscribeResp`, 
called by the subscription
+   * server.
+   */
+  public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(
+      final TSStatus status, final Set<String> topicNames) throws IOException {
+    final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
 
     resp.status = status;
     resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
     resp.type = PipeSubscribeResponseType.ACK.getType();
 
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.writeObjectSet(topicNames, outputStream);
+      resp.body =
+          Collections.singletonList(
+              ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
+    }
+
     return resp;
   }
 
@@ -47,11 +83,19 @@ public class PipeSubscribeSubscribeResp extends 
TPipeSubscribeResp {
       final TPipeSubscribeResp subscribeResp) {
     final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
 
+    if (Objects.nonNull(subscribeResp.body)) {
+      for (final ByteBuffer byteBuffer : subscribeResp.body) {
+        if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
+          resp.topicNames = ReadWriteIOUtils.readObjectSet(byteBuffer);
+          break;
+        }
+      }
+    }
+
     resp.status = subscribeResp.status;
     resp.version = subscribeResp.version;
     resp.type = subscribeResp.type;
     resp.body = subscribeResp.body;
-
     return resp;
   }
 
@@ -66,7 +110,8 @@ public class PipeSubscribeSubscribeResp extends 
TPipeSubscribeResp {
       return false;
     }
     final PipeSubscribeSubscribeResp that = (PipeSubscribeSubscribeResp) obj;
-    return Objects.equals(this.status, that.status)
+    return Objects.equals(this.topicNames, that.topicNames)
+        && Objects.equals(this.status, that.status)
         && this.version == that.version
         && this.type == that.type
         && Objects.equals(this.body, that.body);
@@ -74,6 +119,6 @@ public class PipeSubscribeSubscribeResp extends 
TPipeSubscribeResp {
 
   @Override
   public int hashCode() {
-    return Objects.hash(status, version, type, body);
+    return Objects.hash(topicNames, status, version, type, body);
   }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
index a93e38013ac..e5d8ccf69cf 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
@@ -22,10 +22,25 @@ package org.apache.iotdb.rpc.subscription.payload.response;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Objects;
+import java.util.Set;
 
 public class PipeSubscribeUnsubscribeResp extends TPipeSubscribeResp {
 
+  private transient Set<String> topicNames = new HashSet<>(); // subscribed 
topic names
+
+  public Set<String> getTopicNames() {
+    return topicNames;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   /**
@@ -34,11 +49,32 @@ public class PipeSubscribeUnsubscribeResp extends 
TPipeSubscribeResp {
    */
   public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(final 
TSStatus status) {
     final PipeSubscribeUnsubscribeResp resp = new 
PipeSubscribeUnsubscribeResp();
+    resp.status = status;
+    resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    resp.type = PipeSubscribeResponseType.ACK.getType();
+    return resp;
+  }
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeUnsubscribeResp`, 
called by the
+   * subscription server.
+   */
+  public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(
+      final TSStatus status, final Set<String> topicNames) throws IOException {
+    final PipeSubscribeUnsubscribeResp resp = new 
PipeSubscribeUnsubscribeResp();
 
     resp.status = status;
     resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
     resp.type = PipeSubscribeResponseType.ACK.getType();
 
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.writeObjectSet(topicNames, outputStream);
+      resp.body =
+          Collections.singletonList(
+              ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
+    }
+
     return resp;
   }
 
@@ -47,11 +83,19 @@ public class PipeSubscribeUnsubscribeResp extends 
TPipeSubscribeResp {
       final TPipeSubscribeResp unsubscribeResp) {
     final PipeSubscribeUnsubscribeResp resp = new 
PipeSubscribeUnsubscribeResp();
 
+    if (Objects.nonNull(unsubscribeResp.body)) {
+      for (final ByteBuffer byteBuffer : unsubscribeResp.body) {
+        if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
+          resp.topicNames = ReadWriteIOUtils.readObjectSet(byteBuffer);
+          break;
+        }
+      }
+    }
+
     resp.status = unsubscribeResp.status;
     resp.version = unsubscribeResp.version;
     resp.type = unsubscribeResp.type;
     resp.body = unsubscribeResp.body;
-
     return resp;
   }
 
@@ -66,7 +110,8 @@ public class PipeSubscribeUnsubscribeResp extends 
TPipeSubscribeResp {
       return false;
     }
     final PipeSubscribeUnsubscribeResp that = (PipeSubscribeUnsubscribeResp) 
obj;
-    return Objects.equals(this.status, that.status)
+    return Objects.equals(this.topicNames, that.topicNames)
+        && Objects.equals(this.status, that.status)
         && this.version == that.version
         && this.type == that.type
         && Objects.equals(this.body, that.body);
@@ -74,6 +119,6 @@ public class PipeSubscribeUnsubscribeResp extends 
TPipeSubscribeResp {
 
   @Override
   public int hashCode() {
-    return Objects.hash(status, version, type, body);
+    return Objects.hash(topicNames, status, version, type, body);
   }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 2bbd4a129a9..a6b418f0220 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
 import org.apache.iotdb.session.subscription.util.SubscriptionPollTimer;
 import org.apache.iotdb.session.util.SessionUtils;
@@ -50,8 +51,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.URLEncoder;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -69,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
 
 abstract class SubscriptionConsumer implements AutoCloseable {
 
@@ -92,6 +96,10 @@ abstract class SubscriptionConsumer implements AutoCloseable 
{
   private final String fileSaveDir;
   private final boolean fileSaveFsync;
 
+  protected volatile Set<String> subscribedTopicNames = new HashSet<>();
+
+  /////////////////////////////// getter ///////////////////////////////
+
   public String getConsumerId() {
     return consumerId;
   }
@@ -100,6 +108,22 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     return consumerGroupId;
   }
 
+  /**
+   * Returns the topic names stored by the latest successful subscribe or
+   * unsubscribe call.
+   *
+   * @return a read-only view, so that callers cannot modify the set this
+   *     consumer keeps for itself; empty if no set is stored
+   */
+  public Set<String> getSubscribedTopicNames() {
+    // read the volatile field once so the null check and the wrapping see
+    // the same set
+    final Set<String> current = subscribedTopicNames;
+    return Objects.isNull(current)
+        ? Collections.<String>emptySet()
+        : Collections.unmodifiableSet(current);
+  }
+
   /////////////////////////////// ctor ///////////////////////////////
 
   protected SubscriptionConsumer(final Builder builder) {
@@ -221,6 +233,17 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   public void subscribe(final Set<String> topicNames) throws 
SubscriptionException {
+    // parse topic names from external source
+    subscribe(topicNames, true);
+  }
+
+  private void subscribe(Set<String> topicNames, final boolean needParse)
+      throws SubscriptionException {
+    if (needParse) {
+      topicNames =
+          
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+    }
+
     providers.acquireReadLock();
     try {
       subscribeWithRedirection(topicNames);
@@ -238,6 +261,17 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   public void unsubscribe(final Set<String> topicNames) throws 
SubscriptionException {
+    // parse topic names from external source
+    unsubscribe(topicNames, true);
+  }
+
+  private void unsubscribe(Set<String> topicNames, final boolean needParse)
+      throws SubscriptionException {
+    if (needParse) {
+      topicNames =
+          
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+    }
+
     providers.acquireReadLock();
     try {
       unsubscribeWithRedirection(topicNames);
@@ -284,33 +318,85 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     return dirPath;
   }
 
-  private Path getFilePath(final String topicName, String fileName) throws 
SubscriptionException {
-    Path filePath;
+  /**
+   * Creates a new empty file named {@code fileName} in the directory returned
+   * by {@code getFileDir(topicName)} and returns its path.
+   *
+   * <p>Each fallback is taken at most once per top-level call: when the file
+   * already exists, creation is retried with a random suffix appended to the
+   * file name; when the path is rejected as invalid, it is retried with the
+   * URL-encoded topic name.
+   *
+   * @param topicName topic name used to resolve the target directory
+   * @param fileName name of the file to create
+   * @param allowFileAlreadyExistsException whether an already existing file
+   *     may still be answered with the random-suffix retry
+   * @param allowInvalidPathException whether an invalid path may still be
+   *     answered with the encoded-topic-name retry
+   * @return the path of the newly created file
+   * @throws SubscriptionRuntimeNonCriticalException if the file cannot be
+   *     created
+   */
+  private Path getFilePath(
+      final String topicName,
+      final String fileName,
+      final boolean allowFileAlreadyExistsException,
+      final boolean allowInvalidPathException)
+      throws SubscriptionException {
     try {
-      filePath = getFileDir(topicName).resolve(fileName);
+      final Path filePath = getFileDir(topicName).resolve(fileName);
       Files.createFile(filePath);
+      return filePath;
     } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
-      fileName += "." + RandomStringGenerator.generate(16);
-      try {
-        filePath = getFileDir(topicName).resolve(fileName);
-        Files.createFile(filePath);
-      } catch (final IOException e) {
-        throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+      if (allowFileAlreadyExistsException) {
+        // this fallback is now used up; the other one keeps its current state
+        // so that the two retries cannot re-enable each other
+        return getFilePath(
+            topicName,
+            fileName + "." + RandomStringGenerator.generate(16),
+            false,
+            allowInvalidPathException);
       }
+      throw new SubscriptionRuntimeNonCriticalException(
+          fileAlreadyExistsException.getMessage(), fileAlreadyExistsException);
+    } catch (final InvalidPathException invalidPathException) {
+      if (allowInvalidPathException) {
+        // UTF-8 is named explicitly: the one-argument URLEncoder.encode is
+        // deprecated and depends on the platform default charset
+        final String encodedTopicName;
+        try {
+          encodedTopicName = URLEncoder.encode(topicName, "UTF-8");
+        } catch (final IOException e) {
+          // UnsupportedEncodingException, not expected for UTF-8
+          throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+        }
+        return getFilePath(
+            encodedTopicName, fileName, allowFileAlreadyExistsException, false);
+      }
+      throw new SubscriptionRuntimeNonCriticalException(
+          invalidPathException.getMessage(), invalidPathException);
     } catch (final IOException e) {
       throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
     }
-    return filePath;
   }
 
   /////////////////////////////// poll ///////////////////////////////
 
-  protected List<SubscriptionMessage> poll(final Set<String> topicNames, final 
long timeoutMs)
+  protected List<SubscriptionMessage> poll(
+      /* @NotNull */ final Set<String> topicNames, final long timeoutMs)
       throws SubscriptionException {
     final List<SubscriptionMessage> messages = new ArrayList<>();
     final SubscriptionPollTimer timer =
         new SubscriptionPollTimer(System.currentTimeMillis(), timeoutMs);
 
+    // check topic names
+    topicNames.stream()
+        .filter(topicName -> !subscribedTopicNames.contains(topicName))
+        .forEach(
+            topicName ->
+                LOGGER.warn(
+                    "SubscriptionConsumer {} does not subscribe to topic {}", 
this, topicName));
+
     do {
       try {
         // poll tablets or file
@@ -336,14 +388,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
             case ERROR:
               final ErrorPayload payload = (ErrorPayload) 
pollResponse.getPayload();
               final String errorMessage = payload.getErrorMessage();
-              final boolean critical = payload.isCritical();
-              LOGGER.warn(
-                  "Error occurred when SubscriptionConsumer {} polling topics 
{}: {}, critical: {}",
-                  this,
-                  topicNames,
-                  errorMessage,
-                  critical);
-              if (critical) {
+              if (payload.isCritical()) {
                 throw new SubscriptionRuntimeCriticalException(errorMessage);
               } else {
                 throw new 
SubscriptionRuntimeNonCriticalException(errorMessage);
@@ -398,7 +443,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
       final SubscriptionCommitContext commitContext, final String fileName)
       throws SubscriptionException {
     final String topicName = commitContext.getTopicName();
-    final Path filePath = getFilePath(topicName, fileName);
+    final Path filePath = getFilePath(topicName, fileName, true, true);
     final File file = filePath.toFile();
     try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) 
{
       return Optional.of(pollFileInternal(commitContext, file, fileWriter));
@@ -576,6 +621,9 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     try {
       final SubscriptionProvider provider = 
providers.getNextAvailableProvider();
       if (Objects.isNull(provider) || !provider.isAvailable()) {
+        if (isClosed()) {
+          return Collections.emptyList();
+        }
         throw new SubscriptionConnectionException(
             String.format(
                 "Cluster has no available subscription providers when %s poll 
topic %s",
@@ -601,6 +649,9 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     try {
       final SubscriptionProvider provider = providers.getProvider(dataNodeId);
       if (Objects.isNull(provider) || !provider.isAvailable()) {
+        if (isClosed()) {
+          return Collections.emptyList();
+        }
         throw new SubscriptionConnectionException(
             String.format(
                 "something unexpected happened when %s poll file from 
subscription provider with data node id %s, the subscription provider may be 
unavailable or not existed",
@@ -660,6 +711,9 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     try {
       final SubscriptionProvider provider = providers.getProvider(dataNodeId);
       if (Objects.isNull(provider) || !provider.isAvailable()) {
+        if (isClosed()) {
+          return;
+        }
         throw new SubscriptionConnectionException(
             String.format(
                 "something unexpected happened when %s commit (nack: %s) 
messages to subscription provider with data node id %s, the subscription 
provider may be unavailable or not existed",
@@ -775,7 +829,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
     for (final SubscriptionProvider provider : providers) {
       try {
-        provider.subscribe(topicNames);
+        subscribedTopicNames = provider.subscribe(topicNames);
         return;
       } catch (final Exception e) {
         LOGGER.warn(
@@ -805,7 +859,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
     for (final SubscriptionProvider provider : providers) {
       try {
-        provider.unsubscribe(topicNames);
+        subscribedTopicNames = provider.unsubscribe(topicNames);
         return;
       } catch (final Exception e) {
         LOGGER.warn(
@@ -897,12 +951,36 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
 
+    /**
+     * Sets the consumer id. A non-null value is parsed with
+     * IdentifierUtils.parseIdentifier; null is stored as is.
+     *
+     * @param consumerId the consumer id, may be null
+     * @return this builder
+     */
     public Builder consumerId(final String consumerId) {
-      this.consumerId = consumerId;
+      // null was accepted before identifiers were parsed; parseIdentifier
+      // dereferences its argument, so null is passed through untouched
+      this.consumerId =
+          Objects.isNull(consumerId)
+              ? null
+              : IdentifierUtils.parseIdentifier(consumerId);
       return this;
     }
 
+    /**
+     * Sets the consumer group id. A non-null value is parsed with
+     * IdentifierUtils.parseIdentifier; null is stored as is.
+     *
+     * @param consumerGroupId the consumer group id, may be null
+     * @return this builder
+     */
     public Builder consumerGroupId(final String consumerGroupId) {
-      this.consumerGroupId = consumerGroupId;
+      // null was accepted before identifiers were parsed; parseIdentifier
+      // dereferences its argument, so null is passed through untouched
+      this.consumerGroupId =
+          Objects.isNull(consumerGroupId)
+              ? null
+              : IdentifierUtils.parseIdentifier(consumerGroupId);
       return this;
     }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 70589472f82..9bce0d6774f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -40,6 +40,8 @@ import 
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeR
 import 
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
 import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
 import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
+import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
+import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
@@ -215,7 +217,7 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     verifyPipeSubscribeSuccess(resp.status);
   }
 
-  void subscribe(final Set<String> topicNames) throws SubscriptionException {
+  Set<String> subscribe(final Set<String> topicNames) throws 
SubscriptionException {
     final PipeSubscribeSubscribeReq req;
     try {
       req = PipeSubscribeSubscribeReq.toTPipeSubscribeReq(topicNames);
@@ -241,9 +243,12 @@ final class SubscriptionProvider extends 
SubscriptionSession {
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
+    final PipeSubscribeSubscribeResp subscribeResp =
+        PipeSubscribeSubscribeResp.fromTPipeSubscribeResp(resp);
+    return subscribeResp.getTopicNames();
   }
 
-  void unsubscribe(final Set<String> topicNames) throws SubscriptionException {
+  Set<String> unsubscribe(final Set<String> topicNames) throws 
SubscriptionException {
     final PipeSubscribeUnsubscribeReq req;
     try {
       req = PipeSubscribeUnsubscribeReq.toTPipeSubscribeReq(topicNames);
@@ -269,6 +274,9 @@ final class SubscriptionProvider extends 
SubscriptionSession {
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
+    final PipeSubscribeUnsubscribeResp unsubscribeResp =
+        PipeSubscribeUnsubscribeResp.fromTPipeSubscribeResp(resp);
+    return unsubscribeResp.getTopicNames();
   }
 
   List<SubscriptionPollResponse> poll(final SubscriptionPollRequest 
pollMessage)
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 2306e4d05c1..38dfc530ee3 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * The {@link SubscriptionPullConsumer} corresponds to the pull consumption 
mode in the message
@@ -150,7 +152,12 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
   @Override
   public List<SubscriptionMessage> poll(final Set<String> topicNames, final 
long timeoutMs)
       throws SubscriptionException {
-    final List<SubscriptionMessage> messages = super.poll(topicNames, 
timeoutMs);
+    // parse topic names from external source
+    final Set<String> parsedTopicNames =
+        
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+
+    // poll messages
+    final List<SubscriptionMessage> messages = super.poll(parsedTopicNames, 
timeoutMs);
 
     // add to uncommitted messages
     if (autoCommit) {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 7ff1dd39b15..7323473552c 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
@@ -162,9 +161,12 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
         return;
       }
 
+      if (subscribedTopicNames.isEmpty()) {
+        return;
+      }
+
       try {
-        // Poll all subscribed topics by passing an empty set
-        final List<SubscriptionMessage> messages = 
poll(Collections.emptySet(), autoPollTimeoutMs);
+        final List<SubscriptionMessage> messages = poll(subscribedTopicNames, 
autoPollTimeoutMs);
 
         if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
           ack(messages);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
index 60c269b8637..0ec121f9699 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
@@ -39,14 +39,14 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
   /**
    * @return a new File instance of the corresponding file
    */
-  public File getFile() {
+  public synchronized File getFile() {
     return new File(absolutePath);
   }
 
   /**
    * @return a new Path instance of the corresponding file
    */
-  public Path getPath() {
+  public synchronized Path getPath() {
     return Paths.get(absolutePath);
   }
 
@@ -54,7 +54,7 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @return the path to the source file
    * @throws IOException if an I/O error occurs
    */
-  public Path deleteFile() throws IOException {
+  public synchronized Path deleteFile() throws IOException {
     final Path sourcePath = getPath();
     Files.delete(sourcePath);
     return sourcePath;
@@ -65,7 +65,7 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @return the path to the target file
    * @throws IOException if an I/O error occurs
    */
-  public Path moveFile(final String target) throws IOException {
+  public synchronized Path moveFile(final String target) throws IOException {
     return this.moveFile(Paths.get(target));
   }
 
@@ -74,7 +74,14 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @return the path to the target file
    * @throws IOException if an I/O error occurs
    */
-  public Path moveFile(final Path target) throws IOException {
+  public synchronized Path moveFile(final Path target) throws IOException {
+    // A bare file name (e.g. "a.tsfile") has no parent component:
+    // getParent() returns null and Files.exists(null) would throw a
+    // NullPointerException, so only an actual parent directory is created.
+    final Path parent = target.getParent();
+    if (parent != null && !Files.exists(parent)) {
+      Files.createDirectories(parent);
+    }
     return Files.move(getPath(), target, StandardCopyOption.REPLACE_EXISTING);
   }
 
@@ -83,7 +86,7 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @return the path to the target file
    * @throws IOException if an I/O error occurs
    */
-  public Path copyFile(final String target) throws IOException {
+  public synchronized Path copyFile(final String target) throws IOException {
     return this.copyFile(Paths.get(target));
   }
 
@@ -92,7 +95,14 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @return the path to the target file
    * @throws IOException if an I/O error occurs
    */
-  public Path copyFile(final Path target) throws IOException {
+  public synchronized Path copyFile(final Path target) throws IOException {
+    // A bare file name (e.g. "a.tsfile") has no parent component:
+    // getParent() returns null and Files.exists(null) would throw a
+    // NullPointerException, so only an actual parent directory is created.
+    final Path parent = target.getParent();
+    if (parent != null && !Files.exists(parent)) {
+      Files.createDirectories(parent);
+    }
     return Files.copy(
         getPath(), target, StandardCopyOption.REPLACE_EXISTING, 
StandardCopyOption.COPY_ATTRIBUTES);
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
new file mode 100644
index 00000000000..9f6d09ef44e
--- /dev/null
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.util;
+
+import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanticException;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.read.common.parser.PathVisitor;
+
+public class IdentifierUtils {
+
+  private IdentifierUtils() {
+    // utility class, not meant to be instantiated
+  }
+
+  /**
+   * Parses an identifier passed in by an SDK user, e.g. a topic name, a
+   * consumer id or a consumer group id.
+   *
+   * <p>An identifier enclosed in backticks has the enclosing backticks
+   * removed and each doubled backtick inside replaced by a single one; no
+   * further check is applied to it. Any other identifier is checked by
+   * {@code checkIdentifier} and returned unchanged.
+   *
+   * <p>refer 
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor#parseIdentifier(java.lang.String)
+   *
+   * @param src the raw identifier, must not be null
+   * @return the parsed identifier
+   * @throws SubscriptionIdentifierSemanticException if {@code src} is not
+   *     enclosed in backticks and fails the identifier check
+   */
+  public static String parseIdentifier(final String src) {
+    // A lone backtick both starts and ends with a backtick but encloses
+    // nothing, and substring(1, 0) would throw
+    // StringIndexOutOfBoundsException. Requiring two characters sends it to
+    // checkIdentifier instead.
+    if (src.length() >= 2
+        && src.startsWith(TsFileConstant.BACK_QUOTE_STRING)
+        && src.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
+      return src.substring(1, src.length() - 1)
+          .replace(TsFileConstant.DOUBLE_BACK_QUOTE_STRING, 
TsFileConstant.BACK_QUOTE_STRING);
+    }
+    checkIdentifier(src);
+    return src;
+  }
+
+  /**
+   * Rejects an identifier that is not enclosed in backticks when it does not
+   * match {@code TsFileConstant.IDENTIFIER_PATTERN} or when {@code
+   * PathVisitor.isRealNumber} reports it as a real number.
+   *
+   * @param src the identifier to check
+   * @throws SubscriptionIdentifierSemanticException if the check fails
+   */
+  private static void checkIdentifier(final String src) {
+    if (!TsFileConstant.IDENTIFIER_PATTERN.matcher(src).matches()
+        || PathVisitor.isRealNumber(src)) {
+      throw new SubscriptionIdentifierSemanticException(
+          String.format(
+              "%s is illegal, identifier not enclosed with backticks can only 
consist of digits, characters and underscore.",
+              src));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
index 32dd078dd78..d83996cda47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
@@ -87,7 +87,7 @@ class SubscriptionAgentLauncher {
         }
 
         return;
-      } catch (SubscriptionException | ClientManagerException | TException e) {
+      } catch (final SubscriptionException | ClientManagerException | 
TException e) {
         retry++;
         LOGGER.warn(
             "Failed to get topic meta from config node for {} times, will 
retry at most {} times.",
@@ -97,7 +97,7 @@ class SubscriptionAgentLauncher {
         try {
           Thread.sleep(
               retry * 
SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
-        } catch (InterruptedException interruptedException) {
+        } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to get topic meta from 
config node.",
               interruptedException);
@@ -146,7 +146,7 @@ class SubscriptionAgentLauncher {
         }
 
         return;
-      } catch (SubscriptionException | ClientManagerException | TException e) {
+      } catch (final SubscriptionException | ClientManagerException | 
TException e) {
         retry++;
         LOGGER.warn(
             "Failed to get consumer group meta from config node for {} times, 
will retry at most {} times.",
@@ -156,7 +156,7 @@ class SubscriptionAgentLauncher {
         try {
           Thread.sleep(
               retry * 
SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
-        } catch (InterruptedException interruptedException) {
+        } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to get consumer group 
meta from config node.",
               interruptedException);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index aff4893c61a..21064ac34c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -141,23 +141,22 @@ public class SubscriptionBrokerAgent {
     broker.bindPrefetchingQueue(subtask.getTopicName(), 
subtask.getInputPendingQueue());
   }
 
-  public void unbindPrefetchingQueue(final SubscriptionConnectorSubtask 
subtask) {
-    final String consumerGroupId = subtask.getConsumerGroupId();
+  public void unbindPrefetchingQueue(
+      final String consumerGroupId, final String topicName, final boolean 
doRemove) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
       LOGGER.warn("Subscription: consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    broker.unbindPrefetchingQueue(subtask.getTopicName());
+    broker.unbindPrefetchingQueue(topicName, doRemove);
   }
 
-  public void executePrefetch(final SubscriptionConnectorSubtask subtask) {
-    final String consumerGroupId = subtask.getConsumerGroupId();
+  public void executePrefetch(final String consumerGroupId, final String 
topicName) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
       LOGGER.warn("Subscription: consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    broker.executePrefetch(subtask.getTopicName());
+    broker.executePrefetch(topicName);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index d6572923635..5284d295863 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -62,7 +62,7 @@ public class SubscriptionConsumerAgent {
   ////////////////////////// ConsumerGroupMeta Management Entry 
//////////////////////////
 
   public TPushConsumerGroupMetaRespExceptionMessage 
handleSingleConsumerGroupMetaChanges(
-      ConsumerGroupMeta consumerGroupMetaFromCoordinator) {
+      final ConsumerGroupMeta consumerGroupMetaFromCoordinator) {
     acquireWriteLock();
     try {
       if (consumerGroupMetaFromCoordinator.isEmpty()) {
@@ -71,7 +71,7 @@ public class SubscriptionConsumerAgent {
         
handleSingleConsumerGroupMetaChangesInternal(consumerGroupMetaFromCoordinator);
       }
       return null;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String consumerGroupId = 
consumerGroupMetaFromCoordinator.getConsumerGroupId();
       final String exceptionMessage =
           String.format(
@@ -121,14 +121,15 @@ public class SubscriptionConsumerAgent {
   }
 
   public TPushConsumerGroupMetaRespExceptionMessage 
handleConsumerGroupMetaChanges(
-      List<ConsumerGroupMeta> consumerGroupMetasFromCoordinator) {
+      final List<ConsumerGroupMeta> consumerGroupMetasFromCoordinator) {
     acquireWriteLock();
     try {
-      for (ConsumerGroupMeta consumerGroupMetaFromCoordinator : 
consumerGroupMetasFromCoordinator) {
+      for (final ConsumerGroupMeta consumerGroupMetaFromCoordinator :
+          consumerGroupMetasFromCoordinator) {
         try {
           
handleSingleConsumerGroupMetaChangesInternal(consumerGroupMetaFromCoordinator);
           return null;
-        } catch (Exception e) {
+        } catch (final Exception e) {
           final String consumerGroupId = 
consumerGroupMetaFromCoordinator.getConsumerGroupId();
           final String exceptionMessage =
               String.format(
@@ -146,12 +147,12 @@ public class SubscriptionConsumerAgent {
   }
 
   public TPushConsumerGroupMetaRespExceptionMessage handleDropConsumerGroup(
-      String consumerGroupId) {
+      final String consumerGroupId) {
     acquireWriteLock();
     try {
       handleDropConsumerGroupInternal(consumerGroupId);
       return null;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String exceptionMessage =
           String.format(
               "Subscription: Failed to drop consumer group %s, because %s", 
consumerGroupId, e);
@@ -163,7 +164,7 @@ public class SubscriptionConsumerAgent {
     }
   }
 
-  private void handleDropConsumerGroupInternal(String consumerGroupId) {
+  private void handleDropConsumerGroupInternal(final String consumerGroupId) {
     if (SubscriptionAgent.broker().isBrokerExist(consumerGroupId)) {
       if (!SubscriptionAgent.broker().dropBroker(consumerGroupId)) {
         final String exceptionMessage =
@@ -180,7 +181,7 @@ public class SubscriptionConsumerAgent {
     consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
   }
 
-  public boolean isConsumerExisted(String consumerGroupId, String consumerId) {
+  public boolean isConsumerExisted(final String consumerGroupId, final String 
consumerId) {
     acquireReadLock();
     try {
       final ConsumerGroupMeta consumerGroupMeta =
@@ -191,7 +192,8 @@ public class SubscriptionConsumerAgent {
     }
   }
 
-  public Set<String> getTopicsSubscribedByConsumer(String consumerGroupId, 
String consumerId) {
+  public Set<String> getTopicsSubscribedByConsumer(
+      final String consumerGroupId, final String consumerId) {
     acquireReadLock();
     try {
       return 
consumerGroupMetaKeeper.getTopicsSubscribedByConsumer(consumerGroupId, 
consumerId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 1bb0735b019..71f319374cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -51,7 +51,7 @@ public class SubscriptionReceiverAgent {
         PipeSubscribeRequestVersion.VERSION_1.getVersion(), 
SubscriptionReceiverV1::new);
   }
 
-  public TPipeSubscribeResp handle(TPipeSubscribeReq req) {
+  public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
       return getReceiver(reqVersion).handle(req);
@@ -69,7 +69,7 @@ public class SubscriptionReceiverAgent {
     }
   }
 
-  private SubscriptionReceiver getReceiver(byte reqVersion) {
+  private SubscriptionReceiver getReceiver(final byte reqVersion) {
     if (receiverThreadLocal.get() == null) {
       return setAndGetReceiver(reqVersion);
     }
@@ -88,7 +88,7 @@ public class SubscriptionReceiverAgent {
     return receiverThreadLocal.get();
   }
 
-  private SubscriptionReceiver setAndGetReceiver(byte reqVersion) {
+  private SubscriptionReceiver setAndGetReceiver(final byte reqVersion) {
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
       receiverThreadLocal.set(RECEIVER_CONSTRUCTORS.get(reqVersion).get());
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index fb27e8825b8..45145bd2b67 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -60,12 +60,12 @@ public class SubscriptionTopicAgent {
   ////////////////////////// Topic Management Entry //////////////////////////
 
   public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges(
-      TopicMeta topicMetaFromCoordinator) {
+      final TopicMeta topicMetaFromCoordinator) {
     acquireWriteLock();
     try {
       handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator);
       return null;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String topicName = topicMetaFromCoordinator.getTopicName();
       final String exceptionMessage =
           String.format(
@@ -86,13 +86,13 @@ public class SubscriptionTopicAgent {
   }
 
   public TPushTopicMetaRespExceptionMessage handleTopicMetaChanges(
-      List<TopicMeta> topicMetasFromCoordinator) {
+      final List<TopicMeta> topicMetasFromCoordinator) {
     acquireWriteLock();
     try {
-      for (TopicMeta topicMetaFromCoordinator : topicMetasFromCoordinator) {
+      for (final TopicMeta topicMetaFromCoordinator : 
topicMetasFromCoordinator) {
         try {
           handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator);
-        } catch (Exception e) {
+        } catch (final Exception e) {
           final String topicName = topicMetaFromCoordinator.getTopicName();
           final String exceptionMessage =
               String.format(
@@ -109,12 +109,12 @@ public class SubscriptionTopicAgent {
     }
   }
 
-  public TPushTopicMetaRespExceptionMessage handleDropTopic(String topicName) {
+  public TPushTopicMetaRespExceptionMessage handleDropTopic(final String 
topicName) {
     acquireWriteLock();
     try {
       handleDropTopicInternal(topicName);
       return null;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String exceptionMessage =
           String.format("Subscription: Failed to drop topic %s, because %s", 
topicName, e);
       LOGGER.warn(exceptionMessage);
@@ -125,11 +125,11 @@ public class SubscriptionTopicAgent {
     }
   }
 
-  private void handleDropTopicInternal(String topicName) {
+  private void handleDropTopicInternal(final String topicName) {
     topicMetaKeeper.removeTopicMeta(topicName);
   }
 
-  public boolean isTopicExisted(String topicName) {
+  public boolean isTopicExisted(final String topicName) {
     acquireReadLock();
     try {
       return topicMetaKeeper.containsTopicMeta(topicName);
@@ -138,7 +138,7 @@ public class SubscriptionTopicAgent {
     }
   }
 
-  public String getTopicFormat(String topicName) {
+  public String getTopicFormat(final String topicName) {
     acquireReadLock();
     try {
       return topicMetaKeeper
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 90785c1602e..d26ea8b4458 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -65,6 +65,10 @@ public class SubscriptionBroker {
       final String topicName = entry.getKey();
       final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
       if (topicNames.contains(topicName)) {
+        if (prefetchingQueue.isClosed()) {
+          LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is 
closed", topicName);
+          continue;
+        }
         final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
         if (Objects.nonNull(event)) {
           events.add(event);
@@ -95,6 +99,10 @@ public class SubscriptionBroker {
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
+    if (prefetchingQueue.isClosed()) {
+      LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is 
closed", topicName);
+      return Collections.emptyList();
+    }
     final SubscriptionEvent event =
         ((SubscriptionPrefetchingTsFileQueue) prefetchingQueue)
             .pollTsFile(consumerId, fileName, writingOffset);
@@ -117,6 +125,10 @@ public class SubscriptionBroker {
             "Subscription: prefetching queue bound to topic [{}] does not 
exist", topicName);
         continue;
       }
+      if (prefetchingQueue.isClosed()) {
+        LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is 
closed", topicName);
+        continue;
+      }
       if (!nack) {
         if (prefetchingQueue.ack(commitContext)) {
           successfulCommitContexts.add(commitContext);
@@ -155,18 +167,25 @@ public class SubscriptionBroker {
     }
   }
 
-  public void unbindPrefetchingQueue(final String topicName) {
+  public void unbindPrefetchingQueue(final String topicName, final boolean 
doRemove) {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
     if (Objects.isNull(prefetchingQueue)) {
       LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does 
not exist", topicName);
       return;
     }
-    // clean up uncommitted events
+
+    // mark prefetching queue closed first
+    prefetchingQueue.markClosed();
+
+    // clean up events in prefetching queue, this operation is idempotent
     prefetchingQueue.cleanup();
-    topicNameToPrefetchingQueue.remove(topicName);
-    SubscriptionPrefetchingQueueMetrics.getInstance()
-        .deregister(prefetchingQueue.getPrefetchingQueueId());
+
+    if (doRemove) {
+      topicNameToPrefetchingQueue.remove(topicName);
+      SubscriptionPrefetchingQueueMetrics.getInstance()
+          .deregister(prefetchingQueue.getPrefetchingQueueId());
+    }
   }
 
   public void executePrefetch(final String topicName) {
@@ -176,6 +195,10 @@ public class SubscriptionBroker {
       LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does 
not exist", topicName);
       return;
     }
+    if (prefetchingQueue.isClosed()) {
+      LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is 
closed", topicName);
+      return;
+    }
     prefetchingQueue.executePrefetch();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 43aef195681..9dec709230f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -46,6 +46,9 @@ public abstract class SubscriptionPrefetchingQueue {
   protected final Map<SubscriptionCommitContext, SubscriptionEvent> 
uncommittedEvents;
   private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
 
+  private volatile boolean isCompleted = false;
+  private volatile boolean isClosed = false;
+
   public SubscriptionPrefetchingQueue(
       final String brokerId,
       final String topicName,
@@ -61,11 +64,16 @@ public abstract class SubscriptionPrefetchingQueue {
 
   public abstract void executePrefetch();
 
-  /** clean up uncommitted events */
   public void cleanup() {
+    // clean up uncommitted events
     for (final SubscriptionEvent event : uncommittedEvents.values()) {
+      event.clearReferenceCount();
       SubscriptionEventBinaryCache.getInstance().resetByteBuffer(event, true);
     }
+    uncommittedEvents.clear();
+
+    // no need to clean up events in inputPendingQueue, see
+    // 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
   }
 
   /////////////////////////////// commit ///////////////////////////////
@@ -150,4 +158,22 @@ public abstract class SubscriptionPrefetchingQueue {
   public long getCurrentCommitId() {
     return subscriptionCommitIdGenerator.get();
   }
+
+  /////////////////////////////// termination ///////////////////////////////
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  public void markClosed() {
+    isClosed = true;
+  }
+
+  public boolean isCompleted() {
+    return isCompleted;
+  }
+
+  public void markCompleted() {
+    isCompleted = true;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 195194ef3c8..14d3315e1b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -63,6 +63,15 @@ public class SubscriptionPrefetchingTabletsQueue extends 
SubscriptionPrefetching
     this.prefetchingQueue = new LinkedBlockingQueue<>();
   }
 
+  @Override
+  public void cleanup() {
+    super.cleanup();
+
+    // no need to release events in prefetchingQueue individually, since
+    // they are also in uncommittedEvents and were handled by super.cleanup()
+    prefetchingQueue.clear();
+  }
+
   @Override
   public SubscriptionEvent poll(final String consumerId) {
     if (prefetchingQueue.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index fc5a1a8a81e..3f2dac42db6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
 import org.apache.iotdb.db.subscription.event.SubscriptionTsFileEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -63,6 +64,21 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     this.consumerIdToCurrentEventMap = new ConcurrentHashMap<>();
   }
 
+  @Override
+  public void cleanup() {
+    super.cleanup();
+
+    // clean up events in consumerIdToCurrentEventMap
+    consumerIdToCurrentEventMap
+        .values()
+        .forEach(
+            (event) -> {
+              event.clearReferenceCount();
+              
SubscriptionEventBinaryCache.getInstance().resetByteBuffer(event, true);
+            });
+    consumerIdToCurrentEventMap.clear();
+  }
+
   @Override
   public SubscriptionTsFileEvent poll(final String consumerId) {
     if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
@@ -78,6 +94,14 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     Event event;
     while (Objects.nonNull(
         event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) {
+      if (!(event instanceof EnrichedEvent)) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingTsFileQueue {} only support 
poll EnrichedEvent. Ignore {}.",
+            this,
+            event);
+        continue;
+      }
+
       if (event instanceof TabletInsertionEvent) {
         final String errorMessage =
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index c327b7e058d..0f7feb08e95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -76,6 +76,12 @@ public class SubscriptionEvent {
     }
   }
 
+  public void clearReferenceCount() {
+    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      enrichedEvent.clearReferenceCount(this.getClass().getName());
+    }
+  }
+
   //////////////////////////// pollable ////////////////////////////
 
   public void recordLastPolledTimestamp() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 8339c3d3c88..9876339101a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -72,6 +71,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -247,7 +247,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
   private TPipeSubscribeResp handlePipeSubscribeSubscribe(final 
PipeSubscribeSubscribeReq req) {
     try {
       return handlePipeSubscribeSubscribeInternal(req);
-    } catch (final SubscriptionException e) {
+    } catch (final SubscriptionException | IOException e) {
       final String exceptionMessage =
           String.format(
               "Subscription: something unexpected happened when subscribing: 
%s, req: %s", e, req);
@@ -258,7 +258,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
   }
 
   private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(
-      final PipeSubscribeSubscribeReq req) {
+      final PipeSubscribeSubscribeReq req) throws IOException {
     // check consumer config thread local
     final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
     if (Objects.isNull(consumerConfig)) {
@@ -268,18 +268,21 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     }
 
     // subscribe topics
-    Set<String> topicNames = req.getTopicNames();
-    topicNames = 
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
+    final Set<String> topicNames = req.getTopicNames();
     subscribe(consumerConfig, topicNames);
 
     LOGGER.info("Subscription: consumer {} subscribe {} successfully", 
consumerConfig, topicNames);
-    return 
PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
+    return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(
+        RpcUtils.SUCCESS_STATUS,
+        SubscriptionAgent.consumer()
+            .getTopicsSubscribedByConsumer(
+                consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
   }
 
   private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(final 
PipeSubscribeUnsubscribeReq req) {
     try {
       return handlePipeSubscribeUnsubscribeInternal(req);
-    } catch (final SubscriptionException e) {
+    } catch (final SubscriptionException | IOException e) {
       final String exceptionMessage =
           String.format(
               "Subscription: something unexpected happened when unsubscribing: 
%s, req: %s",
@@ -291,7 +294,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
   }
 
   private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal(
-      final PipeSubscribeUnsubscribeReq req) {
+      final PipeSubscribeUnsubscribeReq req) throws IOException {
     // check consumer config thread local
     final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
     if (Objects.isNull(consumerConfig)) {
@@ -302,13 +305,16 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     }
 
     // unsubscribe topics
-    Set<String> topicNames = req.getTopicNames();
-    topicNames = 
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
+    final Set<String> topicNames = req.getTopicNames();
     unsubscribe(consumerConfig, topicNames);
 
     LOGGER.info(
         "Subscription: consumer {} unsubscribe {} successfully", 
consumerConfig, topicNames);
-    return 
PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
+    return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(
+        RpcUtils.SUCCESS_STATUS,
+        SubscriptionAgent.consumer()
+            .getTopicsSubscribedByConsumer(
+                consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
   }
 
   private TPipeSubscribeResp handlePipeSubscribePoll(final 
PipeSubscribePollReq req) {
@@ -403,15 +409,11 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
         SubscriptionAgent.consumer()
             .getTopicsSubscribedByConsumer(
                 consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId());
-    final Set<String> topicNames;
-    if (messagePayload.getTopicNames().isEmpty()) {
+    Set<String> topicNames = messagePayload.getTopicNames();
+    if (topicNames.isEmpty()) {
       // poll all subscribed topics
       topicNames = subscribedTopicNames;
     } else {
-      topicNames =
-          messagePayload.getTopicNames().stream()
-              .map(ASTVisitor::parseIdentifier)
-              .collect(Collectors.toSet());
       // filter unsubscribed topics
       topicNames.removeIf((topicName) -> 
!subscribedTopicNames.contains(topicName));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index bb4838b903e..b429b839942 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -31,14 +31,14 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
   private final String consumerGroupId;
 
   public SubscriptionConnectorSubtask(
-      String taskID,
-      long creationTime,
-      String attributeSortedString,
-      int connectorIndex,
-      UnboundedBlockingPendingQueue<Event> inputPendingQueue,
-      PipeConnector outputPipeConnector,
-      String topicName,
-      String consumerGroupId) {
+      final String taskID,
+      final long creationTime,
+      final String attributeSortedString,
+      final int connectorIndex,
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+      final PipeConnector outputPipeConnector,
+      final String topicName,
+      final String consumerGroupId) {
     super(
         taskID,
         creationTime,
@@ -56,7 +56,7 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
       return false;
     }
 
-    SubscriptionAgent.broker().executePrefetch(this);
+    SubscriptionAgent.broker().executePrefetch(consumerGroupId, topicName);
     // always return true
     return true;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 14633dfafef..43827ab85c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -38,9 +38,9 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
   private int registeredTaskCount;
 
   public SubscriptionConnectorSubtaskLifeCycle(
-      PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
-      PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
-      UnboundedBlockingPendingQueue<Event> pendingQueue) {
+      final PipeConnectorSubtaskExecutor executor, // 
SubscriptionSubtaskExecutor
+      final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
+      final UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(executor, subtask, pendingQueue);
 
     runningTaskCount = 0;
@@ -54,6 +54,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
     }
 
     if (registeredTaskCount == 0) {
+      // bind prefetching queue
       
SubscriptionAgent.broker().bindPrefetchingQueue((SubscriptionConnectorSubtask) 
subtask);
       executor.register(subtask);
       runningTaskCount = 0;
@@ -68,11 +69,13 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
   }
 
   @Override
-  public synchronized boolean deregister(String ignored) {
+  public synchronized boolean deregister(final String ignored) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
+    // no need to discard events of pipe
+
     try {
       if (registeredTaskCount > 1) {
         return false;
@@ -91,45 +94,12 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
     }
   }
 
-  @Override
-  public synchronized void start() {
-    if (runningTaskCount < 0) {
-      throw new IllegalStateException("runningTaskCount < 0");
-    }
-
-    if (runningTaskCount == 0) {
-      executor.start(subtask.getTaskID());
-    }
-
-    runningTaskCount++;
-    LOGGER.info(
-        "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
-        subtask,
-        runningTaskCount,
-        registeredTaskCount);
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (runningTaskCount <= 0) {
-      throw new IllegalStateException("runningTaskCount <= 0");
-    }
-
-    if (runningTaskCount == 1) {
-      executor.stop(subtask.getTaskID());
-    }
-
-    runningTaskCount--;
-    LOGGER.info(
-        "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
-        subtask,
-        runningTaskCount,
-        registeredTaskCount);
-  }
-
   @Override
   public synchronized void close() {
-    executor.deregister(subtask.getTaskID());
-    
SubscriptionAgent.broker().unbindPrefetchingQueue((SubscriptionConnectorSubtask)
 subtask);
+    super.close();
+
+    final String consumerGroupId = ((SubscriptionConnectorSubtask) 
subtask).getConsumerGroupId();
+    final String topicName = ((SubscriptionConnectorSubtask) 
subtask).getTopicName();
+    SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, true);
   }
 }

Reply via email to