Repository: flink Updated Branches: refs/heads/master 9350264bd -> d0ecb9170
[FLINK-2348] [taskmanager] Make async call dispatching robust against concurrent finishing. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0ecb917 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0ecb917 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0ecb917 Branch: refs/heads/master Commit: d0ecb9170e5edef48c7efd95764eeec7dbdf51a8 Parents: 9350264 Author: Stephan Ewen <[email protected]> Authored: Sun Jul 12 16:19:30 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Sun Jul 12 16:25:16 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/runtime/taskmanager/Task.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d0ecb917/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index d9168e3..1b2fb08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -982,7 +982,7 @@ public class Task implements Runnable { private void executeAsyncCallRunnable(Runnable runnable, String callName) { // make sure the executor is initialized. lock against concurrent calls to this function synchronized (this) { - if (isCanceledOrFailed()) { + if (executionState != ExecutionState.RUNNING) { return; } @@ -996,7 +996,7 @@ public class Task implements Runnable { // double-check for execution state, and make sure we clean up after ourselves // if we created the dispatcher while the task was concurrently canceled - if (isCanceledOrFailed()) { + if (executionState != ExecutionState.RUNNING) { executor.shutdown(); asyncCallDispatcher = null; return; @@ -1009,9 +1009,10 @@ public class Task implements Runnable { executor.submit(runnable); } catch (RejectedExecutionException e) { - // may be that we are concurrently canceled. if not, report that something is fishy - if (!isCanceledOrFailed()) { - throw new RuntimeException("Async call was rejected, even though the task was not canceled.", e); + // may be that we are concurrently finished or canceled. + // if not, report that something is fishy + if (executionState == ExecutionState.RUNNING) { + throw new RuntimeException("Async call was rejected, even though the task is running.", e); } } }
