Repository: flink
Updated Branches:
  refs/heads/master 9350264bd -> d0ecb9170


[FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0ecb917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0ecb917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0ecb917

Branch: refs/heads/master
Commit: d0ecb9170e5edef48c7efd95764eeec7dbdf51a8
Parents: 9350264
Author: Stephan Ewen <[email protected]>
Authored: Sun Jul 12 16:19:30 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Sun Jul 12 16:25:16 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0ecb917/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d9168e3..1b2fb08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -982,7 +982,7 @@ public class Task implements Runnable {
        private void executeAsyncCallRunnable(Runnable runnable, String 
callName) {
                // make sure the executor is initialized. lock against 
concurrent calls to this function
                synchronized (this) {
-                       if (isCanceledOrFailed()) {
+                       if (executionState != ExecutionState.RUNNING) {
                                return;
                        }
                        
@@ -996,7 +996,7 @@ public class Task implements Runnable {
                                
                                // double-check for execution state, and make 
sure we clean up after ourselves
                                // if we created the dispatcher while the task 
was concurrently canceled
-                               if (isCanceledOrFailed()) {
+                               if (executionState != ExecutionState.RUNNING) {
                                        executor.shutdown();
                                        asyncCallDispatcher = null;
                                        return;
@@ -1009,9 +1009,10 @@ public class Task implements Runnable {
                                executor.submit(runnable);
                        }
                        catch (RejectedExecutionException e) {
-                               // may be that we are concurrently canceled. if 
not, report that something is fishy
-                               if (!isCanceledOrFailed()) {
-                                       throw new RuntimeException("Async call 
was rejected, even though the task was not canceled.", e);
+                               // may be that we are concurrently finished or 
canceled.
+                               // if not, report that something is fishy
+                               if (executionState == ExecutionState.RUNNING) {
+                                       throw new RuntimeException("Async call 
was rejected, even though the task is running.", e);
                                }
                        }
                }

Reply via email to