From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465 )

Change subject: [NO ISSUE][HYR] Keep trying to cancel job tasks
......................................................................

[NO ISSUE][HYR] Keep trying to cancel job tasks

- user model changes: no
- storage format changes: no
- interface changes: no

When cancelling a job's tasks by interrupting them, a task sometimes does
not respond to the interrupt and continues to work or wait, causing the
job cancellation to get stuck. A task should normally respond to the
interrupt; however, there have been cases where the interrupt status is
cleared by misbehaving code within the task. To account for such cases,
keep trying to cancel the tasks so that the interrupt status is set again.
Log a warning, since this typically indicates a code issue.
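
The failure mode described above can be reproduced outside Hyracks. The following is a minimal sketch, assuming a plain Thread and a hypothetical task that swallows the first InterruptedException it catches (catching it has already cleared the interrupt status). A single interrupt leaves the worker waiting; setting the interrupt status again on each 100 ms round eventually stops it, which is the effect the retry loop in this change aims for. The sketch calls Thread.interrupt() directly because the JDK's FutureTask delivers an interrupt only on the first successful cancel(true) call; later calls return false without interrupting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowedInterruptDemo {

    // Number of InterruptedExceptions the misbehaving worker has caught.
    static final AtomicInteger caught = new AtomicInteger();

    /** Interrupts a misbehaving worker until it stops; returns the number of interrupts sent. */
    static int interruptUntilStopped() {
        caught.set(0);
        CountDownLatch stopped = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(10_000); // stands in for work or a wait inside a task
                } catch (InterruptedException e) {
                    // Throwing InterruptedException has already cleared the interrupt status.
                    if (caught.incrementAndGet() == 1) {
                        continue; // hypothetical bug: the first interrupt is silently dropped
                    }
                    break; // a later interrupt is honoured
                }
            }
            stopped.countDown();
        });
        worker.setDaemon(true);
        worker.start();
        int sent = 0;
        try {
            do {
                worker.interrupt(); // sets the interrupt status again on every round
                sent++;
            } while (!stopped.await(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after " + interruptUntilStopped() + " interrupts");
    }
}
```

On an idle machine interruptUntilStopped() usually returns 2; under heavy scheduling delays two interrupts can coalesce into a single status flag, so it may return more.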

Ext-ref: MB-65432

Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465
Reviewed-by: Michael Blow <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
1 file changed, 60 insertions(+), 3 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 17f5cb1..e031284 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -51,12 +52,17 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private static final String CLASS_ABBREVIATION = "SAO";
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
     private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
@@ -264,13 +270,37 @@
     }

     private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+        boolean cancelCompleted = false;
         try {
             startSemaphore.acquireUninterruptibly();
-            for (Future<Void> task : tasks) {
-                task.cancel(true);
+            cancelCompleted = cancelTasks(tasks, completeSemaphore);
+        } finally {
+            if (!cancelCompleted) {
+                completeSemaphore.acquireUninterruptibly();
+            }
+        }
+    }
+
+    private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                for (Future<Void> task : tasks) {
+                    task.cancel(true);
+                }
+                try {
+                    if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+                        return true;
+                    }
+                    LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
         } finally {
-            completeSemaphore.acquireUninterruptibly();
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }
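
The retry loop in the new static cancelTasks helper can be exercised in isolation with the sketch below. It is an approximation under stated assumptions, not the Hyracks code: the hypothetical cancelUntilComplete takes a Runnable in place of the task.cancel(true) loop and prints to System.err instead of using the Log4j LOGGER, and the demo assumes a completion semaphore created with 1 - n permits, so that a permit becomes available only after all n workers have released one. As in the diff, the caller's interrupt status is cleared on entry (otherwise every tryAcquire would throw immediately), remembered while waiting, and restored in the finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class RetryingCancelSketch {

    /**
     * Runs cancelAction, then waits up to the timeout for one permit; repeats until a permit
     * is acquired and returns the number of rounds. Interrupts are deferred, not lost.
     */
    static int cancelUntilComplete(Runnable cancelAction, Semaphore completeSemaphore, long timeout,
            TimeUnit unit) {
        // Clear the status now; otherwise tryAcquire would throw at once on every round.
        boolean interrupted = Thread.interrupted();
        int rounds = 0;
        try {
            while (true) {
                cancelAction.run();
                rounds++;
                try {
                    if (completeSemaphore.tryAcquire(timeout, unit)) {
                        return rounds;
                    }
                    System.err.println("not all tasks finished within the timeout; retrying cancellation");
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and keep waiting for the tasks
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the caller's interrupt status
            }
        }
    }

    /** Starts n sleeping workers, one of which drops its first interrupt, then cancels them. */
    static int demo() {
        int n = 3;
        // Assumption for this sketch: 1 - n permits, so a permit exists only after all n releases.
        Semaphore complete = new Semaphore(1 - n);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            boolean dropsFirstInterrupt = (i == 0);
            Thread worker = new Thread(() -> {
                int interrupts = 0;
                try {
                    while (true) {
                        try {
                            Thread.sleep(10_000); // stands in for a task's work or wait
                        } catch (InterruptedException e) {
                            interrupts++;
                            if (!(dropsFirstInterrupt && interrupts == 1)) {
                                return; // a well-behaved task stops on interrupt
                            }
                            // hypothetical bug: worker 0 swallows its first interrupt
                        }
                    }
                } finally {
                    complete.release(); // report completion
                }
            });
            worker.setDaemon(true);
            workers.add(worker);
            worker.start();
        }
        return cancelUntilComplete(() -> workers.forEach(Thread::interrupt), complete, 100,
                TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("cancellation completed after " + demo() + " rounds");
    }
}
```

With a 100 ms round, demo() typically reports two rounds: the first stops the well-behaved workers, and the second reaches the worker that dropped its first interrupt.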

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
Gerrit-Change-Number: 19465
Gerrit-PatchSet: 4
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-MessageType: merged
