[hotfix] Replace check state condition in Execution#tryAssignResource with if check

Instead of risking an IllegalStateException, it is better to check that the
taskManagerLocationFuture has not been completed yet. If it has, we also reject
the assignment of the LogicalSlot to the Execution. That way, we don't risk
failing to release the slot in case of an exception in
Execution#allocateAndAssignSlotForExecution.
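
The idea can be illustrated with a minimal, self-contained sketch. It is not
the Flink code: the class SlotAssignmentSketch, its String-typed slot, and the
method names are hypothetical stand-ins, and an AtomicReference is assumed in
place of the field updater used in Execution. It only shows the pattern of
rejecting the assignment and rolling back instead of throwing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for Execution; names are illustrative, not Flink's API. */
class SlotAssignmentSketch {
    // Assumption: a plain AtomicReference replaces ASSIGNED_SLOT_UPDATER.
    private final AtomicReference<String> assignedSlot = new AtomicReference<>();
    // Assumption: a String replaces the TaskManagerLocation payload.
    final CompletableFuture<String> locationFuture = new CompletableFuture<>();

    /** Returns true only if the slot was assigned and the future was completed here. */
    boolean tryAssign(String slot) {
        if (!assignedSlot.compareAndSet(null, slot)) {
            return false; // a slot is already assigned
        }
        if (!locationFuture.isDone()) {
            locationFuture.complete("location-of-" + slot);
            return true;
        }
        // The future was already completed: reject the assignment and undo
        // the compare-and-set instead of throwing, so the caller sees a plain
        // "false" and can release the slot.
        assignedSlot.set(null);
        return false;
    }

    String currentSlot() {
        return assignedSlot.get();
    }
}
```

With a precondition check in place of the second if, an already completed
future would surface as an exception in the caller; in this sketch it is
reported through the return value, and the slot reference is cleared first.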


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a678f45d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a678f45d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a678f45d

Branch: refs/heads/release-1.6
Commit: a678f45d9713d2dccb0cd2459e94af75f0649803
Parents: 17fde33
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Jul 22 20:34:33 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Jul 24 00:06:31 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a678f45d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 853732f..57aa0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -280,15 +280,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
                // only allow to set the assigned resource in state SCHEDULED or CREATED
                // note: we also accept resource assignment when being in state CREATED for testing purposes
                if (state == SCHEDULED || state == CREATED) {
-                       if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) {
-                               // check for concurrent modification (e.g. cancelling call)
-                               if (state == SCHEDULED || state == CREATED) {
-                                       checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
-                                       taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
-                                       assignedAllocationID = logicalSlot.getAllocationId();
-                                       return true;
+                       if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
+                               if (logicalSlot.tryAssignPayload(this)) {
+                                       // check for concurrent modification (e.g. cancelling call)
+                                       if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) {
+                                               taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
+                                               assignedAllocationID = logicalSlot.getAllocationId();
+                                               return true;
+                                       } else {
+                                               // free assigned resource and return false
+                                               ASSIGNED_SLOT_UPDATER.set(this, null);
+                                               return false;
+                                       }
                                } else {
-                                       // free assigned resource and return false
                                        ASSIGNED_SLOT_UPDATER.set(this, null);
                                        return false;
                                }
