This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch v0.7.x-test-front
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 5a4e133ca853b3d9d2d9e11a9cc371c12a510289
Author: Paul Lin <[email protected]>
AuthorDate: Thu Mar 28 09:43:39 2024 +0800

    [AMORO-2623] Avoid deadlock in task cancellation (#2684)

    * [AMORO-2623] Avoid deadlock in task cancellation

    * fix: update cancelTasks invoke order

    (cherry picked from commit 326c1fefa4690ee64dd4d3c87f8ad1b93609a103)
---
 .../arctic/server/optimizing/OptimizingQueue.java | 56 ++++++++++------------
 1 file changed, 26 insertions(+), 30 deletions(-)

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
index 12322fcba..2234de016 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
@@ -421,6 +421,7 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
+      cancelTasks();
     }
 
     @Override
@@ -467,6 +468,10 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
+      // the cleanup of task should be done after unlock to avoid deadlock
+      if (this.status == OptimizingProcess.Status.FAILED) {
+        cancelTasks();
+      }
     }
 
     @Override
@@ -559,6 +564,9 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         clearProcess(this);
         lock.unlock();
+        if (this.status == OptimizingProcess.Status.FAILED) {
+          cancelTasks();
+        }
       }
     }
 
@@ -624,36 +632,24 @@ public class OptimizingQueue extends PersistentBase {
     }
 
     private void persistProcessCompleted(boolean success) {
-      if (!success) {
-        doAsTransaction(
-            () -> taskMap.values().forEach(TaskRuntime::tryCanceling),
-            () ->
-                doAs(
-                    OptimizingMapper.class,
-                    mapper ->
-                        mapper.updateOptimizingProcess(
-                            tableRuntime.getTableIdentifier().getId(),
-                            processId,
-                            status,
-                            endTime,
-                            getSummary(),
-                            getFailedReason())),
-            () -> tableRuntime.completeProcess(false));
-      } else {
-        doAsTransaction(
-            () ->
-                doAs(
-                    OptimizingMapper.class,
-                    mapper ->
-                        mapper.updateOptimizingProcess(
-                            tableRuntime.getTableIdentifier().getId(),
-                            processId,
-                            status,
-                            endTime,
-                            getSummary(),
-                            getFailedReason())),
-            () -> tableRuntime.completeProcess(true));
-      }
+      doAsTransaction(
+          () ->
+              doAs(
+                  OptimizingMapper.class,
+                  mapper ->
+                      mapper.updateOptimizingProcess(
+                          tableRuntime.getTableIdentifier().getId(),
+                          processId,
+                          status,
+                          endTime,
+                          getSummary(),
+                          getFailedReason())),
+          () -> tableRuntime.completeProcess(success));
+    }
+
+    /** The cancellation should be invoked outside the process lock to avoid deadlock. */
+    private void cancelTasks() {
+      taskMap.values().forEach(TaskRuntime::tryCanceling);
     }
 
     private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
