Repository: flink Updated Branches: refs/heads/master 2d302abab -> e966a0dd1
[FLINK-2183][runtime] fix deadlock for concurrent slot release This closes #824. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e966a0dd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e966a0dd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e966a0dd Branch: refs/heads/master Commit: e966a0dd1c9f35ba6cb0ff4e09205c411fc4585d Parents: 2d302ab Author: Maximilian Michels <m...@apache.org> Authored: Thu Jun 11 13:27:23 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Thu Jun 11 16:45:56 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/instance/SimpleSlot.java | 15 ++-- .../instance/SlotSharingGroupAssignment.java | 76 ++++++++++---------- .../TaskManagerFailsWithSlotSharingITCase.scala | 2 +- 3 files changed, 48 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index 9bc977d..dbe961a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -142,11 +142,9 @@ public class SimpleSlot extends Slot { @Override public void releaseSlot() { - - // try to transition to the CANCELED state. That state marks - // that the releasing is in progress - if (markCancelled()) { - + + if (!isCanceled()) { + // kill all tasks currently running in this slot Execution exec = this.executedTask; if (exec != null && !exec.isFinished()) { @@ -159,9 +157,10 @@ public class SimpleSlot extends Slot { // otherwise release through the parent shared slot if (getParent() == null) { // we have to give back the slot to the owning instance - getInstance().returnAllocatedSlot(this); - } - else { + if (markCancelled()) { + getInstance().returnAllocatedSlot(this); + } + } else { // we have to ask our parent to dispose us getParent().releaseChild(this); } http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index f2b7dba..801e9ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -474,48 +474,52 @@ public class SlotSharingGroupAssignment { */ void releaseSimpleSlot(SimpleSlot simpleSlot) { synchronized (lock) { - // sanity checks - if (simpleSlot.isAlive()) { - throw new IllegalStateException("slot is still alive"); - } - - // check whether the slot is already released - if (simpleSlot.markReleased()) { - - AbstractID groupID = simpleSlot.getGroupID(); - SharedSlot parent = simpleSlot.getParent(); + // try to transition to the CANCELED state. That state marks + // that the releasing is in progress + if (simpleSlot.markCancelled()) { - // if we have a group ID, then our parent slot is tracked here - if (groupID != null && !allSlots.contains(parent)) { - throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + // sanity checks + if (simpleSlot.isAlive()) { + throw new IllegalStateException("slot is still alive"); } - int parentRemaining = parent.removeDisposedChildSlot(simpleSlot); - - if (parentRemaining > 0) { - // the parent shared slot is still alive. make sure we make it - // available again to the group of the just released slot - - if (groupID != null) { - // if we have a group ID, then our parent becomes available - // for that group again. otherwise, the slot is part of a - // co-location group and nothing becomes immediately available - - Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID); + // check whether the slot is already released + if (simpleSlot.markReleased()) { - // sanity check - if (slotsForJid == null) { - throw new IllegalStateException("Trying to return a slot for group " + groupID + - " when available slots indicated that all slots were available."); - } + AbstractID groupID = simpleSlot.getGroupID(); + SharedSlot parent = simpleSlot.getParent(); - putIntoMultiMap(slotsForJid, parent.getInstance(), parent); + // if we have a group ID, then our parent slot is tracked here + if (groupID != null && !allSlots.contains(parent)) { + throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + } + + int parentRemaining = parent.removeDisposedChildSlot(simpleSlot); + + if (parentRemaining > 0) { + // the parent shared slot is still alive. make sure we make it + // available again to the group of the just released slot + + if (groupID != null) { + // if we have a group ID, then our parent becomes available + // for that group again. otherwise, the slot is part of a + // co-location group and nothing becomes immediately available + + Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID); + + // sanity check + if (slotsForJid == null) { + throw new IllegalStateException("Trying to return a slot for group " + groupID + + " when available slots indicated that all slots were available."); + } + + putIntoMultiMap(slotsForJid, parent.getInstance(), parent); + } + } else { + // the parent shared slot is now empty and can be released + parent.markCancelled(); + internalDisposeEmptySharedSlot(parent); } - } - else { - // the parent shared slot is now empty and can be released - parent.markCancelled(); - internalDisposeEmptySharedSlot(parent); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index 39543f7..e98fd98 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -44,7 +44,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "The JobManager" should { "handle gracefully failing task manager with slot sharing" in { - val num_tasks = 20 + val num_tasks = 100 val sender = new AbstractJobVertex("Sender") val receiver = new AbstractJobVertex("Receiver")