This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch v0.7.x-test-front
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 5a4e133ca853b3d9d2d9e11a9cc371c12a510289
Author: Paul Lin <[email protected]>
AuthorDate: Thu Mar 28 09:43:39 2024 +0800

    [AMORO-2623] Avoid deadlock in task cancellation (#2684)
    
    * [AMORO-2623] Avoid deadlock in task cancellation
    
    * fix: update cancelTasks invoke order
    
    (cherry picked from commit 326c1fefa4690ee64dd4d3c87f8ad1b93609a103)
---
 .../arctic/server/optimizing/OptimizingQueue.java  | 56 ++++++++++------------
 1 file changed, 26 insertions(+), 30 deletions(-)

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
index 12322fcba..2234de016 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
@@ -421,6 +421,7 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
+      cancelTasks();
     }
 
     @Override
@@ -467,6 +468,10 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
+      // the cleanup of task should be done after unlock to avoid deadlock
+      if (this.status == OptimizingProcess.Status.FAILED) {
+        cancelTasks();
+      }
     }
 
     @Override
@@ -559,6 +564,9 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         clearProcess(this);
         lock.unlock();
+        if (this.status == OptimizingProcess.Status.FAILED) {
+          cancelTasks();
+        }
       }
     }
 
@@ -624,36 +632,24 @@ public class OptimizingQueue extends PersistentBase {
     }
 
     private void persistProcessCompleted(boolean success) {
-      if (!success) {
-        doAsTransaction(
-            () -> taskMap.values().forEach(TaskRuntime::tryCanceling),
-            () ->
-                doAs(
-                    OptimizingMapper.class,
-                    mapper ->
-                        mapper.updateOptimizingProcess(
-                            tableRuntime.getTableIdentifier().getId(),
-                            processId,
-                            status,
-                            endTime,
-                            getSummary(),
-                            getFailedReason())),
-            () -> tableRuntime.completeProcess(false));
-      } else {
-        doAsTransaction(
-            () ->
-                doAs(
-                    OptimizingMapper.class,
-                    mapper ->
-                        mapper.updateOptimizingProcess(
-                            tableRuntime.getTableIdentifier().getId(),
-                            processId,
-                            status,
-                            endTime,
-                            getSummary(),
-                            getFailedReason())),
-            () -> tableRuntime.completeProcess(true));
-      }
+      doAsTransaction(
+          () ->
+              doAs(
+                  OptimizingMapper.class,
+                  mapper ->
+                      mapper.updateOptimizingProcess(
+                          tableRuntime.getTableIdentifier().getId(),
+                          processId,
+                          status,
+                          endTime,
+                          getSummary(),
+                          getFailedReason())),
+          () -> tableRuntime.completeProcess(success));
+    }
+
+    /** The cancellation should be invoked outside the process lock to avoid deadlock. */
+    private void cancelTasks() {
+      taskMap.values().forEach(TaskRuntime::tryCanceling);
     }
 
     private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {

Reply via email to