This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 299180a4dcc72e94bd23411524452147a7d12287
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 6 09:43:51 2025 +0800

    [To dev/1.3] Pipe: Fix the stuck state caused by unfair lock in Sink start phase (#16100) (#16106)
    
    * Pipe: Fix the stuck state caused by unfair lock in Sink start phase
    
    * fix
    
    (cherry picked from commit 2df3c45cef51f90bd2a01586aacfa7cc52c6bee5)
---
 .../subtask/connector/PipeConnectorSubtask.java    |  7 +--
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  7 ++-
 .../task/subtask/PipeAbstractConnectorSubtask.java | 23 +--------
 .../agent/task/subtask/PipeReportableSubtask.java  | 54 +++++++++++++++-------
 4 files changed, 48 insertions(+), 43 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5d8d622a74a..a5cf1246ee7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -222,11 +222,8 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
     // Try to remove the events as much as possible
     inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
 
-    highPriorityLockTaskCount.incrementAndGet();
     try {
-      synchronized (highPriorityLockTaskCount) {
-        highPriorityLockTaskCount.notifyAll();
-      }
+      increaseHighPriorityTaskCount();
 
       // synchronized to use the lastEvent & lastExceptionEvent
       synchronized (this) {
@@ -265,7 +262,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
         }
       }
     } finally {
-      highPriorityLockTaskCount.decrementAndGet();
+      decreaseHighPriorityTaskCount();
     }
 
     if (outputPipeConnector instanceof IoTDBConnector) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 6e4a858a370..7ab13731c4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -124,7 +124,12 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
     }
 
     if (runningTaskCount == 0) {
-      executor.start(subtask.getTaskID());
+      try {
+        subtask.increaseHighPriorityTaskCount();
+        executor.start(subtask.getTaskID());
+      } finally {
+        subtask.decreaseHighPriorityTaskCount();
+      }
     }
 
     runningTaskCount++;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index 427193c4f18..153df169237 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -170,14 +170,8 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
             MAX_RETRY_TIMES,
             e);
         try {
-          synchronized (highPriorityLockTaskCount) {
-            // The wait operation will release the highPriorityLockTaskCount 
lock, so there will be
-            // no deadlock.
-            if (highPriorityLockTaskCount.get() == 0) {
-              highPriorityLockTaskCount.wait(
-                  retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-            }
-          }
+          sleepIfNoHighPriorityTask(
+              retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
@@ -254,17 +248,4 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
       lastExceptionEvent = null;
     }
   }
-
-  private void preScheduleLowPriorityTask(int maxRetries) {
-    while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
-      try {
-        // Introduce a short delay to avoid CPU spinning
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.warn("Interrupted while waiting for the high priority lock 
task.", e);
-        break;
-      }
-    }
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 37916d8f25e..966ab18ed2a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -90,14 +90,8 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
           throwable.getMessage(),
           throwable);
       try {
-        synchronized (highPriorityLockTaskCount) {
-          // The wait operation will release the highPriorityLockTaskCount 
lock, so there will be
-          // no deadlock.
-          if (highPriorityLockTaskCount.get() == 0) {
-            highPriorityLockTaskCount.wait(
-                retryCount.get() * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-          }
-        }
+        sleepIfNoHighPriorityTask(
+            retryCount.get() * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
       } catch (final InterruptedException e) {
         LOGGER.warn(
             "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",
@@ -164,14 +158,8 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
         throwable.getMessage(),
         throwable);
     try {
-      synchronized (highPriorityLockTaskCount) {
-        // The wait operation will release the highPriorityLockTaskCount lock, 
so there will be
-        // no deadlock.
-        if (highPriorityLockTaskCount.get() == 0) {
-          highPriorityLockTaskCount.wait(
-              retryCount.get() * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-        }
-      }
+      sleepIfNoHighPriorityTask(
+          retryCount.get() * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
     } catch (final InterruptedException e) {
       LOGGER.warn(
           "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
@@ -183,4 +171,38 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
     submitSelf();
   }
+
+  protected void preScheduleLowPriorityTask(int maxRetries) { // Waits in 10 ms steps, at most maxRetries times, while the high-priority count is non-zero.
+    while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
+      try {
+        // Introduce a short delay to avoid CPU spinning
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller.
+        LOGGER.warn("Interrupted while waiting for the high priority lock 
task.", e);
+        break; // Stop polling once interrupted.
+      }
+    }
+  }
+
+  /**
+   * Backs off before a retry, but only when no high-priority task is registered.
+   *
+   * <p>If {@code highPriorityLockTaskCount} is zero, the calling thread waits on the counter's
+   * monitor for up to {@code sleepMillis} milliseconds; a {@code notifyAll()} issued by {@link
+   * #increaseHighPriorityTaskCount()} ends the wait early. If a high-priority task is already
+   * registered, the method returns immediately so the caller can give way without sleeping.
+   *
+   * @param sleepMillis maximum time to wait, in milliseconds; note that {@link Object#wait(long)}
+   *     treats {@code 0} as "wait until notified"
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
+    synchronized (highPriorityLockTaskCount) {
+      // The wait operation will release the highPriorityLockTaskCount lock, so there will be
+      // no deadlock.
+      // Sleep only when NO high-priority task is pending, matching the inlined logic this helper
+      // replaces.
+      if (highPriorityLockTaskCount.get() == 0) {
+        highPriorityLockTaskCount.wait(sleepMillis);
+      }
+    }
+  }
+
+  public void increaseHighPriorityTaskCount() { // Registers one more high-priority task.
+    highPriorityLockTaskCount.incrementAndGet();
+    synchronized (highPriorityLockTaskCount) { // notifyAll() requires owning the counter's monitor.
+      highPriorityLockTaskCount.notifyAll(); // Wake every thread waiting on the counter.
+    }
+  }
+
+  public void decreaseHighPriorityTaskCount() { // Unregisters one high-priority task; waiters are not notified.
+    highPriorityLockTaskCount.decrementAndGet();
+  }
 }

Reply via email to