This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch deadlock-cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/deadlock-cp by this push:
     new c80536a2cf1 Update PipeTaskCoordinatorLock.java
c80536a2cf1 is described below

commit c80536a2cf1e00c6bbbc233b15849d8af65f4317
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 14:25:38 2026 +0800

    Update PipeTaskCoordinatorLock.java
---
 .../coordinator/task/PipeTaskCoordinatorLock.java     | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 12b92619004..b86c556f20d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,24 +22,29 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
+ * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
  * ensure that only one thread can execute the pipe task coordinator at the same time.
+ *
+ * <p>Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
+ * cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
+ * acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
+ * ProcedureCoreWorker thread after execution.
  */
 public class PipeTaskCoordinatorLock {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
 
-  private final ReentrantLock lock = new ReentrantLock();
+  private final Semaphore semaphore = new Semaphore(1);
 
   public void lock() {
     LOGGER.debug(
         "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
     try {
-      lock.lockInterruptibly();
+      semaphore.acquire();
       LOGGER.debug(
           "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
     } catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public class PipeTaskCoordinatorLock {
     try {
       LOGGER.debug(
           "PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
-      if (lock.tryLock(10, TimeUnit.SECONDS)) {
+      if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
         LOGGER.debug(
             "PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
         return true;
@@ -74,12 +79,12 @@ public class PipeTaskCoordinatorLock {
   }
 
   public void unlock() {
-    lock.unlock();
+    semaphore.release();
     LOGGER.debug(
         "PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
   }
 
   public boolean isLocked() {
-    return lock.isLocked();
+    return semaphore.availablePermits() == 0;
   }
 }

Reply via email to