This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 91ee3016d18 Fix duplicate scheduling in procedure execution (#17902) (#17969)
91ee3016d18 is described below
commit 91ee3016d18c06d123ca34b983309418bd9eafe1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 16:20:09 2026 +0800
Fix duplicate scheduling in procedure execution (#17902) (#17969)
* Fix duplicate scheduling in procedure execution
* Fix delayed procedure deduplication and semaphore release
* Fix SQL parser error handler traversal
* Fix pipe procedure lock release race
* Fix procedure lock wait scheduling
(cherry picked from commit c25849a093b466966353ecd7b63f722535b8d5b3)
(cherry picked from commit 0a45a3b7b708c3b436e6b9b4c6aa9b65bede7076)
---
.../coordinator/task/PipeTaskCoordinatorLock.java | 13 +--
.../iotdb/confignode/procedure/Procedure.java | 20 ++++
.../confignode/procedure/ProcedureExecutor.java | 113 ++++++++++++++-------
.../procedure/TimeoutExecutorThread.java | 32 +++++-
.../procedure/impl/StateMachineProcedure.java | 3 +-
.../procedure/impl/cq/CreateCQProcedure.java | 2 +-
.../procedure/impl/node/AbstractNodeProcedure.java | 12 +++
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 23 ++---
.../pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 2 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 2 +-
.../impl/pipe/task/StopPipeProcedureV2.java | 2 +-
.../impl/schema/DeleteDatabaseProcedure.java | 2 +-
.../impl/schema/DeleteLogicalViewProcedure.java | 2 +-
.../impl/schema/DeleteTimeSeriesProcedure.java | 2 +-
.../impl/trigger/CreateTriggerProcedure.java | 2 +-
.../impl/trigger/DropTriggerProcedure.java | 2 +-
.../confignode/procedure/scheduler/LockQueue.java | 10 +-
.../scheduler/SimpleProcedureScheduler.java | 37 ++++++-
.../task/PipeTaskCoordinatorLockTest.java | 60 +++++++++++
.../iotdb/confignode/procedure/TestLockRegime.java | 23 +++++
.../procedure/TestProcedureExecutor.java | 92 +++++++++++++++++
.../procedure/entity/SimpleLockProcedure.java | 11 +-
.../PipeHandleLeaderChangeProcedureTest.java | 47 +++++++++
27 files changed, 433 insertions(+), 89 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index b86c556f20d..58347f35af5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -43,16 +43,9 @@ public class PipeTaskCoordinatorLock {
public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
- try {
- semaphore.acquire();
- LOGGER.debug(
- "PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error(
- "Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
- Thread.currentThread().getName());
- }
+ semaphore.acquireUninterruptibly();
+ LOGGER.debug(
+ "PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
}
public boolean tryLock() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 91af03d3971..862bc449c73 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -60,6 +61,7 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
private volatile long lastUpdate;
private final AtomicReference<byte[]> result = new AtomicReference<>();
+ private final AtomicBoolean executing = new AtomicBoolean(false);
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;
@@ -233,6 +235,16 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
// no op
}
+ /**
+ * Called after an execution attempt returns {@link
ProcedureLockState#LOCK_EVENT_WAIT}. Override
+ * it to put the procedure into the corresponding lock wait queue.
+ *
+ * @param env env
+ */
+ protected void waitForLock(Env env) {
+ // no op
+ }
+
/**
* Used to keep procedure lock even when the procedure is yielded or
suspended.
*
@@ -254,6 +266,14 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
}
// -------------------------Internal methods - called by the
procedureExecutor------------------
+ final boolean tryAcquireExecution() {
+ return executing.compareAndSet(false, true);
+ }
+
+ final void releaseExecution() {
+ executing.set(false);
+ }
+
/**
* Internal method called by the ProcedureExecutor that starts the
user-level code execute().
*
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..1633f78eec9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -40,7 +39,6 @@ import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -82,6 +80,16 @@ public class ProcedureExecutor<Env> {
private final Env environment;
private final IProcedureStore<Env> store;
+ private static final class LockStateResult<Env> {
+ private final ProcedureLockState lockState;
+ private final Procedure<Env> procedure;
+
+ private LockStateResult(ProcedureLockState lockState, Procedure<Env>
procedure) {
+ this.lockState = lockState;
+ this.procedure = procedure;
+ }
+ }
+
public ProcedureExecutor(
final Env environment, final IProcedureStore<Env> store, final
ProcedureScheduler scheduler) {
this.environment = environment;
@@ -320,32 +328,38 @@ public class ProcedureExecutor<Env> {
return;
}
ProcedureLockState lockState = null;
+ Procedure<Env> lockEventWaitProcedure = null;
try {
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
- lockState = executeRootStackRollback(rootProcId, rootProcStack);
+ LockStateResult<Env> lockStateResult =
+ executeRootStackRollback(rootProcId, rootProcStack);
+ lockState = lockStateResult.lockState;
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
- LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
+ LOG.info("LOCK_EVENT_WAIT rollback {}",
lockStateResult.procedure);
rootProcStack.unsetRollback();
+ lockEventWaitProcedure = lockStateResult.procedure;
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
- scheduler.yield(proc);
+ scheduler.yield(lockStateResult.procedure);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
- switch (executeRollback(proc)) {
+ lockState = executeRollback(proc);
+ switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info("LOCK_EVENT_WAIT can't rollback child running for
{}", proc);
+ lockEventWaitProcedure = proc;
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
@@ -357,19 +371,25 @@ public class ProcedureExecutor<Env> {
}
break;
}
- lockState = acquireLock(proc);
- switch (lockState) {
- case LOCK_ACQUIRED:
- executeProcedure(rootProcStack, proc);
- break;
- case LOCK_YIELD_WAIT:
- case LOCK_EVENT_WAIT:
- LOG.info("{} lockstate is {}", proc, lockState);
- break;
- default:
- throw new UnsupportedOperationException();
+ try {
+ lockState = acquireLock(proc);
+ switch (lockState) {
+ case LOCK_ACQUIRED:
+ executeProcedure(rootProcStack, proc);
+ break;
+ case LOCK_YIELD_WAIT:
+ case LOCK_EVENT_WAIT:
+ LOG.info("{} lockstate is {}", proc, lockState);
+ if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
+ lockEventWaitProcedure = proc;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } finally {
+ rootProcStack.release();
}
- rootProcStack.release();
if (proc.isSuccess()) {
// update metrics on finishing the procedure
@@ -387,9 +407,9 @@ public class ProcedureExecutor<Env> {
} finally {
// Only after procedure has completed execution can it be allowed to be
rescheduled to prevent
// data races
- if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
- LOG.info("procedureId {} wait for lock.", proc.getProcId());
- ((ConfigNodeProcedureEnv)
this.environment).getNodeLock().waitProcedure(proc);
+ if (lockEventWaitProcedure != null) {
+ LOG.info("procedureId {} wait for lock.",
lockEventWaitProcedure.getProcId());
+ lockEventWaitProcedure.waitForLock(this.environment);
}
}
}
@@ -404,6 +424,7 @@ public class ProcedureExecutor<Env> {
if (proc.getState() != ProcedureState.RUNNABLE) {
LOG.error(
"The executing procedure should in RUNNABLE state, but it's not.
Procedure is {}", proc);
+ releaseLock(proc, false);
return;
}
boolean reExecute;
@@ -570,8 +591,8 @@ public class ProcedureExecutor<Env> {
* @param procedureStack root procedure stack
* @return lock state
*/
- private ProcedureLockState executeRootStackRollback(
- Long rootProcId, RootProcedureStack procedureStack) {
+ private LockStateResult<Env> executeRootStackRollback(
+ Long rootProcId, RootProcedureStack<Env> procedureStack) {
Procedure<Env> rootProcedure = procedures.get(rootProcId);
ProcedureException exception = rootProcedure.getException();
if (exception == null) {
@@ -590,7 +611,7 @@ public class ProcedureExecutor<Env> {
}
ProcedureLockState lockState = acquireLock(procedure);
if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
- return lockState;
+ return new LockStateResult<>(lockState, procedure);
}
lockState = executeRollback(procedure);
releaseLock(procedure, false);
@@ -598,11 +619,11 @@ public class ProcedureExecutor<Env> {
boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
if (abortRollback) {
- return lockState;
+ return new LockStateResult<>(lockState, procedure);
}
if (!procedure.isFinished() &&
procedure.isYieldAfterExecution(this.environment)) {
- return ProcedureLockState.LOCK_YIELD_WAIT;
+ return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT,
procedure);
}
if (procedure != rootProcedure) {
@@ -612,7 +633,7 @@ public class ProcedureExecutor<Env> {
LOG.info("Rolled back {}, time duration is {}", rootProcedure,
rootProcedure.elapsedTime());
rootProcedureCleanup(rootProcedure);
- return ProcedureLockState.LOCK_ACQUIRED;
+ return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED,
rootProcedure);
}
private ProcedureLockState acquireLock(Procedure<Env> proc) {
@@ -728,16 +749,33 @@ public class ProcedureExecutor<Env> {
Thread.sleep(1000);
continue;
}
- this.activeProcedure.set(procedure);
- activeExecutorCount.incrementAndGet();
- startTime.set(System.currentTimeMillis());
- executeProcedure(procedure);
- activeExecutorCount.decrementAndGet();
- LOG.trace(
- "Halt pid={}, activeCount={}", procedure.getProcId(),
activeExecutorCount.get());
- this.activeProcedure.set(null);
- lastUpdated = System.currentTimeMillis();
- startTime.set(lastUpdated);
+ boolean executionAcquired = false;
+ while (isRunning() && !(executionAcquired =
procedure.tryAcquireExecution())) {
+ Thread.sleep(10);
+ }
+ if (!executionAcquired) {
+ continue;
+ }
+ try {
+ this.activeProcedure.set(procedure);
+ activeExecutorCount.incrementAndGet();
+ startTime.set(System.currentTimeMillis());
+ try {
+ executeProcedure(procedure);
+ } finally {
+ procedure.releaseExecution();
+ activeExecutorCount.decrementAndGet();
+ LOG.trace(
+ "Halt pid={}, activeCount={}", procedure.getProcId(),
activeExecutorCount.get());
+ this.activeProcedure.set(null);
+ lastUpdated = System.currentTimeMillis();
+ startTime.set(lastUpdated);
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Exception happened when worker {} execute procedure {}",
getName(), procedure, e);
+ throw e;
+ }
}
} catch (Exception e) {
@@ -748,6 +786,7 @@ public class ProcedureExecutor<Env> {
this.activeProcedure.get(),
e);
}
+ this.activeProcedure.set(null);
} finally {
LOG.info("Procedure worker {} terminated.", getName());
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..c998ad903c2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -37,11 +37,13 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
}
public void add(Procedure<Env> procedure) {
- queue.add(new ProcedureDelayContainer<>(procedure));
+ ProcedureDelayContainer<Env> delayTask = new
ProcedureDelayContainer<>(procedure);
+ queue.remove(delayTask);
+ queue.add(delayTask);
}
public boolean remove(Procedure<Env> procedure) {
- return queue.remove(new ProcedureDelayContainer<>(procedure));
+ return queue.remove(new ProcedureDelayContainer<>(procedure)) ||
procedure.isFinished();
}
private ProcedureDelayContainer<Env> takeQuietly() {
@@ -62,10 +64,15 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
+ if (procedure.isFinished()) {
+ continue;
+ }
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
- procedure.updateTimestamp();
- queue.add(delayTask);
+ if (!procedure.isFinished()) {
+ procedure.updateTimestamp();
+ queue.add(delayTask);
+ }
} else {
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(procedure);
@@ -92,6 +99,23 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
return procedure;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ProcedureDelayContainer)) {
+ return false;
+ }
+ ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
+ return procedure == that.procedure;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(procedure);
+ }
+
@Override
public long getDelay(TimeUnit unit) {
long delay = procedure.getTimeoutTimestamp() -
System.currentTimeMillis();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index f698735ee95..400da565318 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState>
extends Procedure<Env>
nextState);
}
}
- if (getStateId(getCurrentState()) == stateToBeAdded) {
+ final TState currentState = getCurrentState();
+ if (currentState != null && getStateId(currentState) == stateToBeAdded) {
cycles++;
} else {
cycles = 0;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index af7f968e8a5..76df7765c9a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -295,7 +295,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
}
CreateCQProcedure that = (CreateCQProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index b1410279173..6cade537f1f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -56,6 +56,18 @@ public abstract class AbstractNodeProcedure<TState>
}
}
+ @Override
+ protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ configNodeProcedureEnv
+ .getNodeLock()
+ .waitProcedure(this, configNodeProcedureEnv.getScheduler());
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
@Override
protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 2fa0091568f..592f5291c13 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -133,12 +133,7 @@ public abstract class AbstractOperatePipeProcedureV2
LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe
lock.", getProcId());
} else {
LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be
released.", getProcId());
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
break;
default:
@@ -152,12 +147,7 @@ public abstract class AbstractOperatePipeProcedureV2
"ProcedureId {}: {}. Invalid lock state. Pipe lock will be
released.",
getProcId(),
procedureLockState);
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
break;
}
@@ -181,11 +171,16 @@ public abstract class AbstractOperatePipeProcedureV2
}
PipeProcedureMetrics.getInstance()
.updateTimer(this.getOperation().getName(), this.elapsedTime());
-
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
}
+ private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv
configNodeProcedureEnv) {
+ // Clear before releasing the semaphore to avoid clobbering a re-scheduled
execution's marker.
+ pipeTaskInfo = null;
+
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ }
+
protected abstract PipeTaskOperation getOperation();
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index f4fa738428d..22fe0ef40f2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -314,7 +314,7 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
if (that instanceof CreatePipePluginProcedure) {
CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure)
that;
return thatProcedure.getProcId() == getProcId()
- && thatProcedure.getCurrentState().equals(getCurrentState())
+ && Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
&& thatProcedure.getCycles() == getCycles()
&& thatProcedure.pipePluginMeta.equals(pipePluginMeta);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index efbe1ee6ccd..5deb0ee4bf3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -275,7 +275,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
if (that instanceof DropPipePluginProcedure) {
final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure)
that;
return thatProcedure.getProcId() == getProcId()
- && thatProcedure.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProcedure.getCurrentState(),
this.getCurrentState())
&& thatProcedure.getCycles() == this.getCycles()
&& (thatProcedure.pluginName).equals(pluginName);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 300d04ee11b..bcc4d0a7065 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -183,7 +183,7 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
}
PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& this.regionGroupToOldAndNewLeaderPairMap.equals(
that.regionGroupToOldAndNewLeaderPairMap);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 3089b3dd072..c7e85f7f2bc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -186,7 +186,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& needWriteConsensusOnConfigNodes ==
that.needWriteConsensusOnConfigNodes
&& needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 0c7042caf3f..0f9a8bc0d8c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -165,7 +165,7 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index fe36137b35f..29cb2b51ab4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -184,7 +184,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index b2e1a584ec5..44ca8a7bd5c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -187,7 +187,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isStoppedByRuntimeExceptionBeforeStop ==
that.isStoppedByRuntimeExceptionBeforeStop
&& pipeName.equals(that.pipeName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 7288090e815..b6ad21128af 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -303,7 +303,7 @@ public class DeleteDatabaseProcedure
if (that instanceof DeleteDatabaseProcedure) {
DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&&
thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index aaf7bc1a7bf..fd5d0d52f22 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -303,7 +303,7 @@ public class DeleteLogicalViewProcedure
if (o == null || getClass() != o.getClass()) return false;
DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& patternTree.equals(that.patternTree);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index 74f21abc72d..0a223de1525 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -367,7 +367,7 @@ public class DeleteTimeSeriesProcedure
if (o == null || getClass() != o.getClass()) return false;
final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& this.isGeneratedByPipe == that.isGeneratedByPipe
&& this.patternTree.equals(that.patternTree);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index 74f4112075a..548ba1ee22a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -304,7 +304,7 @@ public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerS
if (that instanceof CreateTriggerProcedure) {
CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&& thatProc.triggerInformation.equals(this.triggerInformation);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
index 19bfcdc30d2..958000d2b3f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
@@ -178,7 +178,7 @@ public class DropTriggerProcedure extends
AbstractNodeProcedure<DropTriggerState
if (that instanceof DropTriggerProcedure) {
DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&& (thatProc.triggerName).equals(this.triggerName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
index 832e339c0ae..e2f5935a909 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -45,7 +45,15 @@ public class LockQueue {
return true;
}
- public void waitProcedure(Procedure<?> procedure) {
+ public void waitProcedure(Procedure<?> procedure, ProcedureScheduler procedureScheduler) {
+ if (lockOwnerProcedure == null) {
+ procedureScheduler.addFront(procedure);
+ return;
+ }
+ if (deque.stream()
+ .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == procedure.getProcId())) {
+ return;
+ }
deque.addLast(procedure);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
index 3cd5ceacf4b..94b6f311930 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.scheduler;
import org.apache.iotdb.confignode.procedure.Procedure;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.locks.ReentrantLock;
/** Simple scheduler for procedures */
public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
@@ -48,6 +49,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
schedLock();
try {
runnables.clear();
+ waitings.clear();
} finally {
schedUnlock();
}
@@ -68,12 +70,37 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
return runnables.size();
}
- public void addWaiting(Procedure proc) {
- waitings.add(proc);
+ public void waitProcedure(Procedure proc, ReentrantLock lock) {
+ boolean signal = false;
+ schedLock();
+ try {
+ if (lock.isLocked()) {
+ waitings.add(proc);
+ } else {
+ runnables.addFirst(proc);
+ signal = true;
+ }
+ } finally {
+ schedUnlock();
+ }
+ if (signal) {
+ signalAll();
+ }
}
- public void releaseWaiting() {
- runnables.addAll(waitings);
- waitings.clear();
+ public void releaseWaiting(ReentrantLock lock) {
+ boolean signal;
+ schedLock();
+ try {
+ lock.unlock();
+ signal = !waitings.isEmpty();
+ runnables.addAll(waitings);
+ waitings.clear();
+ } finally {
+ schedUnlock();
+ }
+ if (signal) {
+ signalAll();
+ }
}
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 00000000000..74d5d821d76
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+ @Test
+ public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws Exception {
+ PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+ lock.lock();
+
+ CountDownLatch waiting = new CountDownLatch(1);
+ AtomicBoolean acquired = new AtomicBoolean(false);
+ Thread thread =
+ new Thread(
+ () -> {
+ Thread.currentThread().interrupt();
+ waiting.countDown();
+ lock.lock();
+ acquired.set(true);
+ lock.unlock();
+ });
+ thread.start();
+
+ Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS));
+ TimeUnit.MILLISECONDS.sleep(200);
+ Assert.assertFalse(acquired.get());
+
+ lock.unlock();
+ thread.join(TimeUnit.SECONDS.toMillis(3));
+
+ Assert.assertFalse(thread.isAlive());
+ Assert.assertTrue(acquired.get());
+ Assert.assertFalse(lock.isLocked());
+ }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
index 500a51e9e3d..967611ca8ee 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.confignode.procedure;
+import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
@@ -43,4 +46,24 @@ public class TestLockRegime extends TestProcedureBase {
this.procExecutor, procIdList.stream().mapToLong(Long::longValue).toArray());
Assert.assertEquals(env.lockAcquireSeq.toString(), env.executeSeq.toString());
}
+
+ @Test
+ public void testLockQueueDoesNotWakeDuplicateProcedure() {
+ LockQueue lockQueue = new LockQueue();
+ SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler();
+ scheduler.start();
+
+ NoopProcedure lockOwner = new NoopProcedure();
+ lockOwner.setProcId(0);
+ Assert.assertTrue(lockQueue.tryLock(lockOwner));
+
+ NoopProcedure procedure = new NoopProcedure();
+ procedure.setProcId(1);
+ lockQueue.waitProcedure(procedure, scheduler);
+ lockQueue.waitProcedure(procedure, scheduler);
+
+ Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler));
+ Assert.assertEquals(1, scheduler.size());
+ scheduler.stop();
+ }
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index dce7f2ba5dc..ba5f635507a 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -23,11 +23,14 @@ import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -96,6 +99,37 @@ public class TestProcedureExecutor extends TestProcedureBase {
ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2);
}
+ @Test
+ public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws InterruptedException {
+ BlockingProcedure blockingProcedure = new BlockingProcedure();
+ long procId = procExecutor.submitProcedure(blockingProcedure);
+
+ Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS));
+
+ procExecutor.getScheduler().addFront(blockingProcedure);
+ boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS);
+
+ blockingProcedure.releaseExecutions(duplicated ? 2 : 1);
+ ProcedureTestUtil.waitForProcedure(procExecutor, procId);
+
+ Assert.assertFalse(duplicated);
+ Assert.assertEquals(1, blockingProcedure.getExecutionCount());
+ }
+
+ @Test
+ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws InterruptedException {
+ CompletingInternalProcedure internalProcedure = new CompletingInternalProcedure();
+
+ procExecutor.addInternalProcedure(internalProcedure);
+ procExecutor.addInternalProcedure(internalProcedure);
+
+ Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+ Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+ Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+ }
+
private int waitThreadCount(final int expectedThreads) {
long startTime = System.currentTimeMillis();
while (procExecutor.isRunning()
@@ -107,4 +141,62 @@ public class TestProcedureExecutor extends TestProcedureBase {
}
return procExecutor.getWorkerThreadCount();
}
+
+ private static class BlockingProcedure extends Procedure<TestProcEnv> {
+
+ private final Semaphore entered = new Semaphore(0);
+ private final Semaphore finish = new Semaphore(0);
+ private final AtomicInteger executionCount = new AtomicInteger();
+
+ @Override
+ protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws InterruptedException {
+ executionCount.incrementAndGet();
+ entered.release();
+ finish.acquire();
+ return null;
+ }
+
+ @Override
+ protected void rollback(TestProcEnv env)
+ throws IOException, InterruptedException, ProcedureException {
+ // No state to roll back.
+ }
+
+ private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException {
+ return entered.tryAcquire(timeout, unit);
+ }
+
+ private void releaseExecutions(int permits) {
+ finish.release(permits);
+ }
+
+ private int getExecutionCount() {
+ return executionCount.get();
+ }
+ }
+
+ private static class CompletingInternalProcedure extends InternalProcedure<TestProcEnv> {
+
+ private final Semaphore entered = new Semaphore(0);
+ private final AtomicInteger executionCount = new AtomicInteger();
+
+ private CompletingInternalProcedure() {
+ super(0);
+ }
+
+ @Override
+ protected void periodicExecute(TestProcEnv env) {
+ executionCount.incrementAndGet();
+ entered.release();
+ setState(ProcedureState.SUCCESS);
+ }
+
+ private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException {
+ return entered.tryAcquire(timeout, unit);
+ }
+
+ private int getExecutionCount() {
+ return executionCount.get();
+ }
+ }
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index ce9fea39d55..42badd70079 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -53,18 +53,21 @@ public class SimpleLockProcedure extends Procedure<TestProcEnv> {
return ProcedureLockState.LOCK_ACQUIRED;
}
- SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
- scheduler.addWaiting(this);
System.out.println(procName + " wait for lock.");
return ProcedureLockState.LOCK_EVENT_WAIT;
}
+ @Override
+ protected void waitForLock(TestProcEnv testProcEnv) {
+ SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
+ scheduler.waitProcedure(this, testProcEnv.getEnvLock());
+ }
+
@Override
protected void releaseLock(TestProcEnv testProcEnv) {
System.out.println(procName + " release lock.");
- testProcEnv.getEnvLock().unlock();
SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
- scheduler.releaseWaiting();
+ scheduler.releaseWaiting(testProcEnv.getEnvLock());
}
@Override
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index 75c0963a27f..b2ec615fbcd 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest {
fail();
}
}
+
+ @Test
+ public void completedProcedureEqualsTest() {
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+ leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2));
+
+ try {
+ PipeHandleLeaderChangeProcedure proc = deserializeCompletedProcedure(leaderMap);
+ PipeHandleLeaderChangeProcedure proc2 = deserializeCompletedProcedure(leaderMap);
+
+ assertEquals(proc, proc2);
+ assertEquals(proc.hashCode(), proc2.hashCode());
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure(
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws Exception {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeInt(ProcedureState.SUCCESS.ordinal());
+ outputStream.writeLong(0L);
+ outputStream.writeLong(0L);
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeLong(Procedure.NO_TIMEOUT);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(1);
+ outputStream.writeInt(Integer.MIN_VALUE);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(leaderMap.size());
+ for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : leaderMap.entrySet()) {
+ outputStream.writeInt(entry.getKey().getId());
+ outputStream.writeInt(entry.getValue().getLeft());
+ outputStream.writeInt(entry.getValue().getRight());
+ }
+
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ return (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer);
+ }
}