Repository: flink
Updated Branches:
  refs/heads/master 2d302abab -> e966a0dd1


[FLINK-2183][runtime] fix deadlock for concurrent slot release

This closes #824.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e966a0dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e966a0dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e966a0dd

Branch: refs/heads/master
Commit: e966a0dd1c9f35ba6cb0ff4e09205c411fc4585d
Parents: 2d302ab
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Jun 11 13:27:23 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Jun 11 16:45:56 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/instance/SimpleSlot.java      | 15 ++--
 .../instance/SlotSharingGroupAssignment.java    | 76 ++++++++++----------
 .../TaskManagerFailsWithSlotSharingITCase.scala |  2 +-
 3 files changed, 48 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 9bc977d..dbe961a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -142,11 +142,9 @@ public class SimpleSlot extends Slot {
 
        @Override
        public void releaseSlot() {
-               
-               // try to transition to the CANCELED state. That state marks
-               // that the releasing is in progress
-               if (markCancelled()) {
-                       
+
+               if (!isCanceled()) {
+
                        // kill all tasks currently running in this slot
                        Execution exec = this.executedTask;
                        if (exec != null && !exec.isFinished()) {
@@ -159,9 +157,10 @@ public class SimpleSlot extends Slot {
                        // otherwise release through the parent shared slot
                        if (getParent() == null) {
                                // we have to give back the slot to the owning instance
-                               getInstance().returnAllocatedSlot(this);
-                       }
-                       else {
+                               if (markCancelled()) {
+                                       getInstance().returnAllocatedSlot(this);
+                               }
+                       } else {
                                // we have to ask our parent to dispose us
                                getParent().releaseChild(this);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index f2b7dba..801e9ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -474,48 +474,52 @@ public class SlotSharingGroupAssignment {
         */
        void releaseSimpleSlot(SimpleSlot simpleSlot) {
                synchronized (lock) {
-                       // sanity checks
-                       if (simpleSlot.isAlive()) {
-                               throw new IllegalStateException("slot is still alive");
-                       }
-                       
-                       // check whether the slot is already released
-                       if (simpleSlot.markReleased()) {
-                               
-                               AbstractID groupID = simpleSlot.getGroupID();
-                               SharedSlot parent = simpleSlot.getParent();
+                       // try to transition to the CANCELED state. That state marks
+                       // that the releasing is in progress
+                       if (simpleSlot.markCancelled()) {
 
-                               // if we have a group ID, then our parent slot is tracked here
-                               if (groupID != null && !allSlots.contains(parent)) {
-                                       throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+                               // sanity checks
+                               if (simpleSlot.isAlive()) {
+                                       throw new IllegalStateException("slot is still alive");
                                }
 
-                               int parentRemaining = parent.removeDisposedChildSlot(simpleSlot);
-                               
-                               if (parentRemaining > 0) {
-                                       // the parent shared slot is still alive. make sure we make it
-                                       // available again to the group of the just released slot
-                                       
-                                       if (groupID != null) {
-                                               // if we have a group ID, then our parent becomes available
-                                               // for that group again. otherwise, the slot is part of a
-                                               // co-location group and nothing becomes immediately available
-                                               
-                                               Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+                               // check whether the slot is already released
+                               if (simpleSlot.markReleased()) {
 
-                                               // sanity check
-                                               if (slotsForJid == null) {
-                                                       throw new IllegalStateException("Trying to return a slot for group " + groupID +
-                                                                       " when available slots indicated that all slots were available.");
-                                               }
+                                       AbstractID groupID = simpleSlot.getGroupID();
+                                       SharedSlot parent = simpleSlot.getParent();
 
-                                               putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+                                       // if we have a group ID, then our parent slot is tracked here
+                                       if (groupID != null && !allSlots.contains(parent)) {
+                                               throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+                                       }
+
+                                       int parentRemaining = parent.removeDisposedChildSlot(simpleSlot);
+
+                                       if (parentRemaining > 0) {
+                                               // the parent shared slot is still alive. make sure we make it
+                                               // available again to the group of the just released slot
+
+                                               if (groupID != null) {
+                                                       // if we have a group ID, then our parent becomes available
+                                                       // for that group again. otherwise, the slot is part of a
+                                                       // co-location group and nothing becomes immediately available
+
+                                                       Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+
+                                                       // sanity check
+                                                       if (slotsForJid == null) {
+                                                               throw new IllegalStateException("Trying to return a slot for group " + groupID +
+                                                                               " when available slots indicated that all slots were available.");
+                                                       }
+
+                                                       putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+                                               }
+                                       } else {
+                                               // the parent shared slot is now empty and can be released
+                                               parent.markCancelled();
+                                               internalDisposeEmptySharedSlot(parent);
                                        }
-                               }
-                               else {
-                                       // the parent shared slot is now empty and can be released
-                                       parent.markCancelled();
-                                       internalDisposeEmptySharedSlot(parent);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e966a0dd/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 39543f7..e98fd98 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -44,7 +44,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   "The JobManager" should {
     "handle gracefully failing task manager with slot sharing" in {
-      val num_tasks = 20
+      val num_tasks = 100
 
       val sender = new AbstractJobVertex("Sender")
       val receiver = new AbstractJobVertex("Receiver")

Reply via email to