This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch re-cq in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d3bf44f79d12f3cd814db92512751f9ceb33ed25 Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 3 12:24:51 2026 +0800 Revert "Fix CQ recovery gap and stale callback contamination (#17734)" This reverts commit 29d0d51dc74081c1b9295e29c18569762a72a579. --- .../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +- .../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +- .../consensus/request/read/cq/ShowCQPlan.java | 13 --- .../consensus/request/write/cq/ActiveCQPlan.java | 20 ++-- .../consensus/request/write/cq/AddCQPlan.java | 20 ++-- .../consensus/request/write/cq/DropCQPlan.java | 20 ++-- .../request/write/cq/UpdateCQLastExecTimePlan.java | 23 ++-- .../iotdb/confignode/manager/cq/CQManager.java | 102 +----------------- .../confignode/manager/cq/CQScheduleTask.java | 58 ++-------- .../iotdb/confignode/persistence/cq/CQInfo.java | 76 ++++++------- .../persistence/executor/ConfigPlanExecutor.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 80 +++----------- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +- .../apache/iotdb/confignode/cq/CQManagerTest.java | 107 ------------------- .../iotdb/confignode/persistence/CQInfoTest.java | 64 +---------- .../procedure/impl/CreateCQProcedureTest.java | 26 ----- .../impl/cq/CreateCQProcedureRecoveryTest.java | 117 --------------------- 17 files changed, 106 insertions(+), 639 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index f662fa5871d..c1c3e877de7 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -130,8 +130,8 @@ public final class ConfigNodeMessages { public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "Dropping tag or time column is not supported."; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = - "Drop CQ {} failed, because its token doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = + "Drop CQ {} failed, because its MD5 doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 8bffa1b0831..6ca847626c8 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -126,8 +126,8 @@ public final class ConfigNodeMessages { "Deserialization error for write plan, request: {}, bytebuffer: {}"; public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "不支持删除标签列或时间列。"; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = - "Drop CQ {} failed, because its token doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = + "Drop CQ {} failed, because its MD5 doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java index c28838d556b..5217849deb4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java @@ -21,24 +21,11 @@ package org.apache.iotdb.confignode.consensus.request.read.cq; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; -import java.util.Optional; - import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ; public class ShowCQPlan extends ConfigPhysicalReadPlan { - private final String cqId; - public ShowCQPlan() { - this(null); - } - - public ShowCQPlan(String cqId) { super(SHOW_CQ); - this.cqId = cqId; - } - - public Optional<String> getCqId() { - return Optional.ofNullable(cqId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java index 3faa1c2d62f..263aeb9f0d0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java @@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { private String cqId; - private String cqToken; + private String md5; public ActiveCQPlan() { super(ACTIVE_CQ); } - public ActiveCQPlan(String cqId, String cqToken) { + public ActiveCQPlan(String cqId, String md5) { super(ACTIVE_CQ); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { return cqId; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { return false; } ActiveCQPlan that = (ActiveCQPlan) o; - return cqId.equals(that.cqId) && cqToken.equals(that.cqToken); + return cqId.equals(that.cqId) && md5.equals(that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, cqToken); + return Objects.hash(super.hashCode(), cqId, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java index 471516c38e5..62f994688b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java @@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan { private TCreateCQReq req; - private String cqToken; + private String md5; private long firstExecutionTime; @@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan { super(ADD_CQ); } - public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) { + public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) { super(ADD_CQ); Validate.notNull(req); - Validate.notNull(cqToken); + Validate.notNull(md5); this.req = req; - this.cqToken = cqToken; + this.md5 = md5; this.firstExecutionTime = firstExecutionTime; } @@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan { return req; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } public long getFirstExecutionTime() { @@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan { protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); firstExecutionTime = ReadWriteIOUtils.readLong(buffer); } @@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan { AddCQPlan addCQPlan = (AddCQPlan) o; return firstExecutionTime == addCQPlan.firstExecutionTime && Objects.equals(req, addCQPlan.req) - && Objects.equals(cqToken, addCQPlan.cqToken); + && Objects.equals(md5, addCQPlan.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime); + return Objects.hash(super.hashCode(), req, md5, firstExecutionTime); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java index 69c29bff634..108241b233d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java @@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan { private String cqId; // may be null in user call of drop CQ - private String cqToken; + private String md5; public DropCQPlan() { super(DROP_CQ); @@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan { this.cqId = cqId; } - public DropCQPlan(String cqId, String cqToken) { + public DropCQPlan(String cqId, String md5) { super(DROP_CQ); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { return cqId; } - public Optional<String> getCqToken() { - return Optional.ofNullable(cqToken); + public Optional<String> getMd5() { + return Optional.ofNullable(md5); } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan { return false; } DropCQPlan that = (DropCQPlan) o; - return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken); + return cqId.equals(that.cqId) && Objects.equals(md5, that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, cqToken); + return Objects.hash(super.hashCode(), cqId, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java index a487ae648e3..861a7d4f51b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java @@ -37,19 +37,20 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { private long executionTime; - private String cqToken; + // may be null in user call of drop CQ + private String md5; public UpdateCQLastExecTimePlan() { super(UPDATE_CQ_LAST_EXEC_TIME); } - public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) { + public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) { super(UPDATE_CQ_LAST_EXEC_TIME); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; this.executionTime = executionTime; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { @@ -60,8 +61,8 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return executionTime; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } @Override @@ -69,14 +70,14 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); ReadWriteIOUtils.write(executionTime, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); executionTime = ReadWriteIOUtils.readLong(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -91,13 +92,11 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return false; } UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o; - return executionTime == that.executionTime - && cqId.equals(that.cqId) - && cqToken.equals(that.cqToken); + return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, executionTime, cqToken); + return Objects.hash(super.hashCode(), cqId, executionTime, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index 29837bc8aa2..c4c1e8aede9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -43,10 +43,7 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -60,15 +57,11 @@ public class CQManager { private final ReadWriteLock lock; - // Key: CQ id. Value: the local task and the metadata token it owns. - private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs; - private ScheduledExecutorService executor; public CQManager(ConfigManager configManager) { this.configManager = configManager; this.lock = new ReentrantReadWriteLock(); - this.locallyScheduledCQs = new ConcurrentHashMap<>(); this.executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); @@ -85,21 +78,14 @@ public class CQManager { } public TSStatus dropCQ(TDropCQReq req) { - lock.readLock().lock(); try { - TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - cancelLocallyScheduledCQ(req.cqId); - } - return status; + return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); } catch (ConsensusException e) { LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); res.setMessage(e.getMessage()); return res; - } finally { - lock.readLock().unlock(); } } @@ -132,7 +118,6 @@ public class CQManager { try { // 1. shutdown previous cq schedule thread pool try { - cancelAllLocallyScheduledCQs(); if (executor != null) { executor.shutdown(); } @@ -171,15 +156,7 @@ public class CQManager { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); - if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), cqScheduleTask)) { - continue; - } - try { - cqScheduleTask.submitSelf(); - } catch (RuntimeException e) { - unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken()); - throw e; - } + cqScheduleTask.submitSelf(); } } } @@ -199,7 +176,6 @@ public class CQManager { try { previous = executor; executor = null; - cancelAllLocallyScheduledCQs(); } finally { lock.writeLock().unlock(); } @@ -207,78 +183,4 @@ public class CQManager { previous.shutdown(); } } - - public boolean markCQLocallyScheduled(String cqId, String cqToken, CQScheduleTask task) { - AtomicBoolean shouldSchedule = new AtomicBoolean(false); - LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task); - lock.readLock().lock(); - try { - locallyScheduledCQs.compute( - cqId, - (ignored, previousSchedule) -> { - if (previousSchedule != null && previousSchedule.hasToken(cqToken)) { - return previousSchedule; - } - if (previousSchedule != null) { - previousSchedule.cancel(); - } - shouldSchedule.set(true); - return schedule; - }); - if (!shouldSchedule.get()) { - task.cancel(); - } - return shouldSchedule.get(); - } finally { - lock.readLock().unlock(); - } - } - - public void unmarkCQLocallyScheduled(String cqId, String cqToken) { - lock.readLock().lock(); - try { - locallyScheduledCQs.computeIfPresent( - cqId, - (ignored, schedule) -> { - if (schedule.hasToken(cqToken)) { - schedule.cancel(); - return null; - } - return schedule; - }); - } finally { - lock.readLock().unlock(); - } - } - - private void cancelLocallyScheduledCQ(String cqId) { - LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId); - if (schedule != null) { - schedule.cancel(); - } - } - - private void cancelAllLocallyScheduledCQs() { - locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel); - locallyScheduledCQs.clear(); - } - - private static class LocallyScheduledCQ { - - private final String cqToken; - private final CQScheduleTask task; - - private LocallyScheduledCQ(String cqToken, CQScheduleTask task) { - this.cqToken = cqToken; - this.task = task; - } - - private boolean hasToken(String cqToken) { - return this.cqToken.equals(cqToken); - } - - private void cancel() { - task.cancel(); - } - } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index 6b73ffca95f..c58f5ade9bc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -40,10 +40,7 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class CQScheduleTask implements Runnable { @@ -73,7 +70,7 @@ public class CQScheduleTask implements Runnable { private final long endTimeOffset; private final TimeoutPolicy timeoutPolicy; private final String queryBody; - private final String cqToken; + private final String md5; private final String zoneId; @@ -85,15 +82,12 @@ public class CQScheduleTask implements Runnable { private final long retryWaitTimeInMS; - private final AtomicBoolean cancelled; - private final AtomicReference<ScheduledFuture<?>> scheduledFuture; - private long executionTime; public CQScheduleTask( TCreateCQReq req, long firstExecutionTime, - String cqToken, + String md5, ScheduledExecutorService executor, ConfigManager configManager) { this( @@ -103,7 +97,7 @@ public class CQScheduleTask implements Runnable { req.endTimeOffset, TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, - cqToken, + md5, req.zoneId, req.username, executor, @@ -120,7 +114,7 @@ public class CQScheduleTask implements Runnable { entry.getEndTimeOffset(), entry.getTimeoutPolicy(), entry.getQueryBody(), - entry.getCqToken(), + entry.getMd5(), entry.getZoneId(), entry.getUsername(), executor, @@ -136,7 +130,7 @@ public class CQScheduleTask implements Runnable { long endTimeOffset, TimeoutPolicy timeoutPolicy, String queryBody, - String cqToken, + String md5, String zoneId, String username, ScheduledExecutorService executor, @@ -148,14 +142,12 @@ public class CQScheduleTask implements Runnable { this.endTimeOffset = endTimeOffset; this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; - this.cqToken = cqToken; + this.md5 = md5; this.zoneId = zoneId; this.username = username; this.executor = executor; this.configManager = configManager; this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval / FACTOR); - this.cancelled = new AtomicBoolean(false); - this.scheduledFuture = new AtomicReference<>(); this.executionTime = executionTime; } @@ -174,9 +166,6 @@ public class CQScheduleTask implements Runnable { @Override public void run() { - if (cancelled.get()) { - return; - } long startTime = executionTime - startTimeOffset; long endTime = executionTime - endTimeOffset; @@ -189,9 +178,6 @@ public class CQScheduleTask implements Runnable { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); } } else { - if (cancelled.get()) { - return; - } LOGGER.info( ManagerMessages.STARTEXECUTECQ_EXECUTE_CQ_ON_DATANODE_TIME_RANGE_IS_CURRENT_TIME, cqId, @@ -221,32 +207,12 @@ public class CQScheduleTask implements Runnable { } private void submitSelf(long delay, TimeUnit unit) { - if (cancelled.get()) { - return; - } - ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit); - ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture); - if (previousFuture != null) { - previousFuture.cancel(false); - } - if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) { - newFuture.cancel(false); - } - } - - public void cancel() { - cancelled.set(true); - ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null); - if (currentFuture != null) { - currentFuture.cancel(false); - } + executor.schedule(this, delay, unit); } private boolean needSubmit() { // current node is still leader and thread pool is not shut down. - return !cancelled.get() - && configManager.getConsensusManager().isLeader() - && !executor.isShutdown(); + return configManager.getConsensusManager().isLeader() && !executor.isShutdown(); } private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> { @@ -273,9 +239,6 @@ public class CQScheduleTask implements Runnable { @Override public void onComplete(TSStatus response) { - if (cancelled.get()) { - return; - } if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( @@ -289,7 +252,7 @@ public class CQScheduleTask implements Runnable { result = configManager .getConsensusManager() - .write(new UpdateCQLastExecTimePlan(cqId, executionTime, cqToken)); + .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); } catch (ConsensusException e) { result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); result.setMessage(e.getMessage()); @@ -328,9 +291,6 @@ public class CQScheduleTask implements Runnable { @Override public void onError(Exception exception) { - if (cancelled.get()) { - return; - } LOGGER.warn(ManagerMessages.EXECUTE_CQ_FAILED, cqId, exception); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java index 013e2415f94..9c99cfbb0e8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cq.CQState; import org.apache.iotdb.commons.cq.TimeoutPolicy; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; @@ -46,9 +45,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -65,7 +62,7 @@ public class CQInfo implements SnapshotProcessor { private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist."; - private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s doesn't match"; + private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match"; private final Map<String, CQEntry> cqMap; @@ -95,7 +92,7 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = new CQEntry( plan.getReq(), - plan.getCqToken(), + plan.getMd5(), plan.getFirstExecutionTime() - plan.getReq().everyInterval); cqMap.put(cqId, cqEntry); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -109,13 +106,13 @@ public class CQInfo implements SnapshotProcessor { /** * Drop the CQ whose ID is same as <tt>cqId</tt> in plan. * - * @return SUCCESS_STATUS if there is CQ whose ID and token is same as <tt>cqId</tt> in plan, + * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as <tt>cqId</tt> in plan, * otherwise NO_SUCH_CQ. */ public TSStatus dropCQ(DropCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - Optional<String> cqToken = plan.getCqToken(); + Optional<String> md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); @@ -123,10 +120,10 @@ public class CQInfo implements SnapshotProcessor { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST, cqId); - } else if ((cqToken.isPresent() && !cqToken.get().equals(cqEntry.cqToken))) { + } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); - LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH, cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH, cqId); } else { cqMap.remove(cqId); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -139,24 +136,11 @@ public class CQInfo implements SnapshotProcessor { } public ShowCQResp showCQ() { - return showCQ(new ShowCQPlan()); - } - - public ShowCQResp showCQ(ShowCQPlan plan) { lock.readLock().lock(); try { - Optional<String> cqId = plan.getCqId(); - List<CQEntry> cqList; - if (cqId.isPresent()) { - CQEntry cqEntry = cqMap.get(cqId.get()); - cqList = - cqEntry == null - ? Collections.emptyList() - : Collections.singletonList(new CQEntry(cqEntry)); - } else { - cqList = cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()); - } - return new ShowCQResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList); + return new ShowCQResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList())); } finally { lock.readLock().unlock(); } @@ -170,16 +154,16 @@ public class CQInfo implements SnapshotProcessor { public TSStatus activeCQ(ActiveCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String cqToken = plan.getCqToken(); + String md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!cqToken.equals(cqEntry.cqToken)) { + } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.state == CQState.ACTIVE) { res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode(); res.message = String.format("CQ %s has already been active", cqId); @@ -197,22 +181,22 @@ public class CQInfo implements SnapshotProcessor { * Update the last execution time of the corresponding CQ. * * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the CQ doesn't exist; or 2. - * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= + * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= * current lastExecutionTime; */ public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String cqToken = plan.getCqToken(); + String md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!cqToken.equals(cqEntry.cqToken)) { + } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) { res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode(); res.message = @@ -316,7 +300,7 @@ public class CQInfo implements SnapshotProcessor { private final TimeoutPolicy timeoutPolicy; private final String queryBody; private final String sql; - private final String cqToken; + private final String md5; private final String zoneId; @@ -325,7 +309,7 @@ public class CQInfo implements SnapshotProcessor { private CQState state; private long lastExecutionTime; - private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) { + private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) { this( req.cqId, req.everyInterval, @@ -335,7 +319,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, req.sql, - cqToken, + md5, req.zoneId, req.username, CQState.INACTIVE, @@ -352,7 +336,7 @@ public class CQInfo implements SnapshotProcessor { other.timeoutPolicy, other.queryBody, other.sql, - other.cqToken, + other.md5, other.zoneId, other.username, other.state, @@ -369,7 +353,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy, String queryBody, String sql, - String cqToken, + String md5, String zoneId, String username, CQState state, @@ -382,7 +366,7 @@ public class CQInfo implements SnapshotProcessor { this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; this.sql = sql; - this.cqToken = cqToken; + this.md5 = md5; this.zoneId = zoneId; this.username = username; this.state = state; @@ -398,7 +382,7 @@ public class CQInfo implements SnapshotProcessor { ReadWriteIOUtils.write(timeoutPolicy.getType(), stream); ReadWriteIOUtils.write(queryBody, stream); ReadWriteIOUtils.write(sql, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(zoneId, stream); ReadWriteIOUtils.write(username, stream); ReadWriteIOUtils.write(state.getType(), stream); @@ -414,7 +398,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy = TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream)); String queryBody = ReadWriteIOUtils.readString(stream); String sql = ReadWriteIOUtils.readString(stream); - String cqToken = ReadWriteIOUtils.readString(stream); + String md5 = ReadWriteIOUtils.readString(stream); String zoneId = ReadWriteIOUtils.readString(stream); String username = ReadWriteIOUtils.readString(stream); CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream)); @@ -428,7 +412,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - cqToken, + md5, zoneId, username, state, @@ -467,8 +451,8 @@ public class CQInfo implements SnapshotProcessor { return sql; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } public CQState getState() { @@ -505,7 +489,7 @@ public class CQInfo implements SnapshotProcessor { && timeoutPolicy == cqEntry.timeoutPolicy && Objects.equals(queryBody, cqEntry.queryBody) && Objects.equals(sql, cqEntry.sql) - && Objects.equals(cqToken, cqEntry.cqToken) + && Objects.equals(md5, cqEntry.md5) && Objects.equals(zoneId, cqEntry.zoneId) && Objects.equals(username, cqEntry.username) && state == cqEntry.state; @@ -522,7 +506,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - cqToken, + md5, zoneId, username, state, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 96dcdea4648..eb8d5e5538b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -30,7 +30,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; @@ -362,7 +361,7 @@ public class ConfigPlanExecutor { case GetRegionGroupsByTime: return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req); case SHOW_CQ: - return cqInfo.showCQ((ShowCQPlan) req); + return cqInfo.showCQ(); case ShowExternalService: return externalServiceInfo.showService(((ShowExternalServicePlan) req).getDataNodeIds()); case GetFunctionTable: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index 490f723d2e6..ac964d23ca3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -22,15 +22,11 @@ package org.apache.iotdb.confignode.procedure.impl.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.i18n.ProcedureMessages; -import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; @@ -40,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.external.commons.codec.digest.DigestUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +45,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE; @@ -65,7 +60,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private TCreateCQReq req; - private String cqToken; + private String md5; private long firstExecutionTime; @@ -80,7 +75,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.cqToken = generateCQToken(); + this.md5 = DigestUtils.md2Hex(req.cqId); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -96,16 +91,12 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { addCQ(env); return Flow.HAS_MORE_STATE; case INACTIVE: - submitScheduleTask( - env, - new CQScheduleTask( - req, firstExecutionTime, cqToken, executor, env.getConfigManager())); + CQScheduleTask cqScheduleTask = + new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); + cqScheduleTask.submitSelf(); setNextState(SCHEDULED); break; case SCHEDULED: - if (isStateDeserialized()) { - recoverScheduledTask(env); - } activeCQ(env); return Flow.NO_MORE_STATE; default: @@ -135,7 +126,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { res = env.getConfigManager() .getConsensusManager() - .write(new AddCQPlan(req, cqToken, firstExecutionTime)); + .write(new AddCQPlan(req, md5, firstExecutionTime)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -156,7 +147,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private void activeCQ(ConfigNodeProcedureEnv env) { TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, cqToken)); + res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -177,42 +168,6 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { } } - void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException { - Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env); - if (!cqEntry.isPresent()) { - LOGGER.info( - "Skip recovering the schedule task of CQ {} because its metadata is unavailable.", - req.cqId); - return; - } - submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); - } - - Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { - ShowCQResp response = - (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan(req.cqId)); - return response.getCqList().stream() - .filter(entry -> cqToken.equals(entry.getCqToken())) - .findFirst(); - } - - private static String generateCQToken() { - return UUID.randomUUID().toString(); - } - - private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { - CQManager cqManager = env.getConfigManager().getCQManager(); - if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) { - return; - } - try { - cqScheduleTask.submitSelf(); - } catch (RuntimeException e) { - cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken); - throw e; - } - } - @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -225,8 +180,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { LOGGER.info(ProcedureMessages.START_INACTIVE_ROLLBACK_OF_CQ, req.cqId); TSStatus res; try { - res = - env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, cqToken)); + res = env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -277,7 +231,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode()); super.serialize(stream); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @@ -285,7 +239,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer); - this.cqToken = ReadWriteIOUtils.readString(byteBuffer); + this.md5 = ReadWriteIOUtils.readString(byteBuffer); this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer); } @@ -304,7 +258,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime && Objects.equals(req, that.req) - && Objects.equals(cqToken, that.cqToken); + && Objects.equals(md5, that.md5); } @Override @@ -315,15 +269,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { getCycles(), isGeneratedByPipe, req, - cqToken, + md5, firstExecutionTime); } - - public String getCqId() { - return req == null ? null : req.getCqId(); - } - - public String getCqToken() { - return cqToken; - } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index ea35be6c5d7..d6a3d7e3fd7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -1674,7 +1674,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void ActiveCQPlanTest() throws IOException { - ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken"); + ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5"); ActiveCQPlan activeCQPlan1 = (ActiveCQPlan) ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer()); @@ -1697,7 +1697,7 @@ public class ConfigPhysicalPlanSerDeTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1Token", + "testCq1_md5", executionTime); AddCQPlan addCQPlan1 = (AddCQPlan) ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer()); @@ -1712,7 +1712,7 @@ public class ConfigPhysicalPlanSerDeTest { (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); - dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token"); + dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5"); dropCQPlan1 = (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); @@ -1721,7 +1721,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void UpdateCQLastExecTimePlanTest() throws IOException { UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 = - new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCqToken"); + new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCq_md5"); UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 = (UpdateCQLastExecTimePlan) ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java deleted file mode 100644 index a0bc5a523ba..00000000000 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.cq; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.cq.TimeoutPolicy; -import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.cq.CQManager; -import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; - -public class CQManagerTest { - - @SuppressWarnings("unchecked") - @Test - public void dropCQShouldCancelLocallyScheduledTask() throws Exception { - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(consensusManager.write(Mockito.any())) - .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); - CQManager cqManager = new CQManager(configManager); - ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class); - CQScheduleTask task = newScheduledTask(configManager, future, "token"); - - try { - assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task)); - task.submitSelf(); - cqManager.dropCQ(new TDropCQReq("testCq")); - - Mockito.verify(future).cancel(false); - } finally { - cqManager.stopCQScheduler(); - } - } - - @SuppressWarnings("unchecked") - @Test - public void newTokenShouldCancelPreviousLocallyScheduledTask() { - ConfigManager configManager = Mockito.mock(ConfigManager.class); - CQManager cqManager = new CQManager(configManager); - ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class); - CQScheduleTask previousTask = newScheduledTask(configManager, previousFuture, "previousToken"); - ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class); - CQScheduleTask currentTask = newScheduledTask(configManager, currentFuture, "currentToken"); - - try { - assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", previousTask)); - previousTask.submitSelf(); - assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", currentTask)); - - Mockito.verify(previousFuture).cancel(false); - } finally { - cqManager.stopCQScheduler(); - } - } - - @SuppressWarnings("unchecked") - private CQScheduleTask newScheduledTask( - ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String cqToken) { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - Mockito.when( - executor.schedule( - Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) - .thenReturn((ScheduledFuture) scheduledFuture); - return new CQScheduleTask( - "testCq", - 1000, - 0, - 1000, - TimeoutPolicy.BLOCKED, - "select s1 into root.backup.d1.s1 from root.sg.d1", - cqToken, - "Asia", - "root", - executor, - configManager, - System.currentTimeMillis() + 10_000); - } -} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 64bbd69c5b6..4b409d6cf0c 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -18,14 +18,9 @@ */ package org.apache.iotdb.confignode.persistence; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; -import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -75,7 +70,7 @@ public class CQInfoTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1Token", + "testCq1_md5", executionTime); cqInfo.addCQ(addCQPlan); @@ -94,7 +89,7 @@ public class CQInfoTest { "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END", "Asia", "root"), - "testCq2Token", + "testCq2_md5", executionTime); cqInfo.addCQ(addCQPlan); @@ -104,59 +99,4 @@ public class CQInfoTest { Assert.assertEquals(cqInfo, actualCQInfo); } - - @Test - public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { - long executionTime = System.currentTimeMillis(); - TCreateCQReq req = - new TCreateCQReq( - "testCq3", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d3.s1 from root.sg.d3", - "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END", - "Asia", - "root"); - - cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime)); - cqInfo.dropCQ(new DropCQPlan("testCq3")); - cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime)); - - Assert.assertEquals( - TSStatusCode.NO_SUCH_CQ.getStatusCode(), - cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldToken")) - .code); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newToken")) - .code); - } - - @Test - public void testShowCQCanFilterByCQId() throws Exception { - long executionTime = System.currentTimeMillis(); - TCreateCQReq req = - new TCreateCQReq( - "testCq4", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d4.s1 from root.sg.d4", - "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from root.sg.d4 END", - "Asia", - "root"); - cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime)); - - ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4")); - - Assert.assertEquals(1, showCQResp.getCqList().size()); - Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId()); - } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index 3e7fd2052ad..d0e92b32816 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -36,36 +36,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; public class CreateCQProcedureTest { - @Test - public void tokenShouldBeUniqueForSameCQId() { - ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - try { - TCreateCQReq req = - new TCreateCQReq( - "testCq1", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d1(s1) from root.sg.d1", - "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END", - "Asia", - "root"); - CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); - CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); - - assertNotEquals(createCQProcedure1.getCqToken(), createCQProcedure2.getCqToken()); - } finally { - executor.shutdown(); - } - } - @Test public void serializeDeserializeTest() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java deleted file mode 100644 index a90e282494f..00000000000 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.procedure.impl.cq; - -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; -import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.cq.CQManager; -import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.persistence.cq.CQInfo; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; - -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public class CreateCQProcedureRecoveryTest { - - private TCreateCQReq newCreateCQReq() { - return new TCreateCQReq( - "testCq1", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d1.s1 from root.sg.d1", - "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", - "Asia", - "root"); - } - - @SuppressWarnings("unchecked") - @Test - public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - Mockito.when( - executor.schedule( - Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) - .thenReturn(Mockito.mock(ScheduledFuture.class)); - - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - CQManager cqManager = Mockito.mock(CQManager.class); - ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); - Mockito.when(env.getConfigManager()).thenReturn(configManager); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when( - cqManager.markCQLocallyScheduled( - Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) - .thenReturn(true); - - TCreateCQReq req = newCreateCQReq(); - CreateCQProcedure procedure = new CreateCQProcedure(req, executor); - - CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); - Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); - - procedure.recoverScheduledTask(env); - - Mockito.verify(executor) - .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); - } - - @SuppressWarnings("unchecked") - @Test - public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - CQManager cqManager = Mockito.mock(CQManager.class); - ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); - Mockito.when(env.getConfigManager()).thenReturn(configManager); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when( - cqManager.markCQLocallyScheduled( - Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) - .thenReturn(false); - - TCreateCQReq req = newCreateCQReq(); - CreateCQProcedure procedure = new CreateCQProcedure(req, executor); - - CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); - Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); - - procedure.recoverScheduledTask(env); - - Mockito.verify(executor, Mockito.never()) - .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); - } -}
