[hotfix] Replace check state condition in Execution#tryAssignResource with if check
Instead of risking an IllegalStateException it is better to check that the taskManagerLocationFuture has not been completed yet. If, then we also reject the assignment of the LogicalSlot to the Execution. That way, we don't risk that we don't release the slot in case of an exception in Execution#allocateAndAssignSlotForExecution. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a678f45d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a678f45d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a678f45d Branch: refs/heads/release-1.6 Commit: a678f45d9713d2dccb0cd2459e94af75f0649803 Parents: 17fde33 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 20:34:33 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Jul 24 00:06:31 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a678f45d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 853732f..57aa0d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -280,15 +280,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // only allow to set the assigned resource in state SCHEDULED or CREATED // note: we also accept resource assignment when being in state CREATED for testing purposes if (state == SCHEDULED || state == CREATED) { - if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) { - // check for concurrent modification (e.g. cancelling call) - if (state == SCHEDULED || state == CREATED) { - checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); - taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation()); - assignedAllocationID = logicalSlot.getAllocationId(); - return true; + if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) { + if (logicalSlot.tryAssignPayload(this)) { + // check for concurrent modification (e.g. cancelling call) + if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) { + taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation()); + assignedAllocationID = logicalSlot.getAllocationId(); + return true; + } else { + // free assigned resource and return false + ASSIGNED_SLOT_UPDATER.set(this, null); + return false; + } } else { - // free assigned resource and return false ASSIGNED_SLOT_UPDATER.set(this, null); return false; }