This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 28c4e020bc2 [To dev/1.3] Fix the deadlock at ConfigNode 
PipeTaskCoordinatorLock (#17233) (#17424)
28c4e020bc2 is described below

commit 28c4e020bc25ab4af1bd7522c1d20ac383724575
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 16:35:36 2026 +0800

    [To dev/1.3] Fix the deadlock at ConfigNode PipeTaskCoordinatorLock 
(#17233) (#17424)
    
    * fix
    
    * Update PipeTaskCoordinatorLock.java
---
 .../pipe/coordinator/task/PipeTaskCoordinator.java    | 14 ++------------
 .../coordinator/task/PipeTaskCoordinatorLock.java     | 19 ++++++++++++-------
 .../manager/subscription/SubscriptionCoordinator.java | 10 ++--------
 3 files changed, 16 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index f48c64c3ea2..ee1ccd2a8fb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -82,19 +82,9 @@ public class PipeTaskCoordinator {
   /**
    * Unlock the pipe task coordinator. Calling this method will clear the pipe 
task info holder,
    * which means that the holder will be null after calling this method.
-   *
-   * @return {@code true} if successfully unlocked, {@code false} if current 
thread is not holding
-   *     the lock.
    */
-  public boolean unlock() {
-    try {
-      pipeTaskCoordinatorLock.unlock();
-      return true;
-    } catch (IllegalMonitorStateException ignored) {
-      // This is thrown if unlock() is called without lock() called first.
-      LOGGER.warn("This thread is not holding the lock.");
-      return false;
-    }
+  public void unlock() {
+    pipeTaskCoordinatorLock.unlock();
   }
 
   public boolean isLocked() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 12b92619004..b86c556f20d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,24 +22,29 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task 
coordinator. It is used to
+ * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task 
coordinator. It is used to
  * ensure that only one thread can execute the pipe task coordinator at the 
same time.
+ *
+ * <p>Uses {@link Semaphore} instead of {@link 
java.util.concurrent.locks.ReentrantLock} to support
+ * cross-thread acquire/release, which is required by the procedure recovery 
mechanism: locks may be
+ * acquired on the StateMachineUpdater thread during {@code restoreLock()} and 
released on a
+ * ProcedureCoreWorker thread after execution.
  */
 public class PipeTaskCoordinatorLock {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
 
-  private final ReentrantLock lock = new ReentrantLock();
+  private final Semaphore semaphore = new Semaphore(1);
 
   public void lock() {
     LOGGER.debug(
         "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
     try {
-      lock.lockInterruptibly();
+      semaphore.acquire();
       LOGGER.debug(
           "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
     } catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public class PipeTaskCoordinatorLock {
     try {
       LOGGER.debug(
           "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
-      if (lock.tryLock(10, TimeUnit.SECONDS)) {
+      if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
         LOGGER.debug(
             "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
         return true;
@@ -74,12 +79,12 @@ public class PipeTaskCoordinatorLock {
   }
 
   public void unlock() {
-    lock.unlock();
+    semaphore.release();
     LOGGER.debug(
         "PipeTaskCoordinator lock released by thread {}", 
Thread.currentThread().getName());
   }
 
   public boolean isLocked() {
-    return lock.isLocked();
+    return semaphore.availablePermits() == 0;
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 28c596de7ba..ac88f31f921 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -105,14 +105,8 @@ public class SubscriptionCoordinator {
       subscriptionInfoHolder = null;
     }
 
-    try {
-      coordinatorLock.unlock();
-      return true;
-    } catch (IllegalMonitorStateException ignored) {
-      // This is thrown if unlock() is called without lock() called first.
-      LOGGER.warn("This thread is not holding the lock.");
-      return false;
-    }
+    coordinatorLock.unlock();
+    return true;
   }
 
   public boolean isLocked() {

Reply via email to