[hotfix] Replace check state condition in Execution#tryAssignResource with if check
Instead of risking an IllegalStateException it is better to check that the taskManagerLocationFuture has not been completed yet. If, then we also reject the assignment of the LogicalSlot to the Execution. That way, we don't risk that we don't release the slot in case of an exception in Execution#allocateAndAssignSlotForExecution. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99d524a8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99d524a8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99d524a8 Branch: refs/heads/release-1.5 Commit: 99d524a80f3357e5455f9aa0e6bcb4f06690ee59 Parents: 1b429b8 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 20:34:33 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 23 17:22:52 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99d524a8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 853732f..57aa0d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -280,15 +280,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // only allow to set the assigned resource in state SCHEDULED or CREATED // note: we also accept resource assignment when being in state CREATED for testing purposes if (state == SCHEDULED || state == CREATED) { - if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) { - // check for concurrent modification (e.g. cancelling call) - if (state == SCHEDULED || state == CREATED) { - checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); - taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation()); - assignedAllocationID = logicalSlot.getAllocationId(); - return true; + if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) { + if (logicalSlot.tryAssignPayload(this)) { + // check for concurrent modification (e.g. cancelling call) + if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) { + taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation()); + assignedAllocationID = logicalSlot.getAllocationId(); + return true; + } else { + // free assigned resource and return false + ASSIGNED_SLOT_UPDATER.set(this, null); + return false; + } } else { - // free assigned resource and return false ASSIGNED_SLOT_UPDATER.set(this, null); return false; }