This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29d0d51dc74 Fix CQ recovery gap and stale callback contamination (#17734)
29d0d51dc74 is described below
commit 29d0d51dc74081c1b9295e29c18569762a72a579
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 14:53:38 2026 +0800
Fix CQ recovery gap and stale callback contamination (#17734)
* fix
* sp
* Fix CQ local schedule cancellation
---
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +-
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +-
.../consensus/request/read/cq/ShowCQPlan.java | 13 +++
.../consensus/request/write/cq/ActiveCQPlan.java | 20 ++--
.../consensus/request/write/cq/AddCQPlan.java | 20 ++--
.../consensus/request/write/cq/DropCQPlan.java | 20 ++--
.../request/write/cq/UpdateCQLastExecTimePlan.java | 23 ++--
.../iotdb/confignode/manager/cq/CQManager.java | 102 +++++++++++++++++-
.../confignode/manager/cq/CQScheduleTask.java | 58 ++++++++--
.../iotdb/confignode/persistence/cq/CQInfo.java | 76 +++++++------
.../persistence/executor/ConfigPlanExecutor.java | 3 +-
.../procedure/impl/cq/CreateCQProcedure.java | 80 +++++++++++---
.../request/ConfigPhysicalPlanSerDeTest.java | 8 +-
.../apache/iotdb/confignode/cq/CQManagerTest.java | 107 +++++++++++++++++++
.../iotdb/confignode/persistence/CQInfoTest.java | 64 ++++++++++-
.../procedure/impl/CreateCQProcedureTest.java | 26 +++++
.../impl/cq/CreateCQProcedureRecoveryTest.java | 117 +++++++++++++++++++++
17 files changed, 639 insertions(+), 106 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index c1c3e877de7..f662fa5871d 100644
---
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -130,8 +130,8 @@ public final class ConfigNodeMessages {
public static final String DOES_NOT_EXIST = "%s does not exist";
public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED =
"Dropping tag or time column is not supported.";
- public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH =
- "Drop CQ {} failed, because its MD5 doesn't match.";
+ public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH =
+ "Drop CQ {} failed, because its token doesn't match.";
public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST =
"Drop CQ {} failed, because it doesn't exist.";
public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully.";
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 6ca847626c8..8bffa1b0831 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -126,8 +126,8 @@ public final class ConfigNodeMessages {
"Deserialization error for write plan, request: {}, bytebuffer: {}";
public static final String DOES_NOT_EXIST = "%s does not exist";
public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED =
"不支持删除标签列或时间列。";
- public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH =
- "Drop CQ {} failed, because its MD5 doesn't match.";
+ public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH =
+ "Drop CQ {} failed, because its token doesn't match.";
public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST =
"Drop CQ {} failed, because it doesn't exist.";
public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully.";
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
index 5217849deb4..c28838d556b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
@@ -21,11 +21,24 @@ package
org.apache.iotdb.confignode.consensus.request.read.cq;
import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
+import java.util.Optional;
+
import static
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
public class ShowCQPlan extends ConfigPhysicalReadPlan {
+ private final String cqId;
+
public ShowCQPlan() {
+ this(null);
+ }
+
+ public ShowCQPlan(String cqId) {
super(SHOW_CQ);
+ this.cqId = cqId;
+ }
+
+ public Optional<String> getCqId() {
+ return Optional.ofNullable(cqId);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
index 263aeb9f0d0..3faa1c2d62f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
private String cqId;
- private String md5;
+ private String cqToken;
public ActiveCQPlan() {
super(ACTIVE_CQ);
}
- public ActiveCQPlan(String cqId, String md5) {
+ public ActiveCQPlan(String cqId, String cqToken) {
super(ACTIVE_CQ);
Validate.notNull(cqId);
- Validate.notNull(md5);
+ Validate.notNull(cqToken);
this.cqId = cqId;
- this.md5 = md5;
+ this.cqToken = cqToken;
}
public String getCqId() {
return cqId;
}
- public String getMd5() {
- return md5;
+ public String getCqToken() {
+ return cqToken;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
- md5 = ReadWriteIOUtils.readString(buffer);
+ cqToken = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
return false;
}
ActiveCQPlan that = (ActiveCQPlan) o;
- return cqId.equals(that.cqId) && md5.equals(that.md5);
+ return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, md5);
+ return Objects.hash(super.hashCode(), cqId, cqToken);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
index 62f994688b3..471516c38e5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
@@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan {
private TCreateCQReq req;
- private String md5;
+ private String cqToken;
private long firstExecutionTime;
@@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan {
super(ADD_CQ);
}
- public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
+ public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
super(ADD_CQ);
Validate.notNull(req);
- Validate.notNull(md5);
+ Validate.notNull(cqToken);
this.req = req;
- this.md5 = md5;
+ this.cqToken = cqToken;
this.firstExecutionTime = firstExecutionTime;
}
@@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan {
return req;
}
- public String getMd5() {
- return md5;
+ public String getCqToken() {
+ return cqToken;
}
public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan {
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
ReadWriteIOUtils.write(firstExecutionTime, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
- md5 = ReadWriteIOUtils.readString(buffer);
+ cqToken = ReadWriteIOUtils.readString(buffer);
firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
}
@@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan {
AddCQPlan addCQPlan = (AddCQPlan) o;
return firstExecutionTime == addCQPlan.firstExecutionTime
&& Objects.equals(req, addCQPlan.req)
- && Objects.equals(md5, addCQPlan.md5);
+ && Objects.equals(cqToken, addCQPlan.cqToken);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
+ return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
index 108241b233d..69c29bff634 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
private String cqId;
// may be null in user call of drop CQ
- private String md5;
+ private String cqToken;
public DropCQPlan() {
super(DROP_CQ);
@@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan {
this.cqId = cqId;
}
- public DropCQPlan(String cqId, String md5) {
+ public DropCQPlan(String cqId, String cqToken) {
super(DROP_CQ);
Validate.notNull(cqId);
- Validate.notNull(md5);
+ Validate.notNull(cqToken);
this.cqId = cqId;
- this.md5 = md5;
+ this.cqToken = cqToken;
}
public String getCqId() {
return cqId;
}
- public Optional<String> getMd5() {
- return Optional.ofNullable(md5);
+ public Optional<String> getCqToken() {
+ return Optional.ofNullable(cqToken);
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
- md5 = ReadWriteIOUtils.readString(buffer);
+ cqToken = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan {
return false;
}
DropCQPlan that = (DropCQPlan) o;
- return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
+ return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, md5);
+ return Objects.hash(super.hashCode(), cqId, cqToken);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
index 861a7d4f51b..a487ae648e3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
@@ -37,20 +37,19 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
private long executionTime;
- // may be null in user call of drop CQ
- private String md5;
+ private String cqToken;
public UpdateCQLastExecTimePlan() {
super(UPDATE_CQ_LAST_EXEC_TIME);
}
- public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) {
+ public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) {
super(UPDATE_CQ_LAST_EXEC_TIME);
Validate.notNull(cqId);
- Validate.notNull(md5);
+ Validate.notNull(cqToken);
this.cqId = cqId;
this.executionTime = executionTime;
- this.md5 = md5;
+ this.cqToken = cqToken;
}
public String getCqId() {
@@ -61,8 +60,8 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
return executionTime;
}
- public String getMd5() {
- return md5;
+ public String getCqToken() {
+ return cqToken;
}
@Override
@@ -70,14 +69,14 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
ReadWriteIOUtils.write(executionTime, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
executionTime = ReadWriteIOUtils.readLong(buffer);
- md5 = ReadWriteIOUtils.readString(buffer);
+ cqToken = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -92,11 +91,13 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
return false;
}
UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
- return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5);
+ return executionTime == that.executionTime
+ && cqId.equals(that.cqId)
+ && cqToken.equals(that.cqToken);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, executionTime, md5);
+ return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index c4c1e8aede9..29837bc8aa2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -43,7 +43,10 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,11 +60,15 @@ public class CQManager {
private final ReadWriteLock lock;
+ // Key: CQ id. Value: the local task and the metadata token it owns.
+ private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs;
+
private ScheduledExecutorService executor;
public CQManager(ConfigManager configManager) {
this.configManager = configManager;
this.lock = new ReentrantReadWriteLock();
+ this.locallyScheduledCQs = new ConcurrentHashMap<>();
this.executor =
IoTDBThreadPoolFactory.newScheduledThreadPool(
CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
@@ -78,14 +85,21 @@ public class CQManager {
}
public TSStatus dropCQ(TDropCQReq req) {
+ lock.readLock().lock();
try {
- return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+ TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ cancelLocallyScheduledCQ(req.cqId);
+ }
+ return status;
} catch (ConsensusException e) {
LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ,
req.cqId, e);
// consensus layer related errors
TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -118,6 +132,7 @@ public class CQManager {
try {
// 1. shutdown previous cq schedule thread pool
try {
+ cancelAllLocallyScheduledCQs();
if (executor != null) {
executor.shutdown();
}
@@ -156,7 +171,15 @@ public class CQManager {
for (CQInfo.CQEntry entry : allCQs) {
if (entry.getState() == CQState.ACTIVE) {
CQScheduleTask cqScheduleTask = new CQScheduleTask(entry,
executor, configManager);
- cqScheduleTask.submitSelf();
+ if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), cqScheduleTask)) {
+ continue;
+ }
+ try {
+ cqScheduleTask.submitSelf();
+ } catch (RuntimeException e) {
+ unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken());
+ throw e;
+ }
}
}
}
@@ -176,6 +199,7 @@ public class CQManager {
try {
previous = executor;
executor = null;
+ cancelAllLocallyScheduledCQs();
} finally {
lock.writeLock().unlock();
}
@@ -183,4 +207,78 @@ public class CQManager {
previous.shutdown();
}
}
+
+ public boolean markCQLocallyScheduled(String cqId, String cqToken, CQScheduleTask task) {
+ AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+ LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task);
+ lock.readLock().lock();
+ try {
+ locallyScheduledCQs.compute(
+ cqId,
+ (ignored, previousSchedule) -> {
+ if (previousSchedule != null && previousSchedule.hasToken(cqToken)) {
+ return previousSchedule;
+ }
+ if (previousSchedule != null) {
+ previousSchedule.cancel();
+ }
+ shouldSchedule.set(true);
+ return schedule;
+ });
+ if (!shouldSchedule.get()) {
+ task.cancel();
+ }
+ return shouldSchedule.get();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void unmarkCQLocallyScheduled(String cqId, String cqToken) {
+ lock.readLock().lock();
+ try {
+ locallyScheduledCQs.computeIfPresent(
+ cqId,
+ (ignored, schedule) -> {
+ if (schedule.hasToken(cqToken)) {
+ schedule.cancel();
+ return null;
+ }
+ return schedule;
+ });
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private void cancelLocallyScheduledCQ(String cqId) {
+ LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId);
+ if (schedule != null) {
+ schedule.cancel();
+ }
+ }
+
+ private void cancelAllLocallyScheduledCQs() {
+ locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel);
+ locallyScheduledCQs.clear();
+ }
+
+ private static class LocallyScheduledCQ {
+
+ private final String cqToken;
+ private final CQScheduleTask task;
+
+ private LocallyScheduledCQ(String cqToken, CQScheduleTask task) {
+ this.cqToken = cqToken;
+ this.task = task;
+ }
+
+ private boolean hasToken(String cqToken) {
+ return this.cqToken.equals(cqToken);
+ }
+
+ private void cancel() {
+ task.cancel();
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index c58f5ade9bc..6b73ffca95f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -40,7 +40,10 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class CQScheduleTask implements Runnable {
@@ -70,7 +73,7 @@ public class CQScheduleTask implements Runnable {
private final long endTimeOffset;
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
- private final String md5;
+ private final String cqToken;
private final String zoneId;
@@ -82,12 +85,15 @@ public class CQScheduleTask implements Runnable {
private final long retryWaitTimeInMS;
+ private final AtomicBoolean cancelled;
+ private final AtomicReference<ScheduledFuture<?>> scheduledFuture;
+
private long executionTime;
public CQScheduleTask(
TCreateCQReq req,
long firstExecutionTime,
- String md5,
+ String cqToken,
ScheduledExecutorService executor,
ConfigManager configManager) {
this(
@@ -97,7 +103,7 @@ public class CQScheduleTask implements Runnable {
req.endTimeOffset,
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
- md5,
+ cqToken,
req.zoneId,
req.username,
executor,
@@ -114,7 +120,7 @@ public class CQScheduleTask implements Runnable {
entry.getEndTimeOffset(),
entry.getTimeoutPolicy(),
entry.getQueryBody(),
- entry.getMd5(),
+ entry.getCqToken(),
entry.getZoneId(),
entry.getUsername(),
executor,
@@ -130,7 +136,7 @@ public class CQScheduleTask implements Runnable {
long endTimeOffset,
TimeoutPolicy timeoutPolicy,
String queryBody,
- String md5,
+ String cqToken,
String zoneId,
String username,
ScheduledExecutorService executor,
@@ -142,12 +148,14 @@ public class CQScheduleTask implements Runnable {
this.endTimeOffset = endTimeOffset;
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
- this.md5 = md5;
+ this.cqToken = cqToken;
this.zoneId = zoneId;
this.username = username;
this.executor = executor;
this.configManager = configManager;
this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS,
everyInterval / FACTOR);
+ this.cancelled = new AtomicBoolean(false);
+ this.scheduledFuture = new AtomicReference<>();
this.executionTime = executionTime;
}
@@ -166,6 +174,9 @@ public class CQScheduleTask implements Runnable {
@Override
public void run() {
+ if (cancelled.get()) {
+ return;
+ }
long startTime = executionTime - startTimeOffset;
long endTime = executionTime - endTimeOffset;
@@ -178,6 +189,9 @@ public class CQScheduleTask implements Runnable {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
}
} else {
+ if (cancelled.get()) {
+ return;
+ }
LOGGER.info(
ManagerMessages.STARTEXECUTECQ_EXECUTE_CQ_ON_DATANODE_TIME_RANGE_IS_CURRENT_TIME,
cqId,
@@ -207,12 +221,32 @@ public class CQScheduleTask implements Runnable {
}
private void submitSelf(long delay, TimeUnit unit) {
- executor.schedule(this, delay, unit);
+ if (cancelled.get()) {
+ return;
+ }
+ ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit);
+ ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture);
+ if (previousFuture != null) {
+ previousFuture.cancel(false);
+ }
+ if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) {
+ newFuture.cancel(false);
+ }
+ }
+
+ public void cancel() {
+ cancelled.set(true);
+ ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null);
+ if (currentFuture != null) {
+ currentFuture.cancel(false);
+ }
}
private boolean needSubmit() {
// current node is still leader and thread pool is not shut down.
- return configManager.getConsensusManager().isLeader() && !executor.isShutdown();
+ return !cancelled.get()
+ && configManager.getConsensusManager().isLeader()
+ && !executor.isShutdown();
}
private class AsyncExecuteCQCallback implements
AsyncMethodCallback<TSStatus> {
@@ -239,6 +273,9 @@ public class CQScheduleTask implements Runnable {
@Override
public void onComplete(TSStatus response) {
+ if (cancelled.get()) {
+ return;
+ }
if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
@@ -252,7 +289,7 @@ public class CQScheduleTask implements Runnable {
result =
configManager
.getConsensusManager()
- .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5));
+ .write(new UpdateCQLastExecTimePlan(cqId, executionTime, cqToken));
} catch (ConsensusException e) {
result = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
@@ -291,6 +328,9 @@ public class CQScheduleTask implements Runnable {
@Override
public void onError(Exception exception) {
+ if (cancelled.get()) {
+ return;
+ }
LOGGER.warn(ManagerMessages.EXECUTE_CQ_FAILED, cqId, exception);
if (needSubmit()) {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index 9c99cfbb0e8..013e2415f94 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cq.CQState;
import org.apache.iotdb.commons.cq.TimeoutPolicy;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
@@ -45,7 +46,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -62,7 +65,7 @@ public class CQInfo implements SnapshotProcessor {
private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
- private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match";
+ private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s doesn't match";
private final Map<String, CQEntry> cqMap;
@@ -92,7 +95,7 @@ public class CQInfo implements SnapshotProcessor {
CQEntry cqEntry =
new CQEntry(
plan.getReq(),
- plan.getMd5(),
+ plan.getCqToken(),
plan.getFirstExecutionTime() - plan.getReq().everyInterval);
cqMap.put(cqId, cqEntry);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -106,13 +109,13 @@ public class CQInfo implements SnapshotProcessor {
/**
* Drop the CQ whose ID is same as <tt>cqId</tt> in plan.
*
- * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as <tt>cqId</tt> in plan,
+ * @return SUCCESS_STATUS if there is CQ whose ID and token is same as <tt>cqId</tt> in plan,
* otherwise NO_SUCH_CQ.
*/
public TSStatus dropCQ(DropCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- Optional<String> md5 = plan.getMd5();
+ Optional<String> cqToken = plan.getCqToken();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
@@ -120,10 +123,10 @@ public class CQInfo implements SnapshotProcessor {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST, cqId);
- } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
+ } else if ((cqToken.isPresent() && !cqToken.get().equals(cqEntry.cqToken))) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
- LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH, cqId);
+ res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+ LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH, cqId);
} else {
cqMap.remove(cqId);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -136,11 +139,24 @@ public class CQInfo implements SnapshotProcessor {
}
public ShowCQResp showCQ() {
+ return showCQ(new ShowCQPlan());
+ }
+
+ public ShowCQResp showCQ(ShowCQPlan plan) {
lock.readLock().lock();
try {
- return new ShowCQResp(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
- cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
+ Optional<String> cqId = plan.getCqId();
+ List<CQEntry> cqList;
+ if (cqId.isPresent()) {
+ CQEntry cqEntry = cqMap.get(cqId.get());
+ cqList =
+ cqEntry == null
+ ? Collections.emptyList()
+ : Collections.singletonList(new CQEntry(cqEntry));
+ } else {
+ cqList = cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList());
+ }
+ return new ShowCQResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList);
} finally {
lock.readLock().unlock();
}
@@ -154,16 +170,16 @@ public class CQInfo implements SnapshotProcessor {
public TSStatus activeCQ(ActiveCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- String md5 = plan.getMd5();
+ String cqToken = plan.getCqToken();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
- } else if (!md5.equals(cqEntry.md5)) {
+ } else if (!cqToken.equals(cqEntry.cqToken)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+ res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.state == CQState.ACTIVE) {
res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
res.message = String.format("CQ %s has already been active", cqId);
@@ -181,22 +197,22 @@ public class CQInfo implements SnapshotProcessor {
* Update the last execution time of the corresponding CQ.
*
* @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the
CQ doesn't exist; or 2.
- * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >=
+ * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >=
* current lastExecutionTime;
*/
public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- String md5 = plan.getMd5();
+ String cqToken = plan.getCqToken();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
- } else if (!md5.equals(cqEntry.md5)) {
+ } else if (!cqToken.equals(cqEntry.cqToken)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+ res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
res.message =
@@ -300,7 +316,7 @@ public class CQInfo implements SnapshotProcessor {
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
private final String sql;
- private final String md5;
+ private final String cqToken;
private final String zoneId;
@@ -309,7 +325,7 @@ public class CQInfo implements SnapshotProcessor {
private CQState state;
private long lastExecutionTime;
- private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) {
+ private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) {
this(
req.cqId,
req.everyInterval,
@@ -319,7 +335,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
req.sql,
- md5,
+ cqToken,
req.zoneId,
req.username,
CQState.INACTIVE,
@@ -336,7 +352,7 @@ public class CQInfo implements SnapshotProcessor {
other.timeoutPolicy,
other.queryBody,
other.sql,
- other.md5,
+ other.cqToken,
other.zoneId,
other.username,
other.state,
@@ -353,7 +369,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy timeoutPolicy,
String queryBody,
String sql,
- String md5,
+ String cqToken,
String zoneId,
String username,
CQState state,
@@ -366,7 +382,7 @@ public class CQInfo implements SnapshotProcessor {
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
this.sql = sql;
- this.md5 = md5;
+ this.cqToken = cqToken;
this.zoneId = zoneId;
this.username = username;
this.state = state;
@@ -382,7 +398,7 @@ public class CQInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(timeoutPolicy.getType(), stream);
ReadWriteIOUtils.write(queryBody, stream);
ReadWriteIOUtils.write(sql, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
ReadWriteIOUtils.write(zoneId, stream);
ReadWriteIOUtils.write(username, stream);
ReadWriteIOUtils.write(state.getType(), stream);
@@ -398,7 +414,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy timeoutPolicy =
TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream));
String queryBody = ReadWriteIOUtils.readString(stream);
String sql = ReadWriteIOUtils.readString(stream);
- String md5 = ReadWriteIOUtils.readString(stream);
+ String cqToken = ReadWriteIOUtils.readString(stream);
String zoneId = ReadWriteIOUtils.readString(stream);
String username = ReadWriteIOUtils.readString(stream);
CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
@@ -412,7 +428,7 @@ public class CQInfo implements SnapshotProcessor {
timeoutPolicy,
queryBody,
sql,
- md5,
+ cqToken,
zoneId,
username,
state,
@@ -451,8 +467,8 @@ public class CQInfo implements SnapshotProcessor {
return sql;
}
- public String getMd5() {
- return md5;
+ public String getCqToken() {
+ return cqToken;
}
public CQState getState() {
@@ -489,7 +505,7 @@ public class CQInfo implements SnapshotProcessor {
&& timeoutPolicy == cqEntry.timeoutPolicy
&& Objects.equals(queryBody, cqEntry.queryBody)
&& Objects.equals(sql, cqEntry.sql)
- && Objects.equals(md5, cqEntry.md5)
+ && Objects.equals(cqToken, cqEntry.cqToken)
&& Objects.equals(zoneId, cqEntry.zoneId)
&& Objects.equals(username, cqEntry.username)
&& state == cqEntry.state;
@@ -506,7 +522,7 @@ public class CQInfo implements SnapshotProcessor {
timeoutPolicy,
queryBody,
sql,
- md5,
+ cqToken,
zoneId,
username,
state,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index eb8d5e5538b..96dcdea4648 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
@@ -361,7 +362,7 @@ public class ConfigPlanExecutor {
case GetRegionGroupsByTime:
return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan)
req);
case SHOW_CQ:
- return cqInfo.showCQ();
+ return cqInfo.showCQ((ShowCQPlan) req);
case ShowExternalService:
return externalServiceInfo.showService(((ShowExternalServicePlan)
req).getDataNodeIds());
case GetFunctionTable:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index ac964d23ca3..490f723d2e6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -22,11 +22,15 @@ package org.apache.iotdb.confignode.procedure.impl.cq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
@@ -36,7 +40,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import static
org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
@@ -60,7 +65,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
private TCreateCQReq req;
- private String md5;
+ private String cqToken;
private long firstExecutionTime;
@@ -75,7 +80,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService
executor) {
super();
this.req = req;
- this.md5 = DigestUtils.md2Hex(req.cqId);
+ this.cqToken = generateCQToken();
this.executor = executor;
this.firstExecutionTime =
CQScheduleTask.getFirstExecutionTime(req.boundaryTime,
req.everyInterval);
@@ -91,12 +96,16 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
addCQ(env);
return Flow.HAS_MORE_STATE;
case INACTIVE:
- CQScheduleTask cqScheduleTask =
- new CQScheduleTask(req, firstExecutionTime, md5, executor,
env.getConfigManager());
- cqScheduleTask.submitSelf();
+ submitScheduleTask(
+ env,
+ new CQScheduleTask(
+ req, firstExecutionTime, cqToken, executor,
env.getConfigManager()));
setNextState(SCHEDULED);
break;
case SCHEDULED:
+ if (isStateDeserialized()) {
+ recoverScheduledTask(env);
+ }
activeCQ(env);
return Flow.NO_MORE_STATE;
default:
@@ -126,7 +135,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
res =
env.getConfigManager()
.getConsensusManager()
- .write(new AddCQPlan(req, md5, firstExecutionTime));
+ .write(new AddCQPlan(req, cqToken, firstExecutionTime));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -147,7 +156,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
private void activeCQ(ConfigNodeProcedureEnv env) {
TSStatus res;
try {
- res = env.getConfigManager().getConsensusManager().write(new
ActiveCQPlan(req.cqId, md5));
+ res = env.getConfigManager().getConsensusManager().write(new
ActiveCQPlan(req.cqId, cqToken));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -168,6 +177,42 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
}
}
+ void recoverScheduledTask(ConfigNodeProcedureEnv env) throws
ConsensusException {
+ Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+ if (!cqEntry.isPresent()) {
+ LOGGER.info(
+ "Skip recovering the schedule task of CQ {} because its metadata is unavailable.",
+ req.cqId);
+ return;
+ }
+ submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor,
env.getConfigManager()));
+ }
+
+ Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env)
throws ConsensusException {
+ ShowCQResp response =
+ (ShowCQResp) env.getConfigManager().getConsensusManager().read(new
ShowCQPlan(req.cqId));
+ return response.getCqList().stream()
+ .filter(entry -> cqToken.equals(entry.getCqToken()))
+ .findFirst();
+ }
+
+ private static String generateCQToken() {
+ return UUID.randomUUID().toString();
+ }
+
+ private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask
cqScheduleTask) {
+ CQManager cqManager = env.getConfigManager().getCQManager();
+ if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) {
+ return;
+ }
+ try {
+ cqScheduleTask.submitSelf();
+ } catch (RuntimeException e) {
+ cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken);
+ throw e;
+ }
+ }
+
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
throws IOException, InterruptedException, ProcedureException {
@@ -180,7 +225,8 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
LOGGER.info(ProcedureMessages.START_INACTIVE_ROLLBACK_OF_CQ, req.cqId);
TSStatus res;
try {
- res = env.getConfigManager().getConsensusManager().write(new
DropCQPlan(req.cqId, md5));
+ res =
+ env.getConfigManager().getConsensusManager().write(new
DropCQPlan(req.cqId, cqToken));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -231,7 +277,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode());
super.serialize(stream);
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
- ReadWriteIOUtils.write(md5, stream);
+ ReadWriteIOUtils.write(cqToken, stream);
ReadWriteIOUtils.write(firstExecutionTime, stream);
}
@@ -239,7 +285,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer);
- this.md5 = ReadWriteIOUtils.readString(byteBuffer);
+ this.cqToken = ReadWriteIOUtils.readString(byteBuffer);
this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer);
}
@@ -258,7 +304,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
&& Objects.equals(req, that.req)
- && Objects.equals(md5, that.md5);
+ && Objects.equals(cqToken, that.cqToken);
}
@Override
@@ -269,7 +315,15 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
getCycles(),
isGeneratedByPipe,
req,
- md5,
+ cqToken,
firstExecutionTime);
}
+
+ public String getCqId() {
+ return req == null ? null : req.getCqId();
+ }
+
+ public String getCqToken() {
+ return cqToken;
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index d6a3d7e3fd7..ea35be6c5d7 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1674,7 +1674,7 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void ActiveCQPlanTest() throws IOException {
- ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5");
+ ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken");
ActiveCQPlan activeCQPlan1 =
(ActiveCQPlan)
ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer());
@@ -1697,7 +1697,7 @@ public class ConfigPhysicalPlanSerDeTest {
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
"Asia",
"root"),
- "testCq1_md5",
+ "testCq1Token",
executionTime);
AddCQPlan addCQPlan1 =
(AddCQPlan)
ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer());
@@ -1712,7 +1712,7 @@ public class ConfigPhysicalPlanSerDeTest {
(DropCQPlan)
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
Assert.assertEquals(dropCQPlan0, dropCQPlan1);
- dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5");
+ dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token");
dropCQPlan1 =
(DropCQPlan)
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
Assert.assertEquals(dropCQPlan0, dropCQPlan1);
@@ -1721,7 +1721,7 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void UpdateCQLastExecTimePlanTest() throws IOException {
UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 =
- new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(),
"testCq_md5");
+ new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(),
"testCqToken");
UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 =
(UpdateCQLastExecTimePlan)
ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
new file mode 100644
index 00000000000..a0bc5a523ba
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.cq;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cq.TimeoutPolicy;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
+import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class CQManagerTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void dropCQShouldCancelLocallyScheduledTask() throws Exception {
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+ Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(consensusManager.write(Mockito.any()))
+ .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ CQManager cqManager = new CQManager(configManager);
+ ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class);
+ CQScheduleTask task = newScheduledTask(configManager, future, "token");
+
+ try {
+ assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task));
+ task.submitSelf();
+ cqManager.dropCQ(new TDropCQReq("testCq"));
+
+ Mockito.verify(future).cancel(false);
+ } finally {
+ cqManager.stopCQScheduler();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void newTokenShouldCancelPreviousLocallyScheduledTask() {
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ CQManager cqManager = new CQManager(configManager);
+ ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class);
+ CQScheduleTask previousTask = newScheduledTask(configManager,
previousFuture, "previousToken");
+ ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class);
+ CQScheduleTask currentTask = newScheduledTask(configManager,
currentFuture, "currentToken");
+
+ try {
+ assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken",
previousTask));
+ previousTask.submitSelf();
+ assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken",
currentTask));
+
+ Mockito.verify(previousFuture).cancel(false);
+ } finally {
+ cqManager.stopCQScheduler();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private CQScheduleTask newScheduledTask(
+ ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String
cqToken) {
+ ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
+ Mockito.when(
+ executor.schedule(
+ Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
+ .thenReturn((ScheduledFuture) scheduledFuture);
+ return new CQScheduleTask(
+ "testCq",
+ 1000,
+ 0,
+ 1000,
+ TimeoutPolicy.BLOCKED,
+ "select s1 into root.backup.d1.s1 from root.sg.d1",
+ cqToken,
+ "Asia",
+ "root",
+ executor,
+ configManager,
+ System.currentTimeMillis() + 10_000);
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 4b409d6cf0c..64bbd69c5b6 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,9 +18,14 @@
*/
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
+import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -70,7 +75,7 @@ public class CQInfoTest {
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
"Asia",
"root"),
- "testCq1_md5",
+ "testCq1Token",
executionTime);
cqInfo.addCQ(addCQPlan);
@@ -89,7 +94,7 @@ public class CQInfoTest {
"create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END",
"Asia",
"root"),
- "testCq2_md5",
+ "testCq2Token",
executionTime);
cqInfo.addCQ(addCQPlan);
@@ -99,4 +104,59 @@ public class CQInfoTest {
Assert.assertEquals(cqInfo, actualCQInfo);
}
+
+ @Test
+ public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
+ long executionTime = System.currentTimeMillis();
+ TCreateCQReq req =
+ new TCreateCQReq(
+ "testCq3",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 0,
+ "select s1 into root.backup.d3.s1 from root.sg.d3",
+ "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END",
+ "Asia",
+ "root");
+
+ cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime));
+ cqInfo.dropCQ(new DropCQPlan("testCq3"));
+ cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime));
+
+ Assert.assertEquals(
+ TSStatusCode.NO_SUCH_CQ.getStatusCode(),
+ cqInfo.updateCQLastExecutionTime(
+ new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000,
"oldToken"))
+ .code);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ cqInfo.updateCQLastExecutionTime(
+ new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000,
"newToken"))
+ .code);
+ }
+
+ @Test
+ public void testShowCQCanFilterByCQId() throws Exception {
+ long executionTime = System.currentTimeMillis();
+ TCreateCQReq req =
+ new TCreateCQReq(
+ "testCq4",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 0,
+ "select s1 into root.backup.d4.s1 from root.sg.d4",
+ "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from root.sg.d4 END",
+ "Asia",
+ "root");
+ cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime));
+
+ ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4"));
+
+ Assert.assertEquals(1, showCQResp.getCqList().size());
+ Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId());
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
index d0e92b32816..3e7fd2052ad 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
@@ -36,10 +36,36 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
public class CreateCQProcedureTest {
+ @Test
+ public void tokenShouldBeUniqueForSameCQId() {
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ try {
+ TCreateCQReq req =
+ new TCreateCQReq(
+ "testCq1",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 0,
+ "select s1 into root.backup.d1(s1) from root.sg.d1",
+ "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END",
+ "Asia",
+ "root");
+ CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req,
executor);
+ CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req,
executor);
+
+ assertNotEquals(createCQProcedure1.getCqToken(),
createCQProcedure2.getCqToken());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
@Test
public void serializeDeserializeTest() {
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
new file mode 100644
index 00000000000..a90e282494f
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.cq;
+
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
+import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class CreateCQProcedureRecoveryTest {
+
+ private TCreateCQReq newCreateCQReq() {
+ return new TCreateCQReq(
+ "testCq1",
+ 1000,
+ 0,
+ 1000,
+ 0,
+ (byte) 0,
+ "select s1 into root.backup.d1.s1 from root.sg.d1",
+ "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
+ "Asia",
+ "root");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws
Exception {
+ ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
+ Mockito.when(
+ executor.schedule(
+ Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
+ .thenReturn(Mockito.mock(ScheduledFuture.class));
+
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+ CQManager cqManager = Mockito.mock(CQManager.class);
+ ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+ Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+ Mockito.when(
+ cqManager.markCQLocallyScheduled(
+ Mockito.anyString(), Mockito.anyString(),
Mockito.any(CQScheduleTask.class)))
+ .thenReturn(true);
+
+ TCreateCQReq req = newCreateCQReq();
+ CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+ CQInfo cqInfo = new CQInfo();
+ cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000));
+ Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+ procedure.recoverScheduledTask(env);
+
+ Mockito.verify(executor)
+ .schedule(Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws
Exception {
+ ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+ CQManager cqManager = Mockito.mock(CQManager.class);
+ ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+ Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+ Mockito.when(
+ cqManager.markCQLocallyScheduled(
+ Mockito.anyString(), Mockito.anyString(),
Mockito.any(CQScheduleTask.class)))
+ .thenReturn(false);
+
+ TCreateCQReq req = newCreateCQReq();
+ CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+ CQInfo cqInfo = new CQInfo();
+ cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000));
+ Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+ procedure.recoverScheduledTask(env);
+
+ Mockito.verify(executor, Mockito.never())
+ .schedule(Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class));
+ }
+}