[hotfix] Replace check state condition in Execution#tryAssignResource with if check

Instead of risking an IllegalStateException, it is better to check that the
taskManagerLocationFuture has not been completed yet. If it has, then we also
reject the assignment of the LogicalSlot to the Execution. That way, an
exception in Execution#allocateAndAssignSlotForExecution cannot leave the
slot unreleased.
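
The pattern described above can be sketched in isolation. This is a minimal illustration, not the Flink code: the class SlotAssignmentSketch, the String-typed slot and location, and the "location-of-" prefix are all hypothetical stand-ins, and the execution state check and the tryAssignPayload call are left out. It only shows the idea of treating an already completed future as a reason to reject the assignment and roll back, rather than as a precondition violation that throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for Execution; not the Flink class. */
class SlotAssignmentSketch {
    // Stand-in for the field updated through ASSIGNED_SLOT_UPDATER.
    private final AtomicReference<String> assignedSlot = new AtomicReference<>();

    // Stand-in for taskManagerLocationFuture.
    final CompletableFuture<String> locationFuture = new CompletableFuture<>();

    /** Returns true if the slot was assigned, false if the assignment was rejected. */
    boolean tryAssign(String slot) {
        if (assignedSlot.compareAndSet(null, slot)) {
            if (!locationFuture.isDone()) {
                // Normal path: publish the location and keep the slot.
                locationFuture.complete("location-of-" + slot);
                return true;
            } else {
                // Unexpected path: instead of throwing, undo the assignment
                // and report the rejection so the caller can release the slot.
                assignedSlot.set(null);
                return false;
            }
        }
        // Another slot is already assigned.
        return false;
    }

    String currentSlot() {
        return assignedSlot.get();
    }
}
```

With this shape, a caller only has to handle a boolean result: on false it releases the slot it offered, and no exception escapes from the assignment attempt itself.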


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99d524a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99d524a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99d524a8

Branch: refs/heads/release-1.5
Commit: 99d524a80f3357e5455f9aa0e6bcb4f06690ee59
Parents: 1b429b8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Jul 22 20:34:33 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jul 23 17:22:52 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99d524a8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 853732f..57aa0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -280,15 +280,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
                // only allow to set the assigned resource in state SCHEDULED or CREATED
                // note: we also accept resource assignment when being in state CREATED for testing purposes
                if (state == SCHEDULED || state == CREATED) {
-                       if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) {
-                               // check for concurrent modification (e.g. cancelling call)
-                               if (state == SCHEDULED || state == CREATED) {
-                                       checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
-                                       taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
-                                       assignedAllocationID = logicalSlot.getAllocationId();
-                                       return true;
+                       if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
+                               if (logicalSlot.tryAssignPayload(this)) {
+                                       // check for concurrent modification (e.g. cancelling call)
+                                       if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) {
+                                               taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
+                                               assignedAllocationID = logicalSlot.getAllocationId();
+                                               return true;
+                                       } else {
+                                               // free assigned resource and return false
+                                               ASSIGNED_SLOT_UPDATER.set(this, null);
+                                               return false;
+                                       }
                                } else {
-                                       // free assigned resource and return false
                                        ASSIGNED_SLOT_UPDATER.set(this, null);
                                        return false;
                                }
