This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 91ee3016d18 Fix duplicate scheduling in procedure execution (#17902) 
(#17969)
91ee3016d18 is described below

commit 91ee3016d18c06d123ca34b983309418bd9eafe1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 16:20:09 2026 +0800

    Fix duplicate scheduling in procedure execution (#17902) (#17969)
    
    * Fix duplicate scheduling in procedure execution
    
    * Fix delayed procedure deduplication and semaphore release
    
    * Fix SQL parser error handler traversal
    
    * Fix pipe procedure lock release race
    
    * Fix procedure lock wait scheduling
    
    (cherry picked from commit c25849a093b466966353ecd7b63f722535b8d5b3)
    (cherry picked from commit 0a45a3b7b708c3b436e6b9b4c6aa9b65bede7076)
---
 .../coordinator/task/PipeTaskCoordinatorLock.java  |  13 +--
 .../iotdb/confignode/procedure/Procedure.java      |  20 ++++
 .../confignode/procedure/ProcedureExecutor.java    | 113 ++++++++++++++-------
 .../procedure/TimeoutExecutorThread.java           |  32 +++++-
 .../procedure/impl/StateMachineProcedure.java      |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       |   2 +-
 .../procedure/impl/node/AbstractNodeProcedure.java |  12 +++
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  23 ++---
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   2 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   2 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   2 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/schema/DeleteLogicalViewProcedure.java    |   2 +-
 .../impl/schema/DeleteTimeSeriesProcedure.java     |   2 +-
 .../impl/trigger/CreateTriggerProcedure.java       |   2 +-
 .../impl/trigger/DropTriggerProcedure.java         |   2 +-
 .../confignode/procedure/scheduler/LockQueue.java  |  10 +-
 .../scheduler/SimpleProcedureScheduler.java        |  37 ++++++-
 .../task/PipeTaskCoordinatorLockTest.java          |  60 +++++++++++
 .../iotdb/confignode/procedure/TestLockRegime.java |  23 +++++
 .../procedure/TestProcedureExecutor.java           |  92 +++++++++++++++++
 .../procedure/entity/SimpleLockProcedure.java      |  11 +-
 .../PipeHandleLeaderChangeProcedureTest.java       |  47 +++++++++
 27 files changed, 433 insertions(+), 89 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index b86c556f20d..58347f35af5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -43,16 +43,9 @@ public class PipeTaskCoordinatorLock {
   public void lock() {
     LOGGER.debug(
         "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
-    try {
-      semaphore.acquire();
-      LOGGER.debug(
-          "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
-    } catch (final InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error(
-          "Interrupted while waiting for PipeTaskCoordinator lock, current 
thread: {}",
-          Thread.currentThread().getName());
-    }
+    semaphore.acquireUninterruptibly();
+    LOGGER.debug(
+        "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
   }
 
   public boolean tryLock() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 91af03d3971..862bc449c73 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -60,6 +61,7 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
   private volatile long lastUpdate;
 
   private final AtomicReference<byte[]> result = new AtomicReference<>();
+  private final AtomicBoolean executing = new AtomicBoolean(false);
   private volatile boolean locked = false;
   private boolean lockedWhenLoading = false;
 
@@ -233,6 +235,16 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
     // no op
   }
 
+  /**
+   * Called after an execution attempt returns {@link 
ProcedureLockState#LOCK_EVENT_WAIT}. Override
+   * it to put the procedure into the corresponding lock wait queue.
+   *
+   * @param env env
+   */
+  protected void waitForLock(Env env) {
+    // no op
+  }
+
   /**
    * Used to keep procedure lock even when the procedure is yielded or 
suspended.
    *
@@ -254,6 +266,14 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
   }
 
   // -------------------------Internal methods - called by the 
procedureExecutor------------------
+  final boolean tryAcquireExecution() {
+    return executing.compareAndSet(false, true);
+  }
+
+  final void releaseExecution() {
+    executing.set(false);
+  }
+
   /**
    * Internal method called by the ProcedureExecutor that starts the 
user-level code execute().
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..1633f78eec9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import 
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -40,7 +39,6 @@ import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -82,6 +80,16 @@ public class ProcedureExecutor<Env> {
   private final Env environment;
   private final IProcedureStore<Env> store;
 
+  private static final class LockStateResult<Env> {
+    private final ProcedureLockState lockState;
+    private final Procedure<Env> procedure;
+
+    private LockStateResult(ProcedureLockState lockState, Procedure<Env> 
procedure) {
+      this.lockState = lockState;
+      this.procedure = procedure;
+    }
+  }
+
   public ProcedureExecutor(
       final Env environment, final IProcedureStore<Env> store, final 
ProcedureScheduler scheduler) {
     this.environment = environment;
@@ -320,32 +328,38 @@ public class ProcedureExecutor<Env> {
       return;
     }
     ProcedureLockState lockState = null;
+    Procedure<Env> lockEventWaitProcedure = null;
     try {
       do {
         if (!rootProcStack.acquire()) {
           if (rootProcStack.setRollback()) {
-            lockState = executeRootStackRollback(rootProcId, rootProcStack);
+            LockStateResult<Env> lockStateResult =
+                executeRootStackRollback(rootProcId, rootProcStack);
+            lockState = lockStateResult.lockState;
             switch (lockState) {
               case LOCK_ACQUIRED:
                 break;
               case LOCK_EVENT_WAIT:
-                LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
+                LOG.info("LOCK_EVENT_WAIT rollback {}", 
lockStateResult.procedure);
                 rootProcStack.unsetRollback();
+                lockEventWaitProcedure = lockStateResult.procedure;
                 break;
               case LOCK_YIELD_WAIT:
                 rootProcStack.unsetRollback();
-                scheduler.yield(proc);
+                scheduler.yield(lockStateResult.procedure);
                 break;
               default:
                 throw new UnsupportedOperationException();
             }
           } else {
             if (!proc.wasExecuted()) {
-              switch (executeRollback(proc)) {
+              lockState = executeRollback(proc);
+              switch (lockState) {
                 case LOCK_ACQUIRED:
                   break;
                 case LOCK_EVENT_WAIT:
                   LOG.info("LOCK_EVENT_WAIT can't rollback child running for 
{}", proc);
+                  lockEventWaitProcedure = proc;
                   break;
                 case LOCK_YIELD_WAIT:
                   scheduler.yield(proc);
@@ -357,19 +371,25 @@ public class ProcedureExecutor<Env> {
           }
           break;
         }
-        lockState = acquireLock(proc);
-        switch (lockState) {
-          case LOCK_ACQUIRED:
-            executeProcedure(rootProcStack, proc);
-            break;
-          case LOCK_YIELD_WAIT:
-          case LOCK_EVENT_WAIT:
-            LOG.info("{} lockstate is {}", proc, lockState);
-            break;
-          default:
-            throw new UnsupportedOperationException();
+        try {
+          lockState = acquireLock(proc);
+          switch (lockState) {
+            case LOCK_ACQUIRED:
+              executeProcedure(rootProcStack, proc);
+              break;
+            case LOCK_YIELD_WAIT:
+            case LOCK_EVENT_WAIT:
+              LOG.info("{} lockstate is {}", proc, lockState);
+              if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
+                lockEventWaitProcedure = proc;
+              }
+              break;
+            default:
+              throw new UnsupportedOperationException();
+          }
+        } finally {
+          rootProcStack.release();
         }
-        rootProcStack.release();
 
         if (proc.isSuccess()) {
           // update metrics on finishing the procedure
@@ -387,9 +407,9 @@ public class ProcedureExecutor<Env> {
     } finally {
       // Only after procedure has completed execution can it be allowed to be 
rescheduled to prevent
       // data races
-      if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
-        LOG.info("procedureId {} wait for lock.", proc.getProcId());
-        ((ConfigNodeProcedureEnv) 
this.environment).getNodeLock().waitProcedure(proc);
+      if (lockEventWaitProcedure != null) {
+        LOG.info("procedureId {} wait for lock.", 
lockEventWaitProcedure.getProcId());
+        lockEventWaitProcedure.waitForLock(this.environment);
       }
     }
   }
@@ -404,6 +424,7 @@ public class ProcedureExecutor<Env> {
     if (proc.getState() != ProcedureState.RUNNABLE) {
       LOG.error(
           "The executing procedure should in RUNNABLE state, but it's not. 
Procedure is {}", proc);
+      releaseLock(proc, false);
       return;
     }
     boolean reExecute;
@@ -570,8 +591,8 @@ public class ProcedureExecutor<Env> {
    * @param procedureStack root procedure stack
    * @return lock state
    */
-  private ProcedureLockState executeRootStackRollback(
-      Long rootProcId, RootProcedureStack procedureStack) {
+  private LockStateResult<Env> executeRootStackRollback(
+      Long rootProcId, RootProcedureStack<Env> procedureStack) {
     Procedure<Env> rootProcedure = procedures.get(rootProcId);
     ProcedureException exception = rootProcedure.getException();
     if (exception == null) {
@@ -590,7 +611,7 @@ public class ProcedureExecutor<Env> {
       }
       ProcedureLockState lockState = acquireLock(procedure);
       if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
-        return lockState;
+        return new LockStateResult<>(lockState, procedure);
       }
       lockState = executeRollback(procedure);
       releaseLock(procedure, false);
@@ -598,11 +619,11 @@ public class ProcedureExecutor<Env> {
       boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
       abortRollback |= !isRunning() || !store.isRunning();
       if (abortRollback) {
-        return lockState;
+        return new LockStateResult<>(lockState, procedure);
       }
 
       if (!procedure.isFinished() && 
procedure.isYieldAfterExecution(this.environment)) {
-        return ProcedureLockState.LOCK_YIELD_WAIT;
+        return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, 
procedure);
       }
 
       if (procedure != rootProcedure) {
@@ -612,7 +633,7 @@ public class ProcedureExecutor<Env> {
 
     LOG.info("Rolled back {}, time duration is {}", rootProcedure, 
rootProcedure.elapsedTime());
     rootProcedureCleanup(rootProcedure);
-    return ProcedureLockState.LOCK_ACQUIRED;
+    return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, 
rootProcedure);
   }
 
   private ProcedureLockState acquireLock(Procedure<Env> proc) {
@@ -728,16 +749,33 @@ public class ProcedureExecutor<Env> {
             Thread.sleep(1000);
             continue;
           }
-          this.activeProcedure.set(procedure);
-          activeExecutorCount.incrementAndGet();
-          startTime.set(System.currentTimeMillis());
-          executeProcedure(procedure);
-          activeExecutorCount.decrementAndGet();
-          LOG.trace(
-              "Halt pid={}, activeCount={}", procedure.getProcId(), 
activeExecutorCount.get());
-          this.activeProcedure.set(null);
-          lastUpdated = System.currentTimeMillis();
-          startTime.set(lastUpdated);
+          boolean executionAcquired = false;
+          while (isRunning() && !(executionAcquired = 
procedure.tryAcquireExecution())) {
+            Thread.sleep(10);
+          }
+          if (!executionAcquired) {
+            continue;
+          }
+          try {
+            this.activeProcedure.set(procedure);
+            activeExecutorCount.incrementAndGet();
+            startTime.set(System.currentTimeMillis());
+            try {
+              executeProcedure(procedure);
+            } finally {
+              procedure.releaseExecution();
+              activeExecutorCount.decrementAndGet();
+              LOG.trace(
+                  "Halt pid={}, activeCount={}", procedure.getProcId(), 
activeExecutorCount.get());
+              this.activeProcedure.set(null);
+              lastUpdated = System.currentTimeMillis();
+              startTime.set(lastUpdated);
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Exception happened when worker {} execute procedure {}", 
getName(), procedure, e);
+            throw e;
+          }
         }
 
       } catch (Exception e) {
@@ -748,6 +786,7 @@ public class ProcedureExecutor<Env> {
               this.activeProcedure.get(),
               e);
         }
+        this.activeProcedure.set(null);
       } finally {
         LOG.info("Procedure worker {} terminated.", getName());
       }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..c998ad903c2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -37,11 +37,13 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
   }
 
   public void add(Procedure<Env> procedure) {
-    queue.add(new ProcedureDelayContainer<>(procedure));
+    ProcedureDelayContainer<Env> delayTask = new 
ProcedureDelayContainer<>(procedure);
+    queue.remove(delayTask);
+    queue.add(delayTask);
   }
 
   public boolean remove(Procedure<Env> procedure) {
-    return queue.remove(new ProcedureDelayContainer<>(procedure));
+    return queue.remove(new ProcedureDelayContainer<>(procedure)) || 
procedure.isFinished();
   }
 
   private ProcedureDelayContainer<Env> takeQuietly() {
@@ -62,10 +64,15 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       }
       Procedure<Env> procedure = delayTask.getProcedure();
       if (procedure instanceof InternalProcedure) {
+        if (procedure.isFinished()) {
+          continue;
+        }
         InternalProcedure internal = (InternalProcedure) procedure;
         internal.periodicExecute(executor.getEnvironment());
-        procedure.updateTimestamp();
-        queue.add(delayTask);
+        if (!procedure.isFinished()) {
+          procedure.updateTimestamp();
+          queue.add(delayTask);
+        }
       } else {
         if (procedure.setTimeoutFailure(executor.getEnvironment())) {
           long rootProcId = executor.getRootProcedureId(procedure);
@@ -92,6 +99,23 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       return procedure;
     }
 
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ProcedureDelayContainer)) {
+        return false;
+      }
+      ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
+      return procedure == that.procedure;
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(procedure);
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
       long delay = procedure.getTimeoutTimestamp() - 
System.currentTimeMillis();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index f698735ee95..400da565318 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState> 
extends Procedure<Env>
             nextState);
       }
     }
-    if (getStateId(getCurrentState()) == stateToBeAdded) {
+    final TState currentState = getCurrentState();
+    if (currentState != null && getStateId(currentState) == stateToBeAdded) {
       cycles++;
     } else {
       cycles = 0;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index af7f968e8a5..76df7765c9a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -295,7 +295,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     }
     CreateCQProcedure that = (CreateCQProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && firstExecutionTime == that.firstExecutionTime
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index b1410279173..6cade537f1f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -56,6 +56,18 @@ public abstract class AbstractNodeProcedure<TState>
     }
   }
 
+  @Override
+  protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      configNodeProcedureEnv
+          .getNodeLock()
+          .waitProcedure(this, configNodeProcedureEnv.getScheduler());
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
   @Override
   protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     configNodeProcedureEnv.getSchedulerLock().lock();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 2fa0091568f..592f5291c13 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -133,12 +133,7 @@ public abstract class AbstractOperatePipeProcedureV2
           LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe 
lock.", getProcId());
         } else {
           LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be 
released.", getProcId());
-          configNodeProcedureEnv
-              .getConfigManager()
-              .getPipeManager()
-              .getPipeTaskCoordinator()
-              .unlock();
-          pipeTaskInfo = null;
+          releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
         }
         break;
       default:
