This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 24be79aba9c Fix CQ recovery gap and stale callback contamination 
(#17734) (#17820)
24be79aba9c is described below

commit 24be79aba9c973a4cb11fdb8947cdf32da12ea8e
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 10:57:35 2026 +0800

    Fix CQ recovery gap and stale callback contamination (#17734) (#17820)
    
    * fix
    
    * sp
    
    * Fix CQ local schedule cancellation
---
 iotdb-core/confignode/pom.xml                      |   4 -
 .../consensus/request/read/cq/ShowCQPlan.java      |  13 +++
 .../consensus/request/write/cq/ActiveCQPlan.java   |  20 ++--
 .../consensus/request/write/cq/AddCQPlan.java      |  20 ++--
 .../consensus/request/write/cq/DropCQPlan.java     |  20 ++--
 .../request/write/cq/UpdateCQLastExecTimePlan.java |  23 ++--
 .../iotdb/confignode/manager/cq/CQManager.java     | 102 +++++++++++++++++-
 .../confignode/manager/cq/CQScheduleTask.java      |  58 ++++++++--
 .../iotdb/confignode/persistence/cq/CQInfo.java    |  76 +++++++------
 .../persistence/executor/ConfigPlanExecutor.java   |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       |  80 +++++++++++---
 .../request/ConfigPhysicalPlanSerDeTest.java       |   8 +-
 .../apache/iotdb/confignode/cq/CQManagerTest.java  | 107 +++++++++++++++++++
 .../iotdb/confignode/persistence/CQInfoTest.java   |  64 ++++++++++-
 .../procedure/impl/CreateCQProcedureTest.java      |  26 +++++
 .../impl/cq/CreateCQProcedureRecoveryTest.java     | 117 +++++++++++++++++++++
 16 files changed, 635 insertions(+), 106 deletions(-)

diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml
index fb22fd5b091..a7cd4b6298b 100644
--- a/iotdb-core/confignode/pom.xml
+++ b/iotdb-core/confignode/pom.xml
@@ -137,10 +137,6 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
index 5217849deb4..c28838d556b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
@@ -21,11 +21,24 @@ package 
org.apache.iotdb.confignode.consensus.request.read.cq;
 
 import 
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
 
+import java.util.Optional;
+
 import static 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
 
 public class ShowCQPlan extends ConfigPhysicalReadPlan {
 
+  private final String cqId;
+
   public ShowCQPlan() {
+    this(null);
+  }
+
+  public ShowCQPlan(String cqId) {
     super(SHOW_CQ);
+    this.cqId = cqId;
+  }
+
+  public Optional<String> getCqId() {
+    return Optional.ofNullable(cqId);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
index e488ac25669..4ca33b054a9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
 
   private String cqId;
 
-  private String md5;
+  private String cqToken;
 
   public ActiveCQPlan() {
     super(ACTIVE_CQ);
   }
 
-  public ActiveCQPlan(String cqId, String md5) {
+  public ActiveCQPlan(String cqId, String cqToken) {
     super(ACTIVE_CQ);
     Validate.notNull(cqId);
-    Validate.notNull(md5);
+    Validate.notNull(cqToken);
     this.cqId = cqId;
-    this.md5 = md5;
+    this.cqToken = cqToken;
   }
 
   public String getCqId() {
     return cqId;
   }
 
-  public String getMd5() {
-    return md5;
+  public String getCqToken() {
+    return cqToken;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
-    ReadWriteIOUtils.write(md5, stream);
+    ReadWriteIOUtils.write(cqToken, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
-    md5 = ReadWriteIOUtils.readString(buffer);
+    cqToken = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
       return false;
     }
     ActiveCQPlan that = (ActiveCQPlan) o;
-    return cqId.equals(that.cqId) && md5.equals(that.md5);
+    return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, md5);
+    return Objects.hash(super.hashCode(), cqId, cqToken);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
index 0aae9e2e974..721c83d3d2f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
@@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan {
 
   private TCreateCQReq req;
 
-  private String md5;
+  private String cqToken;
 
   private long firstExecutionTime;
 
@@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     super(ADD_CQ);
   }
 
-  public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
+  public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
     super(ADD_CQ);
     Validate.notNull(req);
-    Validate.notNull(md5);
+    Validate.notNull(cqToken);
     this.req = req;
-    this.md5 = md5;
+    this.cqToken = cqToken;
     this.firstExecutionTime = firstExecutionTime;
   }
 
@@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     return req;
   }
 
-  public String getMd5() {
-    return md5;
+  public String getCqToken() {
+    return cqToken;
   }
 
   public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan {
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
-    ReadWriteIOUtils.write(md5, stream);
+    ReadWriteIOUtils.write(cqToken, stream);
     ReadWriteIOUtils.write(firstExecutionTime, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
-    md5 = ReadWriteIOUtils.readString(buffer);
+    cqToken = ReadWriteIOUtils.readString(buffer);
     firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
   }
 
@@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     AddCQPlan addCQPlan = (AddCQPlan) o;
     return firstExecutionTime == addCQPlan.firstExecutionTime
         && Objects.equals(req, addCQPlan.req)
-        && Objects.equals(md5, addCQPlan.md5);
+        && Objects.equals(cqToken, addCQPlan.cqToken);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
+    return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
index 5c901362997..f22561c0b80 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
   private String cqId;
 
   // may be null in user call of drop CQ
-  private String md5;
+  private String cqToken;
 
   public DropCQPlan() {
     super(DROP_CQ);
@@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan {
     this.cqId = cqId;
   }
 
-  public DropCQPlan(String cqId, String md5) {
+  public DropCQPlan(String cqId, String cqToken) {
     super(DROP_CQ);
     Validate.notNull(cqId);
-    Validate.notNull(md5);
+    Validate.notNull(cqToken);
     this.cqId = cqId;
-    this.md5 = md5;
+    this.cqToken = cqToken;
   }
 
   public String getCqId() {
     return cqId;
   }
 
-  public Optional<String> getMd5() {
-    return Optional.ofNullable(md5);
+  public Optional<String> getCqToken() {
+    return Optional.ofNullable(cqToken);
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
-    ReadWriteIOUtils.write(md5, stream);
+    ReadWriteIOUtils.write(cqToken, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
-    md5 = ReadWriteIOUtils.readString(buffer);
+    cqToken = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan {
       return false;
     }
     DropCQPlan that = (DropCQPlan) o;
-    return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
+    return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, md5);
+    return Objects.hash(super.hashCode(), cqId, cqToken);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
index 51a8358a04c..392f6006ba5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
@@ -37,20 +37,19 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
 
   private long executionTime;
 
-  // may be null in user call of drop CQ
-  private String md5;
+  private String cqToken;
 
   public UpdateCQLastExecTimePlan() {
     super(UPDATE_CQ_LAST_EXEC_TIME);
   }
 
-  public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) 
{
+  public UpdateCQLastExecTimePlan(String cqId, long executionTime, String 
cqToken) {
     super(UPDATE_CQ_LAST_EXEC_TIME);
     Validate.notNull(cqId);
-    Validate.notNull(md5);
+    Validate.notNull(cqToken);
     this.cqId = cqId;
     this.executionTime = executionTime;
-    this.md5 = md5;
+    this.cqToken = cqToken;
   }
 
   public String getCqId() {
@@ -61,8 +60,8 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
     return executionTime;
   }
 
-  public String getMd5() {
-    return md5;
+  public String getCqToken() {
+    return cqToken;
   }
 
   @Override
@@ -70,14 +69,14 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
     ReadWriteIOUtils.write(executionTime, stream);
-    ReadWriteIOUtils.write(md5, stream);
+    ReadWriteIOUtils.write(cqToken, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
     executionTime = ReadWriteIOUtils.readLong(buffer);
-    md5 = ReadWriteIOUtils.readString(buffer);
+    cqToken = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -92,11 +91,13 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
       return false;
     }
     UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
-    return executionTime == that.executionTime && cqId.equals(that.cqId) && 
md5.equals(that.md5);
+    return executionTime == that.executionTime
+        && cqId.equals(that.cqId)
+        && cqToken.equals(that.cqToken);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, executionTime, md5);
+    return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 5726b3ce826..a90a21f285f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -42,7 +42,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -56,11 +59,15 @@ public class CQManager {
 
   private final ReadWriteLock lock;
 
+  // Key: CQ id. Value: the local task and the metadata token it owns.
+  private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs;
+
   private ScheduledExecutorService executor;
 
   public CQManager(ConfigManager configManager) {
     this.configManager = configManager;
     this.lock = new ReentrantReadWriteLock();
+    this.locallyScheduledCQs = new ConcurrentHashMap<>();
     this.executor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(
             CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
@@ -77,14 +84,21 @@ public class CQManager {
   }
 
   public TSStatus dropCQ(TDropCQReq req) {
+    lock.readLock().lock();
     try {
-      return configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
+      TSStatus status = configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        cancelLocallyScheduledCQ(req.cqId);
+      }
+      return status;
     } catch (ConsensusException e) {
       LOGGER.warn("Unexpected error happened while dropping cq {}: ", 
req.cqId, e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       res.setMessage(e.getMessage());
       return res;
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -117,6 +131,7 @@ public class CQManager {
     try {
       // 1. shutdown previous cq schedule thread pool
       try {
+        cancelAllLocallyScheduledCQs();
         if (executor != null) {
           executor.shutdown();
         }
@@ -154,7 +169,15 @@ public class CQManager {
         for (CQInfo.CQEntry entry : allCQs) {
           if (entry.getState() == CQState.ACTIVE) {
             CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, 
executor, configManager);
-            cqScheduleTask.submitSelf();
+            if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), 
cqScheduleTask)) {
+              continue;
+            }
+            try {
+              cqScheduleTask.submitSelf();
+            } catch (RuntimeException e) {
+              unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken());
+              throw e;
+            }
           }
         }
       }
@@ -174,6 +197,7 @@ public class CQManager {
     try {
       previous = executor;
       executor = null;
+      cancelAllLocallyScheduledCQs();
     } finally {
       lock.writeLock().unlock();
     }
@@ -181,4 +205,78 @@ public class CQManager {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String cqToken, 
CQScheduleTask task) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task);
+    lock.readLock().lock();
+    try {
+      locallyScheduledCQs.compute(
+          cqId,
+          (ignored, previousSchedule) -> {
+            if (previousSchedule != null && 
previousSchedule.hasToken(cqToken)) {
+              return previousSchedule;
+            }
+            if (previousSchedule != null) {
+              previousSchedule.cancel();
+            }
+            shouldSchedule.set(true);
+            return schedule;
+          });
+      if (!shouldSchedule.get()) {
+        task.cancel();
+      }
+      return shouldSchedule.get();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public void unmarkCQLocallyScheduled(String cqId, String cqToken) {
+    lock.readLock().lock();
+    try {
+      locallyScheduledCQs.computeIfPresent(
+          cqId,
+          (ignored, schedule) -> {
+            if (schedule.hasToken(cqToken)) {
+              schedule.cancel();
+              return null;
+            }
+            return schedule;
+          });
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  private void cancelLocallyScheduledCQ(String cqId) {
+    LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId);
+    if (schedule != null) {
+      schedule.cancel();
+    }
+  }
+
+  private void cancelAllLocallyScheduledCQs() {
+    locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel);
+    locallyScheduledCQs.clear();
+  }
+
+  private static class LocallyScheduledCQ {
+
+    private final String cqToken;
+    private final CQScheduleTask task;
+
+    private LocallyScheduledCQ(String cqToken, CQScheduleTask task) {
+      this.cqToken = cqToken;
+      this.task = task;
+    }
+
+    private boolean hasToken(String cqToken) {
+      return this.cqToken.equals(cqToken);
+    }
+
+    private void cancel() {
+      task.cancel();
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 6125edc1ef7..d85d0f12a30 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -39,7 +39,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class CQScheduleTask implements Runnable {
 
@@ -69,7 +72,7 @@ public class CQScheduleTask implements Runnable {
   private final long endTimeOffset;
   private final TimeoutPolicy timeoutPolicy;
   private final String queryBody;
-  private final String md5;
+  private final String cqToken;
 
   private final String zoneId;
 
@@ -81,12 +84,15 @@ public class CQScheduleTask implements Runnable {
 
   private final long retryWaitTimeInMS;
 
+  private final AtomicBoolean cancelled;
+  private final AtomicReference<ScheduledFuture<?>> scheduledFuture;
+
   private long executionTime;
 
   public CQScheduleTask(
       TCreateCQReq req,
       long firstExecutionTime,
-      String md5,
+      String cqToken,
       ScheduledExecutorService executor,
       ConfigManager configManager) {
     this(
@@ -96,7 +102,7 @@ public class CQScheduleTask implements Runnable {
         req.endTimeOffset,
         TimeoutPolicy.deserialize(req.timeoutPolicy),
         req.queryBody,
-        md5,
+        cqToken,
         req.zoneId,
         req.username,
         executor,
@@ -113,7 +119,7 @@ public class CQScheduleTask implements Runnable {
         entry.getEndTimeOffset(),
         entry.getTimeoutPolicy(),
         entry.getQueryBody(),
-        entry.getMd5(),
+        entry.getCqToken(),
         entry.getZoneId(),
         entry.getUsername(),
         executor,
@@ -129,7 +135,7 @@ public class CQScheduleTask implements Runnable {
       long endTimeOffset,
       TimeoutPolicy timeoutPolicy,
       String queryBody,
-      String md5,
+      String cqToken,
       String zoneId,
       String username,
       ScheduledExecutorService executor,
@@ -141,12 +147,14 @@ public class CQScheduleTask implements Runnable {
     this.endTimeOffset = endTimeOffset;
     this.timeoutPolicy = timeoutPolicy;
     this.queryBody = queryBody;
-    this.md5 = md5;
+    this.cqToken = cqToken;
     this.zoneId = zoneId;
     this.username = username;
     this.executor = executor;
     this.configManager = configManager;
     this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, 
everyInterval / FACTOR);
+    this.cancelled = new AtomicBoolean(false);
+    this.scheduledFuture = new AtomicReference<>();
     this.executionTime = executionTime;
   }
 
@@ -165,6 +173,9 @@ public class CQScheduleTask implements Runnable {
 
   @Override
   public void run() {
+    if (cancelled.get()) {
+      return;
+    }
     long startTime = executionTime - startTimeOffset;
     long endTime = executionTime - endTimeOffset;
 
@@ -177,6 +188,9 @@ public class CQScheduleTask implements Runnable {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
       }
     } else {
+      if (cancelled.get()) {
+        return;
+      }
       LOGGER.info(
           "[StartExecuteCQ] execute CQ {} on DataNode[{}], time range is [{}, 
{}), current time is {}",
           cqId,
@@ -206,12 +220,32 @@ public class CQScheduleTask implements Runnable {
   }
 
   private void submitSelf(long delay, TimeUnit unit) {
-    executor.schedule(this, delay, unit);
+    if (cancelled.get()) {
+      return;
+    }
+    ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit);
+    ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture);
+    if (previousFuture != null) {
+      previousFuture.cancel(false);
+    }
+    if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) {
+      newFuture.cancel(false);
+    }
+  }
+
+  public void cancel() {
+    cancelled.set(true);
+    ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null);
+    if (currentFuture != null) {
+      currentFuture.cancel(false);
+    }
   }
 
   private boolean needSubmit() {
     // current node is still leader and thread pool is not shut down.
-    return configManager.getConsensusManager().isLeader() && 
!executor.isShutdown();
+    return !cancelled.get()
+        && configManager.getConsensusManager().isLeader()
+        && !executor.isShutdown();
   }
 
   private class AsyncExecuteCQCallback implements 
AsyncMethodCallback<TSStatus> {
@@ -238,6 +272,9 @@ public class CQScheduleTask implements Runnable {
 
     @Override
     public void onComplete(TSStatus response) {
+      if (cancelled.get()) {
+        return;
+      }
       if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
 
         LOGGER.info(
@@ -251,7 +288,7 @@ public class CQScheduleTask implements Runnable {
           result =
               configManager
                   .getConsensusManager()
-                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, 
md5));
+                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, 
cqToken));
         } catch (ConsensusException e) {
           result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
           result.setMessage(e.getMessage());
@@ -291,6 +328,9 @@ public class CQScheduleTask implements Runnable {
 
     @Override
     public void onError(Exception exception) {
+      if (cancelled.get()) {
+        return;
+      }
       LOGGER.warn("Execute CQ {} failed", cqId, exception);
       if (needSubmit()) {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index 4c1225ef48a..b24f1ca9aa5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cq.CQState;
 import org.apache.iotdb.commons.cq.TimeoutPolicy;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
@@ -44,7 +45,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -61,7 +64,7 @@ public class CQInfo implements SnapshotProcessor {
 
   private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
 
-  private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't 
match";
+  private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s 
doesn't match";
 
   private final Map<String, CQEntry> cqMap;
 
@@ -91,7 +94,7 @@ public class CQInfo implements SnapshotProcessor {
         CQEntry cqEntry =
             new CQEntry(
                 plan.getReq(),
-                plan.getMd5(),
+                plan.getCqToken(),
                 plan.getFirstExecutionTime() - plan.getReq().everyInterval);
         cqMap.put(cqId, cqEntry);
         res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -105,13 +108,13 @@ public class CQInfo implements SnapshotProcessor {
   /**
    * Drop the CQ whose ID is same as <tt>cqId</tt> in plan.
    *
-   * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as 
<tt>cqId</tt> in plan,
+   * @return SUCCESS_STATUS if there is CQ whose ID and token is same as 
<tt>cqId</tt> in plan,
    *     otherwise NO_SUCH_CQ.
    */
   public TSStatus dropCQ(DropCQPlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    Optional<String> md5 = plan.getMd5();
+    Optional<String> cqToken = plan.getCqToken();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
@@ -119,10 +122,10 @@ public class CQInfo implements SnapshotProcessor {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
         LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
-      } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
+      } else if ((cqToken.isPresent() && 
!cqToken.get().equals(cqEntry.cqToken))) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
-        LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
+        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+        LOGGER.warn("Drop CQ {} failed, because its token doesn't match.", 
cqId);
       } else {
         cqMap.remove(cqId);
         res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -135,11 +138,24 @@ public class CQInfo implements SnapshotProcessor {
   }
 
   public ShowCQResp showCQ() {
+    return showCQ(new ShowCQPlan());
+  }
+
+  public ShowCQResp showCQ(ShowCQPlan plan) {
     lock.readLock().lock();
     try {
-      return new ShowCQResp(
-          new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-          
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
+      Optional<String> cqId = plan.getCqId();
+      List<CQEntry> cqList;
+      if (cqId.isPresent()) {
+        CQEntry cqEntry = cqMap.get(cqId.get());
+        cqList =
+            cqEntry == null
+                ? Collections.emptyList()
+                : Collections.singletonList(new CQEntry(cqEntry));
+      } else {
+        cqList = 
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList());
+      }
+      return new ShowCQResp(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList);
     } finally {
       lock.readLock().unlock();
     }
@@ -153,16 +169,16 @@ public class CQInfo implements SnapshotProcessor {
   public TSStatus activeCQ(ActiveCQPlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    String md5 = plan.getMd5();
+    String cqToken = plan.getCqToken();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
-      } else if (!md5.equals(cqEntry.md5)) {
+      } else if (!cqToken.equals(cqEntry.cqToken)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.state == CQState.ACTIVE) {
         res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
         res.message = String.format("CQ %s has already been active", cqId);
@@ -180,22 +196,22 @@ public class CQInfo implements SnapshotProcessor {
    * Update the last execution time of the corresponding CQ.
    *
    * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the 
CQ doesn't exist; or 2.
-   *     md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original 
lastExecutionTime >=
+   *     token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original 
lastExecutionTime >=
    *     current lastExecutionTime;
    */
   public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    String md5 = plan.getMd5();
+    String cqToken = plan.getCqToken();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
-      } else if (!md5.equals(cqEntry.md5)) {
+      } else if (!cqToken.equals(cqEntry.cqToken)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
         res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
         res.message =
@@ -299,7 +315,7 @@ public class CQInfo implements SnapshotProcessor {
     private final TimeoutPolicy timeoutPolicy;
     private final String queryBody;
     private final String sql;
-    private final String md5;
+    private final String cqToken;
 
     private final String zoneId;
 
@@ -308,7 +324,7 @@ public class CQInfo implements SnapshotProcessor {
     private CQState state;
     private long lastExecutionTime;
 
-    private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) {
+    private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) {
       this(
           req.cqId,
           req.everyInterval,
@@ -318,7 +334,7 @@ public class CQInfo implements SnapshotProcessor {
           TimeoutPolicy.deserialize(req.timeoutPolicy),
           req.queryBody,
           req.sql,
-          md5,
+          cqToken,
           req.zoneId,
           req.username,
           CQState.INACTIVE,
@@ -335,7 +351,7 @@ public class CQInfo implements SnapshotProcessor {
           other.timeoutPolicy,
           other.queryBody,
           other.sql,
-          other.md5,
+          other.cqToken,
           other.zoneId,
           other.username,
           other.state,
@@ -352,7 +368,7 @@ public class CQInfo implements SnapshotProcessor {
         TimeoutPolicy timeoutPolicy,
         String queryBody,
         String sql,
-        String md5,
+        String cqToken,
         String zoneId,
         String username,
         CQState state,
@@ -365,7 +381,7 @@ public class CQInfo implements SnapshotProcessor {
       this.timeoutPolicy = timeoutPolicy;
       this.queryBody = queryBody;
       this.sql = sql;
-      this.md5 = md5;
+      this.cqToken = cqToken;
       this.zoneId = zoneId;
       this.username = username;
       this.state = state;
@@ -381,7 +397,7 @@ public class CQInfo implements SnapshotProcessor {
       ReadWriteIOUtils.write(timeoutPolicy.getType(), stream);
       ReadWriteIOUtils.write(queryBody, stream);
       ReadWriteIOUtils.write(sql, stream);
-      ReadWriteIOUtils.write(md5, stream);
+      ReadWriteIOUtils.write(cqToken, stream);
       ReadWriteIOUtils.write(zoneId, stream);
       ReadWriteIOUtils.write(username, stream);
       ReadWriteIOUtils.write(state.getType(), stream);
@@ -397,7 +413,7 @@ public class CQInfo implements SnapshotProcessor {
       TimeoutPolicy timeoutPolicy = 
TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream));
       String queryBody = ReadWriteIOUtils.readString(stream);
       String sql = ReadWriteIOUtils.readString(stream);
-      String md5 = ReadWriteIOUtils.readString(stream);
+      String cqToken = ReadWriteIOUtils.readString(stream);
       String zoneId = ReadWriteIOUtils.readString(stream);
       String username = ReadWriteIOUtils.readString(stream);
       CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
@@ -411,7 +427,7 @@ public class CQInfo implements SnapshotProcessor {
           timeoutPolicy,
           queryBody,
           sql,
-          md5,
+          cqToken,
           zoneId,
           username,
           state,
@@ -450,8 +466,8 @@ public class CQInfo implements SnapshotProcessor {
       return sql;
     }
 
-    public String getMd5() {
-      return md5;
+    public String getCqToken() {
+      return cqToken;
     }
 
     public CQState getState() {
@@ -488,7 +504,7 @@ public class CQInfo implements SnapshotProcessor {
           && timeoutPolicy == cqEntry.timeoutPolicy
           && Objects.equals(queryBody, cqEntry.queryBody)
           && Objects.equals(sql, cqEntry.sql)
-          && Objects.equals(md5, cqEntry.md5)
+          && Objects.equals(cqToken, cqEntry.cqToken)
           && Objects.equals(zoneId, cqEntry.zoneId)
           && Objects.equals(username, cqEntry.username)
           && state == cqEntry.state;
@@ -505,7 +521,7 @@ public class CQInfo implements SnapshotProcessor {
           timeoutPolicy,
           queryBody,
           sql,
-          md5,
+          cqToken,
           zoneId,
           username,
           state,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 09017a841c7..60ec748fb54 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
 import org.apache.iotdb.confignode.consensus.request.read.auth.AuthorReadPlan;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
@@ -322,7 +323,7 @@ public class ConfigPlanExecutor {
       case GetSeriesSlotList:
         return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
       case SHOW_CQ:
-        return cqInfo.showCQ();
+        return cqInfo.showCQ((ShowCQPlan) req);
       case GetFunctionTable:
         return udfInfo.getUDFTable();
       case GetFunctionJar:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 3da60c5ad84..af7f968e8a5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -22,10 +22,14 @@ package org.apache.iotdb.confignode.procedure.impl.cq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
@@ -35,7 +39,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +47,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
@@ -59,7 +64,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
 
   private TCreateCQReq req;
 
-  private String md5;
+  private String cqToken;
 
   private long firstExecutionTime;
 
@@ -74,7 +79,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService 
executor) {
     super();
     this.req = req;
-    this.md5 = DigestUtils.md2Hex(req.cqId);
+    this.cqToken = generateCQToken();
     this.executor = executor;
     this.firstExecutionTime =
         CQScheduleTask.getFirstExecutionTime(req.boundaryTime, 
req.everyInterval);
@@ -90,12 +95,16 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
           addCQ(env);
           return Flow.HAS_MORE_STATE;
         case INACTIVE:
-          CQScheduleTask cqScheduleTask =
-              new CQScheduleTask(req, firstExecutionTime, md5, executor, 
env.getConfigManager());
-          cqScheduleTask.submitSelf();
+          submitScheduleTask(
+              env,
+              new CQScheduleTask(
+                  req, firstExecutionTime, cqToken, executor, 
env.getConfigManager()));
           setNextState(SCHEDULED);
           break;
         case SCHEDULED:
+          if (isStateDeserialized()) {
+            recoverScheduledTask(env);
+          }
           activeCQ(env);
           return Flow.NO_MORE_STATE;
         default:
@@ -125,7 +134,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
       res =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new AddCQPlan(req, md5, firstExecutionTime));
+              .write(new AddCQPlan(req, cqToken, firstExecutionTime));
     } catch (ConsensusException e) {
       LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
       res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -146,7 +155,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   private void activeCQ(ConfigNodeProcedureEnv env) {
     TSStatus res;
     try {
-      res = env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, md5));
+      res = env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, cqToken));
     } catch (ConsensusException e) {
       LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
       res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -164,6 +173,42 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws 
ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is 
unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, 
env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) 
throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new 
ShowCQPlan(req.cqId));
+    return response.getCqList().stream()
+        .filter(entry -> cqToken.equals(entry.getCqToken()))
+        .findFirst();
+  }
+
+  private static String generateCQToken() {
+    return UUID.randomUUID().toString();
+  }
+
+  private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask 
cqScheduleTask) {
+    CQManager cqManager = env.getConfigManager().getCQManager();
+    if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) {
+      return;
+    }
+    try {
+      cqScheduleTask.submitSelf();
+    } catch (RuntimeException e) {
+      cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken);
+      throw e;
+    }
+  }
+
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
       throws IOException, InterruptedException, ProcedureException {
@@ -176,7 +221,8 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         LOGGER.info("Start [INACTIVE] rollback of CQ {}", req.cqId);
         TSStatus res;
         try {
-          res = env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, md5));
+          res =
+              env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, cqToken));
         } catch (ConsensusException e) {
           LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
           res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -227,7 +273,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
-    ReadWriteIOUtils.write(md5, stream);
+    ReadWriteIOUtils.write(cqToken, stream);
     ReadWriteIOUtils.write(firstExecutionTime, stream);
   }
 
@@ -235,7 +281,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer);
-    this.md5 = ReadWriteIOUtils.readString(byteBuffer);
+    this.cqToken = ReadWriteIOUtils.readString(byteBuffer);
     this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer);
   }
 
@@ -254,7 +300,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         && isGeneratedByPipe == that.isGeneratedByPipe
         && firstExecutionTime == that.firstExecutionTime
         && Objects.equals(req, that.req)
-        && Objects.equals(md5, that.md5);
+        && Objects.equals(cqToken, that.cqToken);
   }
 
   @Override
@@ -265,7 +311,15 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         getCycles(),
         isGeneratedByPipe,
         req,
-        md5,
+        cqToken,
         firstExecutionTime);
   }
+
+  public String getCqId() {
+    return req == null ? null : req.getCqId();
+  }
+
+  public String getCqToken() {
+    return cqToken;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 3b0554baa11..1c73fe323ce 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1213,7 +1213,7 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void ActiveCQPlanTest() throws IOException {
-    ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5");
+    ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken");
     ActiveCQPlan activeCQPlan1 =
         (ActiveCQPlan) 
ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer());
 
@@ -1236,7 +1236,7 @@ public class ConfigPhysicalPlanSerDeTest {
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from 
root.sg.d1 END",
                 "Asia",
                 "root"),
-            "testCq1_md5",
+            "testCq1Token",
             executionTime);
     AddCQPlan addCQPlan1 =
         (AddCQPlan) 
ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer());
@@ -1251,7 +1251,7 @@ public class ConfigPhysicalPlanSerDeTest {
         (DropCQPlan) 
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
     Assert.assertEquals(dropCQPlan0, dropCQPlan1);
 
-    dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5");
+    dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token");
     dropCQPlan1 =
         (DropCQPlan) 
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
     Assert.assertEquals(dropCQPlan0, dropCQPlan1);
@@ -1260,7 +1260,7 @@ public class ConfigPhysicalPlanSerDeTest {
   @Test
   public void UpdateCQLastExecTimePlanTest() throws IOException {
     UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 =
-        new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), 
"testCq_md5");
+        new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), 
"testCqToken");
     UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 =
         (UpdateCQLastExecTimePlan)
             
ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer());
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
new file mode 100644
index 00000000000..a0bc5a523ba
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.cq;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cq.TimeoutPolicy;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
+import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class CQManagerTest {
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void dropCQShouldCancelLocallyScheduledTask() throws Exception {
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(consensusManager.write(Mockito.any()))
+        .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    CQManager cqManager = new CQManager(configManager);
+    ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class);
+    CQScheduleTask task = newScheduledTask(configManager, future, "token");
+
+    try {
+      assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task));
+      task.submitSelf();
+      cqManager.dropCQ(new TDropCQReq("testCq"));
+
+      Mockito.verify(future).cancel(false);
+    } finally {
+      cqManager.stopCQScheduler();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void newTokenShouldCancelPreviousLocallyScheduledTask() {
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    CQManager cqManager = new CQManager(configManager);
+    ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class);
+    CQScheduleTask previousTask = newScheduledTask(configManager, 
previousFuture, "previousToken");
+    ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class);
+    CQScheduleTask currentTask = newScheduledTask(configManager, 
currentFuture, "currentToken");
+
+    try {
+      assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", 
previousTask));
+      previousTask.submitSelf();
+      assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", 
currentTask));
+
+      Mockito.verify(previousFuture).cancel(false);
+    } finally {
+      cqManager.stopCQScheduler();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private CQScheduleTask newScheduledTask(
+      ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String 
cqToken) {
+    ScheduledExecutorService executor = 
Mockito.mock(ScheduledExecutorService.class);
+    Mockito.when(
+            executor.schedule(
+                Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class)))
+        .thenReturn((ScheduledFuture) scheduledFuture);
+    return new CQScheduleTask(
+        "testCq",
+        1000,
+        0,
+        1000,
+        TimeoutPolicy.BLOCKED,
+        "select s1 into root.backup.d1.s1 from root.sg.d1",
+        cqToken,
+        "Asia",
+        "root",
+        executor,
+        configManager,
+        System.currentTimeMillis() + 10_000);
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index bfadd3d05e8..8bf2d12cd59 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,9 +18,14 @@
  */
 package org.apache.iotdb.confignode.persistence;
 
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
+import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
@@ -70,7 +75,7 @@ public class CQInfoTest {
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from 
root.sg.d1 END",
                 "Asia",
                 "root"),
-            "testCq1_md5",
+            "testCq1Token",
             executionTime);
 
     cqInfo.addCQ(addCQPlan);
@@ -89,7 +94,7 @@ public class CQInfoTest {
                 "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from 
root.sg.d2 END",
                 "Asia",
                 "root"),
-            "testCq2_md5",
+            "testCq2Token",
             executionTime);
     cqInfo.addCQ(addCQPlan);
 
@@ -99,4 +104,59 @@ public class CQInfoTest {
 
     Assert.assertEquals(cqInfo, actualCQInfo);
   }
+
+  @Test
+  public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
+    long executionTime = System.currentTimeMillis();
+    TCreateCQReq req =
+        new TCreateCQReq(
+            "testCq3",
+            1000,
+            0,
+            1000,
+            0,
+            (byte) 0,
+            "select s1 into root.backup.d3.s1 from root.sg.d3",
+            "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from 
root.sg.d3 END",
+            "Asia",
+            "root");
+
+    cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime));
+    cqInfo.dropCQ(new DropCQPlan("testCq3"));
+    cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime));
+
+    Assert.assertEquals(
+        TSStatusCode.NO_SUCH_CQ.getStatusCode(),
+        cqInfo.updateCQLastExecutionTime(
+                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, 
"oldToken"))
+            .code);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        cqInfo.updateCQLastExecutionTime(
+                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, 
"newToken"))
+            .code);
+  }
+
+  @Test
+  public void testShowCQCanFilterByCQId() throws Exception {
+    long executionTime = System.currentTimeMillis();
+    TCreateCQReq req =
+        new TCreateCQReq(
+            "testCq4",
+            1000,
+            0,
+            1000,
+            0,
+            (byte) 0,
+            "select s1 into root.backup.d4.s1 from root.sg.d4",
+            "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from 
root.sg.d4 END",
+            "Asia",
+            "root");
+    cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime));
+
+    ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4"));
+
+    Assert.assertEquals(1, showCQResp.getCqList().size());
+    Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId());
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
index d0e92b32816..3e7fd2052ad 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
@@ -36,10 +36,36 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class CreateCQProcedureTest {
 
+  @Test
+  public void tokenShouldBeUniqueForSameCQId() {
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    try {
+      TCreateCQReq req =
+          new TCreateCQReq(
+              "testCq1",
+              1000,
+              0,
+              1000,
+              0,
+              (byte) 0,
+              "select s1 into root.backup.d1(s1) from root.sg.d1",
+              "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from 
root.sg.d1 END",
+              "Asia",
+              "root");
+      CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, 
executor);
+      CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, 
executor);
+
+      assertNotEquals(createCQProcedure1.getCqToken(), 
createCQProcedure2.getCqToken());
+    } finally {
+      executor.shutdown();
+    }
+  }
+
   @Test
   public void serializeDeserializeTest() {
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
new file mode 100644
index 00000000000..a90e282494f
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.cq;
+
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
+import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class CreateCQProcedureRecoveryTest {
+
+  private TCreateCQReq newCreateCQReq() {
+    return new TCreateCQReq(
+        "testCq1",
+        1000,
+        0,
+        1000,
+        0,
+        (byte) 0,
+        "select s1 into root.backup.d1.s1 from root.sg.d1",
+        "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from 
root.sg.d1 END",
+        "Asia",
+        "root");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws 
Exception {
+    ScheduledExecutorService executor = 
Mockito.mock(ScheduledExecutorService.class);
+    Mockito.when(
+            executor.schedule(
+                Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class)))
+        .thenReturn(Mockito.mock(ScheduledFuture.class));
+
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    CQManager cqManager = Mockito.mock(CQManager.class);
+    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+    Mockito.when(
+            cqManager.markCQLocallyScheduled(
+                Mockito.anyString(), Mockito.anyString(), 
Mockito.any(CQScheduleTask.class)))
+        .thenReturn(true);
+
+    TCreateCQReq req = newCreateCQReq();
+    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+    CQInfo cqInfo = new CQInfo();
+    cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), 
System.currentTimeMillis() + 10_000));
+    
Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+    procedure.recoverScheduledTask(env);
+
+    Mockito.verify(executor)
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws 
Exception {
+    ScheduledExecutorService executor = 
Mockito.mock(ScheduledExecutorService.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    CQManager cqManager = Mockito.mock(CQManager.class);
+    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+    Mockito.when(
+            cqManager.markCQLocallyScheduled(
+                Mockito.anyString(), Mockito.anyString(), 
Mockito.any(CQScheduleTask.class)))
+        .thenReturn(false);
+
+    TCreateCQReq req = newCreateCQReq();
+    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+    CQInfo cqInfo = new CQInfo();
+    cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), 
System.currentTimeMillis() + 10_000));
+    
Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+    procedure.recoverScheduledTask(env);
+
+    Mockito.verify(executor, Mockito.never())
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class));
+  }
+}

Reply via email to