@@ -152,12 +147,7 @@ public abstract class AbstractOperatePipeProcedureV2
               "ProcedureId {}: {}. Invalid lock state. Pipe lock will be 
released.",
               getProcId(),
               procedureLockState);
-          configNodeProcedureEnv
-              .getConfigManager()
-              .getPipeManager()
-              .getPipeTaskCoordinator()
-              .unlock();
-          pipeTaskInfo = null;
+          releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
         }
         break;
     }
@@ -181,11 +171,16 @@ public abstract class AbstractOperatePipeProcedureV2
       }
       PipeProcedureMetrics.getInstance()
           .updateTimer(this.getOperation().getName(), this.elapsedTime());
-      
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
-      pipeTaskInfo = null;
+      releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
     }
   }
 
+  private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv 
configNodeProcedureEnv) {
+    // Clear before releasing the semaphore to avoid clobbering a re-scheduled 
execution's marker.
+    pipeTaskInfo = null;
+    
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+  }
+
   protected abstract PipeTaskOperation getOperation();
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index f4fa738428d..22fe0ef40f2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -314,7 +314,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
     if (that instanceof CreatePipePluginProcedure) {
       CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) 
that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
           && thatProcedure.getCycles() == getCycles()
           && thatProcedure.pipePluginMeta.equals(pipePluginMeta);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index efbe1ee6ccd..5deb0ee4bf3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -275,7 +275,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     if (that instanceof DropPipePluginProcedure) {
       final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) 
that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), 
this.getCurrentState())
           && thatProcedure.getCycles() == this.getCycles()
           && (thatProcedure.pluginName).equals(pluginName);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 300d04ee11b..bcc4d0a7065 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -183,7 +183,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     }
     PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && this.regionGroupToOldAndNewLeaderPairMap.equals(
             that.regionGroupToOldAndNewLeaderPairMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 3089b3dd072..c7e85f7f2bc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -186,7 +186,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
     }
     PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && needWriteConsensusOnConfigNodes == 
that.needWriteConsensusOnConfigNodes
         && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 0c7042caf3f..0f9a8bc0d8c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -165,7 +165,7 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index fe36137b35f..29cb2b51ab4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -184,7 +184,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index b2e1a584ec5..44ca8a7bd5c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -187,7 +187,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isStoppedByRuntimeExceptionBeforeStop == 
that.isStoppedByRuntimeExceptionBeforeStop
         && pipeName.equals(that.pipeName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 7288090e815..b6ad21128af 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -303,7 +303,7 @@ public class DeleteDatabaseProcedure
     if (that instanceof DeleteDatabaseProcedure) {
       DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && 
thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index aaf7bc1a7bf..fd5d0d52f22 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -303,7 +303,7 @@ public class DeleteLogicalViewProcedure
     if (o == null || getClass() != o.getClass()) return false;
     DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && patternTree.equals(that.patternTree);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index 74f21abc72d..0a223de1525 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -367,7 +367,7 @@ public class DeleteTimeSeriesProcedure
     if (o == null || getClass() != o.getClass()) return false;
     final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && this.isGeneratedByPipe == that.isGeneratedByPipe
         && this.patternTree.equals(that.patternTree);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index 74f4112075a..548ba1ee22a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -304,7 +304,7 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
     if (that instanceof CreateTriggerProcedure) {
       CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && thatProc.triggerInformation.equals(this.triggerInformation);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
index 19bfcdc30d2..958000d2b3f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
@@ -178,7 +178,7 @@ public class DropTriggerProcedure extends 
AbstractNodeProcedure<DropTriggerState
     if (that instanceof DropTriggerProcedure) {
       DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && (thatProc.triggerName).equals(this.triggerName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
index 832e339c0ae..e2f5935a909 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -45,7 +45,15 @@ public class LockQueue {
     return true;
   }
 
-  public void waitProcedure(Procedure<?> procedure) {
+  public void waitProcedure(Procedure<?> procedure, ProcedureScheduler 
procedureScheduler) {
+    if (lockOwnerProcedure == null) {
+      procedureScheduler.addFront(procedure);
+      return;
+    }
+    if (deque.stream()
+        .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == 
procedure.getProcId())) {
+      return;
+    }
     deque.addLast(procedure);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
index 3cd5ceacf4b..94b6f311930 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.scheduler;
 import org.apache.iotdb.confignode.procedure.Procedure;
 
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.locks.ReentrantLock;
 
 /** Simple scheduler for procedures */
 public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
@@ -48,6 +49,7 @@ public class SimpleProcedureScheduler extends 
AbstractProcedureScheduler {
     schedLock();
     try {
       runnables.clear();
+      waitings.clear();
     } finally {
       schedUnlock();
     }
@@ -68,12 +70,37 @@ public class SimpleProcedureScheduler extends 
AbstractProcedureScheduler {
     return runnables.size();
   }
 
-  public void addWaiting(Procedure proc) {
-    waitings.add(proc);
+  public void waitProcedure(Procedure proc, ReentrantLock lock) {
+    boolean signal = false;
+    schedLock();
+    try {
+      if (lock.isLocked()) {
+        waitings.add(proc);
+      } else {
+        runnables.addFirst(proc);
+        signal = true;
+      }
+    } finally {
+      schedUnlock();
+    }
+    if (signal) {
+      signalAll();
+    }
   }
 
-  public void releaseWaiting() {
-    runnables.addAll(waitings);
-    waitings.clear();
+  public void releaseWaiting(ReentrantLock lock) {
+    boolean signal;
+    schedLock();
+    try {
+      lock.unlock();
+      signal = !waitings.isEmpty();
+      runnables.addAll(waitings);
+      waitings.clear();
+    } finally {
+      schedUnlock();
+    }
+    if (signal) {
+      signalAll();
+    }
   }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 00000000000..74d5d821d76
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+  @Test
+  public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws 
Exception {
+    PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+    lock.lock();
+
+    CountDownLatch waiting = new CountDownLatch(1);
+    AtomicBoolean acquired = new AtomicBoolean(false);
+    Thread thread =
+        new Thread(
+            () -> {
+              Thread.currentThread().interrupt();
+              waiting.countDown();
+              lock.lock();
+              acquired.set(true);
+              lock.unlock();
+            });
+    thread.start();
+
+    Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS));
+    TimeUnit.MILLISECONDS.sleep(200);
+    Assert.assertFalse(acquired.get());
+
+    lock.unlock();
+    thread.join(TimeUnit.SECONDS.toMillis(3));
+
+    Assert.assertFalse(thread.isAlive());
+    Assert.assertTrue(acquired.get());
+    Assert.assertFalse(lock.isLocked());
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
index 500a51e9e3d..967611ca8ee 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.confignode.procedure;
 
+import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import 
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
@@ -43,4 +46,24 @@ public class TestLockRegime extends TestProcedureBase {
         this.procExecutor, 
procIdList.stream().mapToLong(Long::longValue).toArray());
     Assert.assertEquals(env.lockAcquireSeq.toString(), 
env.executeSeq.toString());
   }
+
+  @Test
+  public void testLockQueueDoesNotWakeDuplicateProcedure() {
+    LockQueue lockQueue = new LockQueue();
+    SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler();
+    scheduler.start();
+
+    NoopProcedure lockOwner = new NoopProcedure();
+    lockOwner.setProcId(0);
+    Assert.assertTrue(lockQueue.tryLock(lockOwner));
+
+    NoopProcedure procedure = new NoopProcedure();
+    procedure.setProcId(1);
+    lockQueue.waitProcedure(procedure, scheduler);
+    lockQueue.waitProcedure(procedure, scheduler);
+
+    Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler));
+    Assert.assertEquals(1, scheduler.size());
+    scheduler.stop();
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index dce7f2ba5dc..ba5f635507a 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -23,11 +23,14 @@ import 
org.apache.iotdb.confignode.procedure.entity.IncProcedure;
 import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
 import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -96,6 +99,37 @@ public class TestProcedureExecutor extends TestProcedureBase 
{
     ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2);
   }
 
+  @Test
+  public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws 
InterruptedException {
+    BlockingProcedure blockingProcedure = new BlockingProcedure();
+    long procId = procExecutor.submitProcedure(blockingProcedure);
+
+    Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS));
+
+    procExecutor.getScheduler().addFront(blockingProcedure);
+    boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS);
+
+    blockingProcedure.releaseExecutions(duplicated ? 2 : 1);
+    ProcedureTestUtil.waitForProcedure(procExecutor, procId);
+
+    Assert.assertFalse(duplicated);
+    Assert.assertEquals(1, blockingProcedure.getExecutionCount());
+  }
+
+  @Test
+  public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws 
InterruptedException {
+    CompletingInternalProcedure internalProcedure = new 
CompletingInternalProcedure();
+
+    procExecutor.addInternalProcedure(internalProcedure);
+    procExecutor.addInternalProcedure(internalProcedure);
+
+    Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+    Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+    Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+  }
+
   private int waitThreadCount(final int expectedThreads) {
     long startTime = System.currentTimeMillis();
     while (procExecutor.isRunning()
@@ -107,4 +141,62 @@ public class TestProcedureExecutor extends 
TestProcedureBase {
     }
     return procExecutor.getWorkerThreadCount();
   }
+
+  private static class BlockingProcedure extends Procedure<TestProcEnv> {
+
+    private final Semaphore entered = new Semaphore(0);
+    private final Semaphore finish = new Semaphore(0);
+    private final AtomicInteger executionCount = new AtomicInteger();
+
+    @Override
+    protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws 
InterruptedException {
+      executionCount.incrementAndGet();
+      entered.release();
+      finish.acquire();
+      return null;
+    }
+
+    @Override
+    protected void rollback(TestProcEnv env)
+        throws IOException, InterruptedException, ProcedureException {
+      // No state to roll back.
+    }
+
+    private boolean awaitExecution(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return entered.tryAcquire(timeout, unit);
+    }
+
+    private void releaseExecutions(int permits) {
+      finish.release(permits);
+    }
+
+    private int getExecutionCount() {
+      return executionCount.get();
+    }
+  }
+
+  private static class CompletingInternalProcedure extends 
InternalProcedure<TestProcEnv> {
+
+    private final Semaphore entered = new Semaphore(0);
+    private final AtomicInteger executionCount = new AtomicInteger();
+
+    private CompletingInternalProcedure() {
+      super(0);
+    }
+
+    @Override
+    protected void periodicExecute(TestProcEnv env) {
+      executionCount.incrementAndGet();
+      entered.release();
+      setState(ProcedureState.SUCCESS);
+    }
+
+    private boolean awaitExecution(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return entered.tryAcquire(timeout, unit);
+    }
+
+    private int getExecutionCount() {
+      return executionCount.get();
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index ce9fea39d55..42badd70079 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -53,18 +53,21 @@ public class SimpleLockProcedure extends 
Procedure<TestProcEnv> {
 
       return ProcedureLockState.LOCK_ACQUIRED;
     }
-    SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
-    scheduler.addWaiting(this);
     System.out.println(procName + " wait for lock.");
     return ProcedureLockState.LOCK_EVENT_WAIT;
   }
 
+  @Override
+  protected void waitForLock(TestProcEnv testProcEnv) {
+    SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
+    scheduler.waitProcedure(this, testProcEnv.getEnvLock());
+  }
+
   @Override
   protected void releaseLock(TestProcEnv testProcEnv) {
     System.out.println(procName + " release lock.");
-    testProcEnv.getEnvLock().unlock();
     SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
-    scheduler.releaseWaiting();
+    scheduler.releaseWaiting(testProcEnv.getEnvLock());
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index 75c0963a27f..b2ec615fbcd 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest {
       fail();
     }
   }
+
+  @Test
+  public void completedProcedureEqualsTest() {
+    Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+    leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), 
new Pair<>(1, 2));
+
+    try {
+      PipeHandleLeaderChangeProcedure proc = 
deserializeCompletedProcedure(leaderMap);
+      PipeHandleLeaderChangeProcedure proc2 = 
deserializeCompletedProcedure(leaderMap);
+
+      assertEquals(proc, proc2);
+      assertEquals(proc.hashCode(), proc2.hashCode());
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure(
+      Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws 
Exception {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeInt(ProcedureState.SUCCESS.ordinal());
+    outputStream.writeLong(0L);
+    outputStream.writeLong(0L);
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeLong(Procedure.NO_TIMEOUT);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(1);
+    outputStream.writeInt(Integer.MIN_VALUE);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(leaderMap.size());
+    for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : 
leaderMap.entrySet()) {
+      outputStream.writeInt(entry.getKey().getId());
+      outputStream.writeInt(entry.getValue().getLeft());
+      outputStream.writeInt(entry.getValue().getRight());
+    }
+
+    ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    return (PipeHandleLeaderChangeProcedure) 
ProcedureFactory.getInstance().create(buffer);
+  }
 }

Reply via email to