This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f75d8e1fbb16ba08ab5a625f50e988c708a8a2bf
Author: tison <wander4...@gmail.com>
AuthorDate: Fri Jul 26 23:44:43 2019 +0800

    [FLINK-13334][coordination] Remove legacy slot implementation
    
    This closes #9245.
---
 .../runtime/executiongraph/ExecutionJobVertex.java |   4 -
 .../apache/flink/runtime/instance/SharedSlot.java  | 333 -----------
 .../apache/flink/runtime/instance/SimpleSlot.java  | 290 ---------
 .../org/apache/flink/runtime/instance/Slot.java    | 381 ------------
 .../instance/SlotSharingGroupAssignment.java       | 664 ---------------------
 .../jobmanager/scheduler/CoLocationConstraint.java |  91 +--
 .../jobmanager/scheduler/CoLocationGroup.java      |   6 -
 .../jobmanager/scheduler/SlotSharingGroup.java     |  23 -
 .../ExecutionGraphDeploymentTest.java              |  24 +-
 .../ExecutionGraphSchedulingTest.java              |  56 +-
 .../ExecutionPartitionLifecycleTest.java           |  13 +-
 .../runtime/executiongraph/ExecutionTest.java      |  34 +-
 .../ExecutionVertexLocalityTest.java               |  18 +-
 .../flink/runtime/instance/SharedSlotsTest.java    | 630 -------------------
 .../flink/runtime/instance/SimpleSlotTest.java     | 135 -----
 .../scheduler/ScheduleWithCoLocationHintTest.java  |  10 -
 16 files changed, 73 insertions(+), 2639 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e883792..1a0d389 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -515,10 +515,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
 
                synchronized (stateMonitor) {
                        // check and reset the sharing groups with scheduler 
hints
-                       if (slotSharingGroup != null) {
-                               slotSharingGroup.clearTaskAssignment();
-                       }
-
                        for (int i = 0; i < parallelism; i++) {
                                taskVertices[i].resetForNewExecution(timestamp, 
expectedGlobalModVersion);
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
deleted file mode 100644
index 0f9c104..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-
-import java.util.ConcurrentModificationException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This class represents a shared slot. A shared slot can have multiple
- * {@link SimpleSlot} instances within itself. This allows to
- * schedule multiple tasks simultaneously to the same resource. Sharing a 
resource with multiple
- * tasks is crucial for simple pipelined / streamed execution, where both the 
sender and the receiver
- * are typically active at the same time.
- *
- * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the 
caller has to guarantee proper
- * synchronization. In the current implementation, all concurrently modifying 
operations are
- * passed through a {@link SlotSharingGroupAssignment} object which is 
responsible for
- * synchronization.
- */
-public class SharedSlot extends Slot implements LogicalSlot {
-
-       /** The assignment group of shared slots that manages the availability 
and release of the slots */
-       private final SlotSharingGroupAssignment assignmentGroup;
-
-       /** The set of sub-slots allocated from this shared slot */
-       private final Set<Slot> subSlots;
-
-       // 
------------------------------------------------------------------------
-       //  Old Constructors (legacy code)
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new shared slot that has no parent (is a root slot) and 
does not belong to any task group.
-        * This constructor is used to create a slot directly from an instance. 
-        *
-        * @param owner The component from which this slot is allocated.
-        * @param location The location info of the TaskManager where the slot 
was allocated from
-        * @param slotNumber The number of the slot.
-        * @param taskManagerGateway The gateway to communicate with the 
TaskManager
-        * @param assignmentGroup The assignment group that this shared slot 
belongs to.
-        */
-       public SharedSlot(
-               SlotOwner owner, TaskManagerLocation location, int slotNumber,
-               TaskManagerGateway taskManagerGateway,
-               SlotSharingGroupAssignment assignmentGroup) {
-
-               this(owner, location, slotNumber, taskManagerGateway, 
assignmentGroup, null, null);
-       }
-
-       /**
-        * Creates a new shared slot that is a sub-slot of the given parent 
shared slot, and that belongs
-        * to the given task group.
-        *
-        * @param owner The component from which this slot is allocated.
-        * @param location The location info of the TaskManager where the slot 
was allocated from
-        * @param slotNumber The number of the slot.
-        * @param taskManagerGateway The gateway to communicate with the 
TaskManager
-        * @param assignmentGroup The assignment group that this shared slot 
belongs to.
-        * @param parent The parent slot of this slot.
-        * @param groupId The assignment group of this slot.
-        */
-       public SharedSlot(
-                       SlotOwner owner,
-                       TaskManagerLocation location,
-                       int slotNumber,
-                       TaskManagerGateway taskManagerGateway,
-                       SlotSharingGroupAssignment assignmentGroup,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupId) {
-
-               super(owner, location, slotNumber, taskManagerGateway, parent, 
groupId);
-
-               this.assignmentGroup = checkNotNull(assignmentGroup);
-               this.subSlots = new HashSet<Slot>();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Constructors
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new shared slot that has no parent (is a root slot) and 
does not belong to any task group.
-        * This constructor is used to create a slot directly from an instance.
-        * 
-        * @param slotContext The slot context of this shared slot
-        * @param owner The component from which this slot is allocated.
-        * @param assignmentGroup The assignment group that this shared slot 
belongs to.
-        */
-       public SharedSlot(SlotContext slotContext, SlotOwner owner, 
SlotSharingGroupAssignment assignmentGroup) {
-               this(slotContext, owner, slotContext.getPhysicalSlotNumber(), 
assignmentGroup, null, null);
-       }
-
-       private SharedSlot(
-                       SlotContext slotInformation,
-                       SlotOwner owner,
-                       int slotNumber,
-                       SlotSharingGroupAssignment assignmentGroup,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupId) {
-
-               super(slotInformation, owner, slotNumber, parent, groupId);
-
-               this.assignmentGroup = checkNotNull(assignmentGroup);
-               this.subSlots = new HashSet<Slot>();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public int getNumberLeaves() {
-               while (true) {
-                       try {
-                               int result = 0;
-                               for (Slot slot: subSlots){
-                                       result += slot.getNumberLeaves();
-                               }
-                               return result;
-                       }
-                       catch (ConcurrentModificationException e) {
-                               // ignore and retry
-                       }
-               }
-       }
-
-       /**
-        * Checks whether this slot is a root slot that has not yet added any 
child slots.
-        * 
-        * @return True, if this slot is a root slot and has not yet added any 
children, false otherwise.
-        */
-       public boolean isRootAndEmpty() {
-               return getParent() == null && subSlots.isEmpty();
-       }
-
-       /**
-        * Checks whether this shared slot has any sub slots.
-        * 
-        * @return True, if the shared slot has sub slots, false otherwise.
-        */
-       public boolean hasChildren() {
-               return subSlots.size() > 0;
-       }
-
-       @Override
-       public Locality getLocality() {
-               return Locality.UNKNOWN;
-       }
-
-       @Override
-       public boolean tryAssignPayload(Payload payload) {
-               throw new UnsupportedOperationException("Cannot assign an 
execution attempt id to a shared slot.");
-       }
-
-       @Nullable
-       @Override
-       public Payload getPayload() {
-               return null;
-       }
-
-       @Override
-       public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-               assignmentGroup.releaseSharedSlot(this);
-
-               if (!(isReleased() && subSlots.isEmpty())) {
-                       throw new IllegalStateException("Bug: SharedSlot is not 
released or not empty after call to releaseSlot()");
-               }
-
-               return CompletableFuture.completedFuture(null);
-       }
-
-       @Override
-       public int getPhysicalSlotNumber() {
-               return getRootSlotNumber();
-       }
-
-       @Override
-       public AllocationID getAllocationId() {
-               return getSlotContext().getAllocationId();
-       }
-
-       @Override
-       public SlotRequestId getSlotRequestId() {
-               return NO_SLOT_REQUEST_ID;
-       }
-
-       @Nullable
-       @Override
-       public SlotSharingGroupId getSlotSharingGroupId() {
-               return NO_SLOT_SHARING_GROUP_ID;
-       }
-
-       /**
-        * Gets the set of all slots allocated as sub-slots of this shared slot.
-        *
-        * @return All sub-slots allocated from this shared slot.
-        */
-       Set<Slot> getSubSlots() {
-               return subSlots;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - Allocating 
sub-slots
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new sub slot if the slot is not dead, yet. This method 
should only be called from
-        * the assignment group instance to guarantee synchronization.
-        * 
-        * <b>NOTE:</b> This method is not synchronized and must only be called 
from
-        *              the slot's assignment group.
-        *
-        * @param groupId The ID to identify tasks which can be deployed in 
this sub slot.
-        * @return The new sub slot if the shared slot is still alive, 
otherwise null.
-        */
-       SimpleSlot allocateSubSlot(AbstractID groupId) {
-               if (isAlive()) {
-                       SimpleSlot slot = new SimpleSlot(
-                               getOwner(),
-                               getTaskManagerLocation(),
-                               subSlots.size(),
-                               getTaskManagerGateway(),
-                               this,
-                               groupId);
-                       subSlots.add(slot);
-                       return slot;
-               }
-               else {
-                       return null;
-               }
-       }
-
-       /**
-        * Creates a new sub slot if the slot is not dead, yet. This method 
should only be called from
-        * the assignment group instance to guarantee synchronization.
-        * 
-        * NOTE: This method should only be called from the slot's assignment 
group.
-        *
-        * @param groupId The ID to identify tasks which can be deployed in 
this sub slot.
-        * @return The new sub slot if the shared slot is still alive, 
otherwise null.
-        */
-       SharedSlot allocateSharedSlot(AbstractID groupId){
-               if (isAlive()) {
-                       SharedSlot slot = new SharedSlot(
-                               getOwner(),
-                               getTaskManagerLocation(),
-                               subSlots.size(),
-                               getTaskManagerGateway(),
-                               assignmentGroup,
-                               this,
-                               groupId);
-                       subSlots.add(slot);
-                       return slot;
-               }
-               else {
-                       return null;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - releasing 
slots
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Disposes the given sub slot. This method is called by the child 
simple slot to tell this
-        * shared slot to release it.
-        *
-        * The releasing process itself is done by the {@link 
SlotSharingGroupAssignment}, which controls
-        * all the modifications in this shared slot.
-        *
-        * NOTE: This method must not modify the shared slot directly !!!
-        *
-        * @param slot The sub-slot which shall be removed from the shared slot.
-        */
-       void releaseChild(SimpleSlot slot) {
-               assignmentGroup.releaseSimpleSlot(slot);
-       }
-       
-       /**
-        * Removes the given slot from this shared slot. This method should 
only be called
-        * through this shared slot's {@link SlotSharingGroupAssignment}
-        *
-        * @param slot slot to be removed from the set of sub slots.
-        * @return Number of remaining sub slots
-        */
-       int removeDisposedChildSlot(Slot slot) {
-               if (!slot.isReleased() || !subSlots.remove(slot)) {
-                       throw new IllegalArgumentException();
-               }
-               return subSlots.size();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "Shared " + super.toString();
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
deleted file mode 100644
index 3363b15..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- * A SimpleSlot represents a single slot on a TaskManager instance, or a slot 
within a shared slot.
- *
- * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute 
will point to that shared slot.
- * If not, then the parent attribute is null.
- */
-public class SimpleSlot extends Slot implements LogicalSlot {
-
-       /** The updater used to atomically swap in the payload */
-       private static final AtomicReferenceFieldUpdater<SimpleSlot, Payload> 
PAYLOAD_UPDATER =
-                       
AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Payload.class, 
"payload");
-
-       // 
------------------------------------------------------------------------
-
-       private final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
-
-       /** Id of the task being executed in the slot. Volatile to force a 
memory barrier and allow for correct double-checking */
-       private volatile Payload payload;
-
-       /** The locality attached to the slot, defining whether the slot was 
allocated at the desired location. */
-       private volatile Locality locality = Locality.UNCONSTRAINED;
-
-       // 
------------------------------------------------------------------------
-       //  Old Constructors (legacy mode)
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new simple slot that stands alone and does not belong to 
shared slot.
-        * 
-        * @param owner The component from which this slot is allocated.
-        * @param location The location info of the TaskManager where the slot 
was allocated from
-        * @param slotNumber The number of the task slot on the instance.
-        * @param taskManagerGateway The gateway to communicate with the 
TaskManager of this slot
-        */
-       public SimpleSlot(
-               SlotOwner owner, TaskManagerLocation location, int slotNumber,
-               TaskManagerGateway taskManagerGateway) {
-               this(owner, location, slotNumber, taskManagerGateway, null, 
null);
-       }
-
-       /**
-        * Creates a new simple slot that belongs to the given shared slot and
-        * is identified by the given ID.
-        *
-        * @param owner The component from which this slot is allocated.
-        * @param location The location info of the TaskManager where the slot 
was allocated from
-        * @param slotNumber The number of the simple slot in its parent shared 
slot.
-        * @param taskManagerGateway to communicate with the associated task 
manager.
-        * @param parent The parent shared slot.
-        * @param groupID The ID that identifies the group that the slot 
belongs to.
-        */
-       public SimpleSlot(
-                       SlotOwner owner,
-                       TaskManagerLocation location,
-                       int slotNumber,
-                       TaskManagerGateway taskManagerGateway,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupID) {
-
-               super(
-                       parent != null ?
-                               parent.getSlotContext() :
-                               new SimpleSlotContext(
-                                       NO_ALLOCATION_ID,
-                                       location,
-                                       slotNumber,
-                                       taskManagerGateway),
-                       owner,
-                       slotNumber,
-                       parent,
-                       groupID);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Constructors
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new simple slot that stands alone and does not belong to 
shared slot.
-        *
-        * @param slotContext The slot context of this simple slot
-        * @param owner The component from which this slot is allocated.
-        */
-       public SimpleSlot(SlotContext slotContext, SlotOwner owner, int 
slotNumber) {
-               this(slotContext, owner, slotNumber, null, null);
-       }
-
-       /**
-        * Creates a new simple slot that belongs to the given shared slot and
-        * is identified by the given ID.
-        *
-        * @param parent The parent shared slot.
-        * @param owner The component from which this slot is allocated.
-        * @param groupID The ID that identifies the group that the slot 
belongs to.
-        */
-       public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, 
AbstractID groupID) {
-               this(parent.getSlotContext(), owner, slotNumber, parent, 
groupID);
-       }
-       
-       /**
-        * Creates a new simple slot that belongs to the given shared slot and
-        * is identified by the given ID.
-        *
-        * @param slotContext The slot context of this simple slot
-        * @param owner The component from which this slot is allocated.
-        * @param slotNumber The number of the simple slot in its parent shared 
slot.
-        * @param parent The parent shared slot.
-        * @param groupID The ID that identifies the group that the slot 
belongs to.
-        */
-       private SimpleSlot(
-                       SlotContext slotContext,
-                       SlotOwner owner,
-                       int slotNumber,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupID) {
-               super(slotContext, owner, slotNumber, parent, groupID);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public int getNumberLeaves() {
-               return 1;
-       }
-
-       /**
-        * Atomically sets the executed vertex, if no vertex has been assigned 
to this slot so far.
-        *
-        * @param payload The vertex to assign to this slot.
-        * @return True, if the vertex was assigned, false, otherwise.
-        */
-       @Override
-       public boolean tryAssignPayload(Payload payload) {
-               Preconditions.checkNotNull(payload);
-
-               // check that we can actually run in this slot
-               if (isCanceled()) {
-                       return false;
-               }
-
-               // atomically assign the vertex
-               if (!PAYLOAD_UPDATER.compareAndSet(this, null, payload)) {
-                       return false;
-               }
-
-               // we need to do a double check that we were not cancelled in 
the meantime
-               if (isCanceled()) {
-                       this.payload = null;
-                       return false;
-               }
-
-               return true;
-       }
-
-       @Nullable
-       @Override
-       public Payload getPayload() {
-               return payload;
-       }
-
-       /**
-        * Gets the locality information attached to this slot.
-        * @return The locality attached to the slot.
-        */
-       public Locality getLocality() {
-               return locality;
-       }
-
-       /**
-        * Attaches locality information to this slot.
-        * @param locality The locality attached to the slot.
-        */
-       public void setLocality(Locality locality) {
-               this.locality = locality;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Cancelling & Releasing
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-               if (!isCanceled()) {
-                       final CompletableFuture<?> terminationFuture;
-
-                       if (payload != null) {
-                               // trigger the failure of the slot payload
-                               payload.fail(cause != null ? cause : new 
FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
-
-                               // wait for the termination of the payload 
before releasing the slot
-                               terminationFuture = 
payload.getTerminalStateFuture();
-                       } else {
-                               terminationFuture = 
CompletableFuture.completedFuture(null);
-                       }
-
-                       terminationFuture.whenComplete(
-                               (Object ignored, Throwable throwable) -> {
-                                       // release directly (if we are directly 
allocated),
-                                       // otherwise release through the parent 
shared slot
-                                       if (getParent() == null) {
-                                               // we have to give back the 
slot to the owning instance
-                                               if (markCancelled()) {
-                                                       try {
-                                                               
getOwner().returnLogicalSlot(this);
-                                                               
releaseFuture.complete(null);
-                                                       } catch (Exception e) {
-                                                               
releaseFuture.completeExceptionally(e);
-                                                       }
-                                               }
-                                       } else {
-                                               // we have to ask our parent to 
dispose us
-                                               getParent().releaseChild(this);
-
-                                               releaseFuture.complete(null);
-                                       }
-                               });
-               }
-
-               return releaseFuture;
-       }
-
-       @Override
-       public int getPhysicalSlotNumber() {
-               return getRootSlotNumber();
-       }
-
-       @Override
-       public AllocationID getAllocationId() {
-               return getSlotContext().getAllocationId();
-       }
-
-       @Override
-       public SlotRequestId getSlotRequestId() {
-               return NO_SLOT_REQUEST_ID;
-       }
-
-       @Nullable
-       @Override
-       public SlotSharingGroupId getSlotSharingGroupId() {
-               return NO_SLOT_SHARING_GROUP_ID;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "SimpleSlot " + super.toString();
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
deleted file mode 100644
index 9c1b627..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for slots that the Scheduler / ExecutionGraph take from the 
SlotPool and use to place
- * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a 
TaskManager's resources),
- * plus additional fields to track what is currently executed in that slot, or 
if the slot is still
- * used or disposed (ExecutionGraph gave it back to the pool).
- *
- * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). 
In the more complex
- * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. 
Shared slots may contain
- * other shared slots which in turn can hold simple slots. That way, a shared 
slot may define a tree
- * of slots that belong to it.
- */
-public abstract class Slot {
-
-       /** Updater for atomic state transitions */
-       private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
-                       AtomicIntegerFieldUpdater.newUpdater(Slot.class, 
"status");
-
-       /** State where slot is fresh and alive. Tasks may be added to the 
slot. */
-       private static final int ALLOCATED_AND_ALIVE = 0;
-
-       /** State where the slot has been canceled and is in the process of 
being released */
-       private static final int CANCELLED = 1;
-
-       /** State where all tasks in this slot have been canceled and the slot 
been given back to the instance */
-       private static final int RELEASED = 2;
-
-       // temporary placeholder for Slots that are not constructed from an 
AllocatedSlot (by legacy code)
-       protected static final AllocationID NO_ALLOCATION_ID = new 
AllocationID(0L, 0L);
-       protected static final SlotRequestId NO_SLOT_REQUEST_ID = new 
SlotRequestId(0L, 0L);
-       protected static final SlotSharingGroupId NO_SLOT_SHARING_GROUP_ID = 
new SlotSharingGroupId(0L, 0L);
-
-       // 
------------------------------------------------------------------------
-
-       /** Context of this logical slot. */
-       private final SlotContext slotContext;
-
-       /** The owner of this slot - the slot was taken from that owner and 
must be disposed to it */
-       private final SlotOwner owner;
-
-       /** The parent of this slot in the hierarchy, or null, if this is the 
parent */
-       @Nullable
-       private final SharedSlot parent;
-
-       /** The id of the group that this slot is allocated to. May be null. */
-       @Nullable
-       private final AbstractID groupID;
-
-       private final int slotNumber;
-
-       /** The state of the vertex, only atomically updated */
-       private volatile int status = ALLOCATED_AND_ALIVE;
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Base constructor for slots.
-        * 
-        * <p>This is the old way of constructing slots by the legacy code
-        * 
-        * @param owner The component from which this slot is allocated.
-        * @param location The location info of the TaskManager where the slot 
was allocated from
-        * @param slotNumber The number of this slot.
-        * @param taskManagerGateway The actor gateway to communicate with the 
TaskManager
-        * @param parent The parent slot that contains this slot. May be null, 
if this slot is the root.
-        * @param groupID The ID that identifies the task group for which this 
slot is allocated. May be null
-        *                if the slot does not belong to any task group.   
-        */
-       protected Slot(
-                       SlotOwner owner,
-                       TaskManagerLocation location,
-                       int slotNumber,
-                       TaskManagerGateway taskManagerGateway,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupID) {
-
-               checkArgument(slotNumber >= 0);
-
-               // create a simple slot context
-               this.slotContext = new SimpleSlotContext(
-                       NO_ALLOCATION_ID,
-                       location,
-                       slotNumber,
-                       taskManagerGateway);
-
-               this.owner = checkNotNull(owner);
-               this.parent = parent; // may be null
-               this.groupID = groupID; // may be null
-               this.slotNumber = slotNumber;
-       }
-
-       /**
-        * Base constructor for slots.
-        *
-        * @param slotContext The slot context of this slot.
-        * @param owner The component from which this slot is allocated.
-        * @param slotNumber The number of this slot.
-        * @param parent The parent slot that contains this slot. May be null, 
if this slot is the root.
-        * @param groupID The ID that identifies the task group for which this 
slot is allocated. May be null
-        *                if the slot does not belong to any task group.   
-        */
-       protected Slot(
-                       SlotContext slotContext,
-                       SlotOwner owner,
-                       int slotNumber,
-                       @Nullable SharedSlot parent,
-                       @Nullable AbstractID groupID) {
-
-               this.slotContext = checkNotNull(slotContext);
-               this.owner = checkNotNull(owner);
-               this.parent = parent; // may be null
-               this.groupID = groupID; // may be null
-               this.slotNumber = slotNumber;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Gets the allocated slot that this slot refers to.
-        * 
-        * @return This slot's allocated slot.
-        */
-       public SlotContext getSlotContext() {
-               return slotContext;
-       }
-
-       /**
-        * Gets the ID of the TaskManager that offers this slot.
-        *
-        * @return The ID of the TaskManager that offers this slot
-        */
-       public ResourceID getTaskManagerID() {
-               return slotContext.getTaskManagerLocation().getResourceID();
-       }
-
-       /**
-        * Gets the location info of the TaskManager that offers this slot.
-        *
-        * @return The location info of the TaskManager that offers this slot
-        */
-       public TaskManagerLocation getTaskManagerLocation() {
-               return slotContext.getTaskManagerLocation();
-       }
-
-       /**
-        * Gets the actor gateway that can be used to send messages to the 
TaskManager.
-        *
-        * <p>This method should be removed once the new interface-based RPC 
abstraction is in place
-        *
-        * @return The actor gateway that can be used to send messages to the 
TaskManager.
-        */
-       public TaskManagerGateway getTaskManagerGateway() {
-               return slotContext.getTaskManagerGateway();
-       }
-
-       /**
-        * Gets the owner of this slot. The owner is the component that the 
slot was created from
-        * and to which it needs to be returned after the executed tasks are 
done.
-        * 
-        * @return The owner of this slot.
-        */
-       public SlotOwner getOwner() {
-               return owner;
-       }
-
-       /**
-        * Gets the number of the slot. For a simple slot, that is the number 
of the slot
-        * on its instance. For a non-root slot, this returns the number of the 
slot in the
-        * list amongst its siblings in the tree.
-        *
-        * @return The number of the slot on the instance or amongst its 
siblings that share the same slot.
-        */
-       public int getSlotNumber() {
-               return slotNumber;
-       }
-
-       /**
-        * Gets the number of the root slot. This code behaves equal to {@code 
getRoot().getSlotNumber()}.
-        * If this slot is the root of the tree of shared slots, then this 
method returns the same
-        * value as {@link #getSlotNumber()}.
-        *
-        * @return The slot number of the root slot.
-        */
-       public int getRootSlotNumber() {
-               if (parent == null) {
-                       return slotNumber;
-               } else {
-                       return parent.getRootSlotNumber();
-               }
-       }
-
-       /**
-        * Gets the ID that identifies the logical group to which this slot 
belongs:
-        * <ul>
-        *     <li>If the slot does not belong to any group in particular, this 
field is null.</li>
-        *     <li>If this slot was allocated as a sub-slot of a
-        *         {@link 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment}, 
-        *         then this ID will be the JobVertexID of the vertex whose 
task the slot
-        *         holds in its shared slot.</li>
-        *     <li>In case that the slot represents the shared slot of a 
co-location constraint, this ID will be the
-        *         ID of the co-location constraint.</li>
-        * </ul>
-        * 
-        * @return The ID identifying the logical group of slots.
-        */
-       @Nullable
-       public AbstractID getGroupID() {
-               return groupID;
-       }
-
-       /**
-        * Gets the parent slot of this slot. Returns null, if this slot has no 
parent.
-        * 
-        * @return The parent slot, or null, if no this slot has no parent.
-        */
-       @Nullable
-       public SharedSlot getParent() {
-               return parent;
-       }
-
-       /**
-        * Gets the root slot of the tree containing this slot. If this slot is 
the root,
-        * the method returns this slot directly, otherwise it recursively goes 
to the parent until
-        * it reaches the root.
-        * 
-        * @return The root slot of the tree containing this slot
-        */
-       public Slot getRoot() {
-               if (parent == null) {
-                       return this;
-               } else {
-                       return parent.getRoot();
-               }
-       }
-
-       /**
-        * Gets the number of simple slots that are at the leaves of the tree 
of slots.
-        *
-        * @return The number of simple slots at the leaves.
-        */
-       public abstract int getNumberLeaves();
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Status and life cycle
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Checks of the slot is still alive, i.e. in state {@link 
#ALLOCATED_AND_ALIVE}.
-        *
-        * @return True if the slot is alive, false otherwise.
-        */
-       public boolean isAlive() {
-               return status == ALLOCATED_AND_ALIVE;
-       }
-
-       /**
-        * Checks of the slot has been cancelled. Note that a released slot is 
also cancelled.
-        *
-        * @return True if the slot is cancelled or released, false otherwise.
-        */
-       public boolean isCanceled() {
-               return status != ALLOCATED_AND_ALIVE;
-       }
-
-       /**
-        * Checks of the slot has been released.
-        *
-        * @return True if the slot is released, false otherwise.
-        */
-       public boolean isReleased() {
-               return status == RELEASED;
-       }
-
-       /**
-        * Atomically marks the slot as cancelled, if it was alive before.
-        *
-        * @return True, if the state change was successful, false otherwise.
-        */
-       final boolean markCancelled() {
-               return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, 
CANCELLED);
-       }
-
-       /**
-        * Atomically marks the slot as released, if it was cancelled before.
-        *
-        * @return True, if the state change was successful, false otherwise.
-        */
-       final boolean markReleased() {
-               return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
-       }
-
-       /**
-        * This method cancels and releases the slot and all its sub-slots.
-        * 
-        * After this method completed successfully, the slot will be in state 
"released", and the
-        * {@link #isReleased()} method will return {@code true}.
-        * 
-        * If this slot is a simple slot, it will be returned to its instance. 
If it is a shared slot,
-        * it will release all of its sub-slots and release itself.
-        */
-       public abstract CompletableFuture<?> releaseSlot(@Nullable Throwable 
cause);
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Slots must always has based on reference identity.
-        */
-       @Override
-       public final int hashCode() {
-               return super.hashCode();
-       }
-
-       /**
-        * Slots must always compare on referential equality.
-        */
-       @Override
-       public final boolean equals(Object obj) {
-               return this == obj;
-       }
-
-       @Override
-       public String toString() {
-               return hierarchy() + " - " + getTaskManagerLocation() + " - " + 
getStateName(status);
-       }
-
-       protected String hierarchy() {
-               return (getParent() != null ? getParent().hierarchy() : "") + 
'(' + getSlotNumber() + ')';
-       }
-
-       private static String getStateName(int state) {
-               switch (state) {
-                       case ALLOCATED_AND_ALIVE:
-                               return "ALLOCATED/ALIVE";
-                       case CANCELLED:
-                               return "CANCELLED";
-                       case RELEASED:
-                               return "RELEASED";
-                       default:
-                               return "(unknown)";
-               }
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
deleted file mode 100644
index d16c332..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * The SlotSharingGroupAssignment manages a set of shared slots, which are 
shared between
- * tasks of a {@link 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
- * 
- * <p>The assignments shares tasks by allowing a shared slot to hold one 
vertex per
- * JobVertexID. For example, consider a program consisting of job vertices 
"source", "map",
- * "reduce", and "sink". If the slot sharing group spans all four job 
vertices, then
- * each shared slot can hold one parallel subtask of the source, the map, the 
reduce, and the
- * sink vertex. Each shared slot holds the actual subtasks in child slots, 
which are (at the leaf level),
- * the {@link SimpleSlot}s.</p>
- * 
- * <p>An exception are the co-location-constraints, that define that the i-th 
subtask of one
- * vertex needs to be scheduled strictly together with the i-th subtasks of 
the vertices
- * that share the co-location-constraint. To manage that, a 
co-location-constraint gets its
- * own shared slot inside the shared slots of a sharing group.</p>
- * 
- * <p>Consider a job set up like this:</p>
- * 
- * <pre>{@code
- * +-------------- Slot Sharing Group --------------+
- * |                                                |
- * |            +-- Co Location Group --+           |
- * |            |                       |           |
- * |  (source) ---> (head) ---> (tail) ---> (sink)  |
- * |            |                       |           |
- * |            +-----------------------+           |
- * +------------------------------------------------+
- * }</pre>
- * 
- * <p>The slot hierarchy in the slot sharing group will look like the 
following</p> 
- * 
- * <pre>
- *     Shared(0)(root)
- *        |
- *        +-- Simple(2)(sink)
- *        |
- *        +-- Shared(1)(co-location-group)
- *        |      |
- *        |      +-- Simple(0)(tail)
- *        |      +-- Simple(1)(head)
- *        |
- *        +-- Simple(0)(source)
- * </pre>
- */
-public class SlotSharingGroupAssignment {
-
-       private final static Logger LOG = 
LoggerFactory.getLogger(SlotSharingGroupAssignment.class);
-
-       /** The lock globally guards against concurrent modifications in the 
data structures */
-       private final Object lock = new Object();
-       
-       /** All slots currently allocated to this sharing group */
-       private final Set<SharedSlot> allSlots = new 
LinkedHashSet<SharedSlot>();
-
-       /** The slots available per vertex type (JobVertexId), keyed by 
TaskManager, to make them locatable */
-       private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlotsPerJid = new LinkedHashMap<>();
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Accounting
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Gets the number of slots that are currently governed by this 
assignment group.
-        * This refers to the slots allocated from an Instance,
-        * and not the sub-slots given out as children of those shared slots.
-        * 
-        * @return The number of resource slots managed by this assignment 
group.
-        */
-       public int getNumberOfSlots() {
-               return allSlots.size();
-       }
-
-       /**
-        * Gets the number of shared slots into which the given group can place 
subtasks or 
-        * nested task groups.
-        * 
-        * @param groupId The ID of the group.
-        * @return The number of shared slots available to the given job vertex.
-        */
-       public int getNumberOfAvailableSlotsForGroup(AbstractID groupId) {
-               synchronized (lock) {
-                       Map<ResourceID, List<SharedSlot>> available = 
availableSlotsPerJid.get(groupId);
-
-                       if (available != null) {
-                               Set<SharedSlot> set = new HashSet<SharedSlot>();
-
-                               for (List<SharedSlot> list : 
available.values()) {
-                                       for (SharedSlot slot : list) {
-                                               set.add(slot);
-                                       }
-                               }
-
-                               return set.size();
-                       }
-                       else {
-                               // if no entry exists for a JobVertexID so far, 
then the vertex with that ID can
-                               // add a subtask into each shared slot of this 
group. Consequently, all
-                               // of them are available for that JobVertexID.
-                               return allSlots.size();
-                       }
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Slot allocation
-       // 
------------------------------------------------------------------------
-
-       public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot 
sharedSlot, Locality locality, JobVertexID groupId) {
-               return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, 
groupId, null);
-       }
-
-       public SimpleSlot addSharedSlotAndAllocateSubSlot(
-                       SharedSlot sharedSlot, Locality locality, 
CoLocationConstraint constraint)
-       {
-               return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, 
null, constraint);
-       }
-
-       private SimpleSlot addSharedSlotAndAllocateSubSlot(
-                       SharedSlot sharedSlot, Locality locality, JobVertexID 
groupId, CoLocationConstraint constraint) {
-
-               // sanity checks
-               if (!sharedSlot.isRootAndEmpty()) {
-                       throw new IllegalArgumentException("The given slot is 
not an empty root slot.");
-               }
-
-               final ResourceID location = sharedSlot.getTaskManagerID();
-
-               synchronized (lock) {
-                       // early out in case that the slot died (instance 
disappeared)
-                       if (!sharedSlot.isAlive()) {
-                               return null;
-                       }
-                       
-                       // add to the total bookkeeping
-                       if (!allSlots.add(sharedSlot)) {
-                               throw new IllegalArgumentException("Slot was 
already contained in the assignment group");
-                       }
-                       
-                       SimpleSlot subSlot;
-                       AbstractID groupIdForMap;
-                                       
-                       if (constraint == null) {
-                               // allocate us a sub slot to return
-                               subSlot = sharedSlot.allocateSubSlot(groupId);
-                               groupIdForMap = groupId;
-                       }
-                       else {
-                               // sanity check
-                               if (constraint.isAssignedAndAlive()) {
-                                       throw new IllegalStateException(
-                                                       "Trying to add a shared 
slot to a co-location constraint that has a life slot.");
-                               }
-                               
-                               // we need a co-location slot --> a SimpleSlot 
nested in a SharedSlot to
-                               //                                host other 
co-located tasks
-                               SharedSlot constraintGroupSlot = 
sharedSlot.allocateSharedSlot(constraint.getGroupId());
-                               groupIdForMap = constraint.getGroupId();
-                               
-                               if (constraintGroupSlot != null) {
-                                       // the sub-slots in the co-location 
constraint slot have no own group IDs
-                                       subSlot = 
constraintGroupSlot.allocateSubSlot(null);
-                                       if (subSlot != null) {
-                                               // all went well, we can give 
the constraint its slot
-                                               
constraint.setSharedSlot(constraintGroupSlot);
-                                               
-                                               // NOTE: Do not lock the 
location constraint, because we don't yet know whether we will
-                                               // take the slot here
-                                       }
-                                       else {
-                                               // if we could not create a sub 
slot, release the co-location slot
-                                               // note that this does 
implicitly release the slot we have just added
-                                               // as well, because we release 
its last child slot. That is expected
-                                               // and desired.
-                                               
constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot 
in this shared slot."));
-                                       }
-                               }
-                               else {
-                                       // this should not happen, as we are 
under the lock that also
-                                       // guards slot disposals. Keep the 
check to be on the safe side
-                                       subSlot = null;
-                               }
-                       }
-                       
-                       if (subSlot != null) {
-                               // preserve the locality information
-                               subSlot.setLocality(locality);
-                               
-                               // let the other groups know that this slot 
exists and that they
-                               // can place a task into this slot.
-                               boolean entryForNewJidExists = false;
-                               
-                               for (Map.Entry<AbstractID, Map<ResourceID, 
List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-                                       // there is already an entry for this 
groupID
-                                       if 
(entry.getKey().equals(groupIdForMap)) {
-                                               entryForNewJidExists = true;
-                                               continue;
-                                       }
-
-                                       Map<ResourceID, List<SharedSlot>> 
available = entry.getValue();
-                                       putIntoMultiMap(available, location, 
sharedSlot);
-                               }
-
-                               // make sure an empty entry exists for this 
group, if no other entry exists
-                               if (!entryForNewJidExists) {
-                                       availableSlotsPerJid.put(groupIdForMap, 
new LinkedHashMap<ResourceID, List<SharedSlot>>());
-                               }
-
-                               return subSlot;
-                       }
-                       else {
-                               // if sharedSlot is releases, abort.
-                               // This should be a rare case, since this 
method is called with a fresh slot.
-                               return null;
-                       }
-               }
-               // end synchronized (lock)
-       }
-
-       /**
-        * Gets a slot suitable for the given task vertex. This method will 
prefer slots that are local
-        * (with respect to {@link 
ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non 
local
-        * slots if no local slot is available. The method returns null, when 
this sharing group has
-        * no slot available for the given JobVertexID.
-        *
-        * @param vertexID the vertex id
-        * @param locationPreferences location preferences
-        *
-        * @return A slot to execute the given ExecutionVertex in, or null, if 
none is available.
-        */
-       public SimpleSlot getSlotForTask(JobVertexID vertexID, 
Iterable<TaskManagerLocation> locationPreferences) {
-               synchronized (lock) {
-                       Tuple2<SharedSlot, Locality> p = 
getSharedSlotForTask(vertexID, locationPreferences, false);
-
-                       if (p != null) {
-                               SharedSlot ss = p.f0;
-                               SimpleSlot slot = ss.allocateSubSlot(vertexID);
-                               slot.setLocality(p.f1);
-                               return slot;
-                       }
-                       else {
-                               return null;
-                       }
-               }
-       }
-
-       /**
-        * Gets a slot for a task that has a co-location constraint. This 
method tries to grab
-        * a slot form the location-constraint's shared slot. If that slot has 
not been initialized,
-        * then the method tries to grab another slot that is available for the 
location-constraint-group.
-        * 
-        * <p>In cases where the co-location constraint has not yet been 
initialized with a slot,
-        * or where that slot has been disposed in the meantime, this method 
tries to allocate a shared
-        * slot for the co-location constraint (inside on of the other 
available slots).</p>
-        * 
-        * <p>If a suitable shared slot is available, this method allocates a 
simple slot within that
-        * shared slot and returns it. If no suitable shared slot could be 
found, this method
-        * returns null.</p>
-        * 
-        * @param constraint The co-location constraint for the placement of 
the execution vertex.
-        * @param locationPreferences location preferences
-        * 
-        * @return A simple slot allocate within a suitable shared slot, or 
{@code null}, if no suitable
-        *         shared slot is available.
-        */
-       public SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<TaskManagerLocation> locationPreferences) {
-               synchronized (lock) {
-                       if (constraint.isAssignedAndAlive()) {
-                               // the shared slot of the co-location group is 
initialized and set we allocate a sub-slot
-                               final SharedSlot shared = 
constraint.getSharedSlot();
-                               SimpleSlot subslot = 
shared.allocateSubSlot(null);
-                               subslot.setLocality(Locality.LOCAL);
-                               return subslot;
-                       }
-                       else if (constraint.isAssigned()) {
-                               // we had an assignment before.
-                               
-                               SharedSlot previous = 
constraint.getSharedSlot();
-                               if (previous == null) {
-                                       throw new IllegalStateException("Bug: 
Found assigned co-location constraint without a slot.");
-                               }
-
-                               TaskManagerLocation location = 
previous.getTaskManagerLocation();
-                               Tuple2<SharedSlot, Locality> p = 
getSharedSlotForTask(
-                                               constraint.getGroupId(), 
Collections.singleton(location), true);
-
-                               if (p == null) {
-                                       return null;
-                               }
-                               else {
-                                       SharedSlot newSharedSlot = p.f0;
-
-                                       // allocate the co-location group slot 
inside the shared slot
-                                       SharedSlot constraintGroupSlot = 
newSharedSlot.allocateSharedSlot(constraint.getGroupId());
-                                       if (constraintGroupSlot != null) {
-                                               
constraint.setSharedSlot(constraintGroupSlot);
-
-                                               // the sub slots in the co 
location constraint slot have no group that they belong to
-                                               // (other than the 
co-location-constraint slot)
-                                               SimpleSlot subSlot = 
constraintGroupSlot.allocateSubSlot(null);
-                                               
subSlot.setLocality(Locality.LOCAL);
-                                               return subSlot;
-                                       }
-                                       else {
-                                               // could not allocate the 
co-location-constraint shared slot
-                                               return null;
-                                       }
-                               }
-                       }
-                       else {
-                               // the location constraint has not been 
associated with a shared slot, yet.
-                               // grab a new slot and initialize the 
constraint with that one.
-                               // preferred locations are defined by the vertex
-                               Tuple2<SharedSlot, Locality> p =
-                                               
getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false);
-                               if (p == null) {
-                                       // could not get a shared slot for this 
co-location-group
-                                       return null;
-                               }
-                               else {
-                                       final SharedSlot availableShared = p.f0;
-                                       final Locality l = p.f1;
-
-                                       // allocate the co-location group slot 
inside the shared slot
-                                       SharedSlot constraintGroupSlot = 
availableShared.allocateSharedSlot(constraint.getGroupId());
-                                       
-                                       // IMPORTANT: We do not lock the 
location, yet, since we cannot be sure that the
-                                       //            caller really sticks with 
the slot we picked!
-                                       
constraint.setSharedSlot(constraintGroupSlot);
-                                       
-                                       // the sub slots in the co location 
constraint slot have no group that they belong to
-                                       // (other than the 
co-location-constraint slot)
-                                       SimpleSlot sub = 
constraintGroupSlot.allocateSubSlot(null);
-                                       sub.setLocality(l);
-                                       return sub;
-                               }
-                       }
-               }
-       }
-
-
-       public Tuple2<SharedSlot, Locality> getSharedSlotForTask(
-                       AbstractID groupId,
-                       Iterable<TaskManagerLocation> preferredLocations,
-                       boolean localOnly) {
-               // check if there is anything at all in this group assignment
-               if (allSlots.isEmpty()) {
-                       return null;
-               }
-
-               // get the available slots for the group
-               Map<ResourceID, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
-               
-               if (slotsForGroup == null) {
-                       // we have a new group, so all slots are available
-                       slotsForGroup = new LinkedHashMap<>();
-                       availableSlotsPerJid.put(groupId, slotsForGroup);
-
-                       for (SharedSlot availableSlot : allSlots) {
-                               putIntoMultiMap(slotsForGroup, 
availableSlot.getTaskManagerID(), availableSlot);
-                       }
-               }
-               else if (slotsForGroup.isEmpty()) {
-                       // the group exists, but nothing is available for that 
group
-                       return null;
-               }
-
-               // check whether we can schedule the task to a preferred 
location
-               boolean didNotGetPreferred = false;
-
-               if (preferredLocations != null) {
-                       for (TaskManagerLocation location : preferredLocations) 
{
-
-                               // set the flag that we failed a preferred 
location. If one will be found,
-                               // we return early anyways and skip the flag 
evaluation
-                               didNotGetPreferred = true;
-
-                               SharedSlot slot = 
removeFromMultiMap(slotsForGroup, location.getResourceID());
-                               if (slot != null && slot.isAlive()) {
-                                       return new Tuple2<>(slot, 
Locality.LOCAL);
-                               }
-                       }
-               }
-
-               // if we want only local assignments, exit now with a "not 
found" result
-               if (didNotGetPreferred && localOnly) {
-                       return null;
-               }
-
-               Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : 
Locality.UNCONSTRAINED;
-
-               // schedule the task to any available location
-               SharedSlot slot;
-               while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
-                       if (slot.isAlive()) {
-                               return new Tuple2<>(slot, locality);
-                       }
-               }
-               
-               // nothing available after all, all slots were dead
-               return null;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Slot releasing
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Releases the simple slot from the assignment group.
-        * 
-        * @param simpleSlot The SimpleSlot to be released
-        */
-       void releaseSimpleSlot(SimpleSlot simpleSlot) {
-               synchronized (lock) {
-                       // try to transition to the CANCELED state. That state 
marks
-                       // that the releasing is in progress
-                       if (simpleSlot.markCancelled()) {
-
-                               // sanity checks
-                               if (simpleSlot.isAlive()) {
-                                       throw new IllegalStateException("slot 
is still alive");
-                               }
-
-                               // check whether the slot is already released
-                               if (simpleSlot.markReleased()) {
-                                       LOG.debug("Release simple slot {}.", 
simpleSlot);
-
-                                       AbstractID groupID = 
simpleSlot.getGroupID();
-                                       SharedSlot parent = 
simpleSlot.getParent();
-
-                                       // if we have a group ID, then our 
parent slot is tracked here
-                                       if (groupID != null && 
!allSlots.contains(parent)) {
-                                               throw new 
IllegalArgumentException("Slot was not associated with this SlotSharingGroup 
before.");
-                                       }
-
-                                       int parentRemaining = 
parent.removeDisposedChildSlot(simpleSlot);
-
-                                       if (parentRemaining > 0) {
-                                               // the parent shared slot is 
still alive. make sure we make it
-                                               // available again to the group 
of the just released slot
-
-                                               if (groupID != null) {
-                                                       // if we have a group 
ID, then our parent becomes available
-                                                       // for that group 
again. otherwise, the slot is part of a
-                                                       // co-location group 
and nothing becomes immediately available
-
-                                                       Map<ResourceID, 
List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
-
-                                                       // sanity check
-                                                       if (slotsForJid == 
null) {
-                                                               throw new 
IllegalStateException("Trying to return a slot for group " + groupID +
-                                                                               
" when available slots indicated that all slots were available.");
-                                                       }
-
-                                                       
putIntoMultiMap(slotsForJid, parent.getTaskManagerID(), parent);
-                                               }
-                                       } else {
-                                               // the parent shared slot is 
now empty and can be released
-                                               parent.markCancelled();
-                                               
internalDisposeEmptySharedSlot(parent);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Called from {@link 
org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}.
-        * 
-        * @param sharedSlot The slot to be released.
-        */
-       void releaseSharedSlot(SharedSlot sharedSlot) {
-               synchronized (lock) {
-                       if (sharedSlot.markCancelled()) {
-                               // we are releasing this slot
-                               
-                               if (sharedSlot.hasChildren()) {
-                                       final FlinkException cause = new 
FlinkException("Releasing shared slot parent.");
-                                       // by simply releasing all children, we 
should eventually release this slot.
-                                       Set<Slot> children = 
sharedSlot.getSubSlots();
-                                       while (children.size() > 0) {
-                                               
children.iterator().next().releaseSlot(cause);
-                                       }
-                               }
-                               else {
-                                       // if there are no children that 
trigger the release, we trigger it directly
-                                       
internalDisposeEmptySharedSlot(sharedSlot);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * 
-        * <p><b>NOTE: This method must be called from within a scope that 
holds the lock.</b></p>
-        */
-       private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {
-               // sanity check
-               if (sharedSlot.isAlive() || 
!sharedSlot.getSubSlots().isEmpty()) {
-                       throw new IllegalArgumentException();
-               }
-               
-               final SharedSlot parent = sharedSlot.getParent();
-               final AbstractID groupID = sharedSlot.getGroupID();
-               
-               // 1) If we do not have a parent, we are a root slot.
-               // 2) If we are not a root slot, we are a slot with a groupID 
and our parent
-               //    becomes available for that group
-               
-               if (parent == null) {
-                       // root slot, return to the instance.
-                       sharedSlot.getOwner().returnLogicalSlot(sharedSlot);
-
-                       // also, make sure we remove this slot from everywhere
-                       allSlots.remove(sharedSlot);
-                       removeSlotFromAllEntries(availableSlotsPerJid, 
sharedSlot);
-               }
-               else if (groupID != null) {
-                       // we remove ourselves from our parent slot
-
-                       if (sharedSlot.markReleased()) {
-                               LOG.debug("Internally dispose empty shared slot 
{}.", sharedSlot);
-
-                               int parentRemaining = 
parent.removeDisposedChildSlot(sharedSlot);
-                               
-                               if (parentRemaining > 0) {
-                                       // the parent becomes available for the 
group again
-                                       Map<ResourceID, List<SharedSlot>> 
slotsForGroup = availableSlotsPerJid.get(groupID);
-
-                                       // sanity check
-                                       if (slotsForGroup == null) {
-                                               throw new 
IllegalStateException("Trying to return a slot for group " + groupID +
-                                                               " when 
available slots indicated that all slots were available.");
-                                       }
-
-                                       putIntoMultiMap(slotsForGroup, 
parent.getTaskManagerID(), parent);
-                                       
-                               }
-                               else {
-                                       // this was the last child of the 
parent. release the parent.
-                                       parent.markCancelled();
-                                       internalDisposeEmptySharedSlot(parent);
-                               }
-                       }
-               }
-               else {
-                       throw new IllegalStateException(
-                                       "Found a shared slot that is neither a 
root slot, nor associated with a vertex group.");
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> 
map, ResourceID location, SharedSlot slot) {
-               List<SharedSlot> slotsForInstance = map.get(location);
-               if (slotsForInstance == null) {
-                       slotsForInstance = new ArrayList<SharedSlot>();
-                       map.put(location, slotsForInstance);
-               }
-               slotsForInstance.add(slot);
-       }
-       
-       private static SharedSlot removeFromMultiMap(Map<ResourceID, 
List<SharedSlot>> map, ResourceID location) {
-               List<SharedSlot> slotsForLocation = map.get(location);
-               
-               if (slotsForLocation == null) {
-                       return null;
-               }
-               else {
-                       SharedSlot slot = 
slotsForLocation.remove(slotsForLocation.size() - 1);
-                       if (slotsForLocation.isEmpty()) {
-                               map.remove(location);
-                       }
-                       
-                       return slot;
-               }
-       }
-       
-       private static SharedSlot pollFromMultiMap(Map<ResourceID, 
List<SharedSlot>> map) {
-               Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = 
map.entrySet().iterator();
-               
-               while (iter.hasNext()) {
-                       List<SharedSlot> slots = iter.next().getValue();
-                       
-                       if (slots.isEmpty()) {
-                               iter.remove();
-                       }
-                       else if (slots.size() == 1) {
-                               SharedSlot slot = slots.remove(0);
-                               iter.remove();
-                               return slot;
-                       }
-                       else {
-                               return slots.remove(slots.size() - 1);
-                       }
-               }
-               
-               return null;
-       }
-       
-       private static void removeSlotFromAllEntries(
-                       Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlots, SharedSlot slot)
-       {
-               final ResourceID taskManagerId = slot.getTaskManagerID();
-               
-               for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> 
entry : availableSlots.entrySet()) {
-                       Map<ResourceID, List<SharedSlot>> map = 
entry.getValue();
-
-                       List<SharedSlot> list = map.get(taskManagerId);
-                       if (list != null) {
-                               list.remove(slot);
-                               if (list.isEmpty()) {
-                                       map.remove(taskManagerId);
-                               }
-                       }
-               }
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 0e9f585..df8e9f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.util.Objects;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -45,8 +41,6 @@ public class CoLocationConstraint {
 
        private final CoLocationGroup group;
 
-       private volatile SharedSlot sharedSlot;
-
        private volatile TaskManagerLocation lockedLocation;
 
        private volatile SlotRequestId slotRequestId;
@@ -62,15 +56,6 @@ public class CoLocationConstraint {
        // 
------------------------------------------------------------------------
 
        /**
-        * Gets the shared slot into which this constraint's tasks are places.
-        * 
-        * @return The shared slot into which this constraint's tasks are 
places.
-        */
-       public SharedSlot getSharedSlot() {
-               return sharedSlot;
-       }
-
-       /**
         * Gets the ID that identifies the co-location group.
         * 
         * @return The ID that identifies the co-location group.
@@ -81,9 +66,8 @@ public class CoLocationConstraint {
 
        /**
         * Checks whether the location of this constraint has been assigned.
-        * The location is assigned once a slot has been set, via the
-        * {@link #setSharedSlot(org.apache.flink.runtime.instance.SharedSlot)} 
method,
-        * and the location is locked via the {@link #lockLocation()} method.
+        * The location is locked via the {@link 
#lockLocation(TaskManagerLocation)}
+        * method.
         * 
         * @return True if the location has been assigned, false otherwise.
         */
@@ -92,23 +76,9 @@ public class CoLocationConstraint {
        }
 
        /**
-        * Checks whether the location of this constraint has been assigned
-        * (as defined in the {@link #isAssigned()} method, and the current
-        * shared slot is alive.
-        *
-        * @return True if the location has been assigned and the shared slot 
is alive,
-        *         false otherwise.
-        * @deprecated Should only be called by legacy code
-        */
-       @Deprecated
-       public boolean isAssignedAndAlive() {
-               return lockedLocation != null && sharedSlot != null && 
sharedSlot.isAlive();
-       }
-
-       /**
         * Gets the location assigned to this slot. This method only succeeds 
after
-        * the location has been locked via the {@link #lockLocation()} method 
and
-        * {@link #isAssigned()} returns true.
+        * the location has been locked via the {@link 
#lockLocation(TaskManagerLocation)}
+        * method and {@link #isAssigned()} returns true.
         *
         * @return The instance describing the location for the tasks of this 
constraint.
         * @throws IllegalStateException Thrown if the location has not been 
assigned, yet.
@@ -126,51 +96,6 @@ public class CoLocationConstraint {
        // 
------------------------------------------------------------------------
 
        /**
-        * Assigns a new shared slot to this co-location constraint. The shared 
slot
-        * will hold the subtasks that are executed under this co-location 
constraint.
-        * If the constraint's location is assigned, then this slot needs to be 
from
-        * the same location (instance) as the one assigned to this constraint.
-        * 
-        * <p>If the constraint already has a slot, the current one will be 
released.</p>
-        *
-        * @param newSlot The new shared slot to assign to this constraint.
-        * @throws IllegalArgumentException If the constraint's location has 
been assigned and
-        *                                  the new slot is from a different 
location.
-        */
-       public void setSharedSlot(SharedSlot newSlot) {
-               checkNotNull(newSlot);
-
-               if (this.sharedSlot == null) {
-                       this.sharedSlot = newSlot;
-               }
-               else if (newSlot != this.sharedSlot){
-                       if (lockedLocation != null && 
!Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) {
-                               throw new IllegalArgumentException(
-                                               "Cannot assign different 
location to a constraint whose location is locked.");
-                       }
-                       if (this.sharedSlot.isAlive()) {
-                               this.sharedSlot.releaseSlot(new 
FlinkException("Setting new shared slot for co-location constraint."));
-                       }
-
-                       this.sharedSlot = newSlot;
-               }
-       }
-
-       /**
-        * Locks the location of this slot. The location can be locked only once
-        * and only after a shared slot has been assigned.
-        * 
-        * @throws IllegalStateException Thrown, if the location is already 
locked,
-        *                               or is no slot has been set, yet.
-        */
-       public void lockLocation() throws IllegalStateException {
-               checkState(lockedLocation == null, "Location is already 
locked");
-               checkState(sharedSlot != null, "Cannot lock location without a 
slot.");
-
-               lockedLocation = sharedSlot.getTaskManagerLocation();
-       }
-
-       /**
         * Locks the location of this slot. The location can be locked only once
         * and only after a shared slot has been assigned.
         *
@@ -209,9 +134,13 @@ public class CoLocationConstraint {
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
-       
+
        @Override
        public String toString() {
-               return "CoLocation constraint id " + getGroupId() + " shared 
slot " + sharedSlot;
+               return "CoLocationConstraint{" +
+                       "group=" + group +
+                       ", lockedLocation=" + lockedLocation +
+                       ", slotRequestId=" + slotRequestId +
+                       '}';
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index ef8bd67..c8bc9b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -119,12 +119,6 @@ public class CoLocationGroup implements 
java.io.Serializable {
         * executed any more.</p>
         */
        public void resetConstraints() {
-               for (CoLocationConstraint c : this.constraints) {
-                       if (c.isAssignedAndAlive()) {
-                               throw new IllegalStateException(
-                                               "Cannot reset co-location 
group: some constraints still have live tasks");
-                       }
-               }
                this.constraints.clear();
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index 86be9d4..c93456e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -34,13 +33,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 public class SlotSharingGroup implements java.io.Serializable {
        
        private static final long serialVersionUID = 1L;
-       
 
        private final Set<JobVertexID> ids = new TreeSet<JobVertexID>();
-       
-       /** Mapping of tasks to subslots. This field is only needed inside the 
JobManager, and is not RPCed. */
-       private transient SlotSharingGroupAssignment taskAssignment;
-
        private final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
        
        public SlotSharingGroup() {}
@@ -68,23 +62,6 @@ public class SlotSharingGroup implements 
java.io.Serializable {
        public SlotSharingGroupId getSlotSharingGroupId() {
                return slotSharingGroupId;
        }
-
-       public SlotSharingGroupAssignment getTaskAssignment() {
-               if (this.taskAssignment == null) {
-                       this.taskAssignment = new SlotSharingGroupAssignment();
-               }
-               
-               return this.taskAssignment;
-       }
-       
-       public void clearTaskAssignment() {
-               if (this.taskAssignment != null) {
-                       if (this.taskAssignment.getNumberOfSlots() > 0) {
-                               throw new 
IllegalStateException("SlotSharingGroup cannot clear task assignment, group 
still has allocated resources.");
-                       }
-               }
-               this.taskAssignment = null;
-       }
        
        // 
------------------------------------------------------------------------
        //  Utilities
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 13f74f4..ebef29e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -56,7 +55,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -105,7 +103,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link ExecutionGraph} deployment.
@@ -630,13 +627,11 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
 
                final TaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
 
-               final SimpleSlot sourceSlot1 = 
createSlot(localTaskManagerLocation, 0);
+               final LogicalSlot sourceSlot1 = 
createSlot(localTaskManagerLocation, 0);
+               final LogicalSlot sourceSlot2 = 
createSlot(localTaskManagerLocation, 1);
 
-               final SimpleSlot sourceSlot2 = 
createSlot(localTaskManagerLocation, 1);
-
-               final SimpleSlot sinkSlot1 = 
createSlot(localTaskManagerLocation, 0);
-
-               final SimpleSlot sinkSlot2 = 
createSlot(localTaskManagerLocation, 1);
+               final LogicalSlot sinkSlot1 = 
createSlot(localTaskManagerLocation, 0);
+               final LogicalSlot sinkSlot2 = 
createSlot(localTaskManagerLocation, 1);
 
                slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
                slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
@@ -751,12 +746,11 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                }
        }
 
-       private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, 
int index) {
-               return new SimpleSlot(
-                       mock(SlotOwner.class),
-                       taskManagerLocation,
-                       index,
-                       new SimpleAckingTaskManagerGateway());
+       private LogicalSlot createSlot(TaskManagerLocation taskManagerLocation, 
int index) {
+               return new TestingLogicalSlotBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setSlotNumber(index)
+                       .createTestingLogicalSlot();
        }
 
        private ExecutionGraph createExecutionGraph(Configuration 
configuration) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 3850a70..802dff0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -76,8 +75,8 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -139,8 +138,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final InteractionsCountingTaskManagerGateway gatewaySource = 
createTaskManager();
                final InteractionsCountingTaskManagerGateway gatewayTarget = 
createTaskManager();
 
-               final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
-               final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
+               final LogicalSlot sourceSlot = 
createTestingLogicalSlot(gatewaySource);
+               final LogicalSlot targetSlot = 
createTestingLogicalSlot(gatewayTarget);
 
                eg.scheduleForExecution();
 
@@ -164,6 +163,12 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                assertEquals(JobStatus.RUNNING, eg.getState());
        }
 
+       private TestingLogicalSlot 
createTestingLogicalSlot(InteractionsCountingTaskManagerGateway gatewaySource) {
+               return new TestingLogicalSlotBuilder()
+                       .setTaskManagerGateway(gatewaySource)
+                       .createTestingLogicalSlot();
+       }
+
        /**
         * This test verifies that before deploying a pipelined connected 
component, the
         * full set of slots is available, and that not some tasks are 
deployed, and later the
@@ -203,15 +208,15 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final InteractionsCountingTaskManagerGateway[] 
sourceTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
                final InteractionsCountingTaskManagerGateway[] 
targetTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
 
-               final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
-               final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+               final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
+               final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];
 
                for (int i = 0; i < parallelism; i++) {
                        sourceTaskManagers[i] = createTaskManager();
                        targetTaskManagers[i] = createTaskManager();
 
-                       sourceSlots[i] = createSlot(sourceTaskManagers[i], 
jobId);
-                       targetSlots[i] = createSlot(targetTaskManagers[i], 
jobId);
+                       sourceSlots[i] = 
createTestingLogicalSlot(sourceTaskManagers[i]);
+                       targetSlots[i] = 
createTestingLogicalSlot(targetTaskManagers[i]);
 
                        sourceFutures[i] = new CompletableFuture<>();
                        targetFutures[i] = new CompletableFuture<>();
@@ -298,8 +303,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                slotOwner.setReturnAllocatedSlotConsumer(
                        (LogicalSlot logicalSlot) -> 
returnedSlots.offer(logicalSlot.getAllocationId()));
 
-               final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
-               final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+               final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
+               final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];
 
                @SuppressWarnings({"unchecked", "rawtypes"})
                final CompletableFuture<LogicalSlot>[] sourceFutures = new 
CompletableFuture[parallelism];
@@ -307,8 +312,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                final CompletableFuture<LogicalSlot>[] targetFutures = new 
CompletableFuture[parallelism];
 
                for (int i = 0; i < parallelism; i++) {
-                       sourceSlots[i] = createSlot(taskManager, jobId, 
slotOwner);
-                       targetSlots[i] = createSlot(taskManager, jobId, 
slotOwner);
+                       sourceSlots[i] = createSingleLogicalSlot(slotOwner, 
taskManager, new SlotRequestId());
+                       targetSlots[i] = createSingleLogicalSlot(slotOwner, 
taskManager, new SlotRequestId());
 
                        sourceFutures[i] = new CompletableFuture<>();
                        targetFutures[i] = new CompletableFuture<>();
@@ -348,8 +353,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                // all completed futures must have been returns
                for (int i = 0; i < parallelism; i += 2) {
-                       assertTrue(sourceSlots[i].isCanceled());
-                       assertTrue(targetSlots[i].isCanceled());
+                       assertFalse(sourceSlots[i].isAlive());
+                       assertFalse(targetSlots[i].isAlive());
                }
        }
 
@@ -379,12 +384,12 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                        (LogicalSlot logicalSlot) -> 
returnedSlots.offer(logicalSlot.getAllocationId()));
 
                final InteractionsCountingTaskManagerGateway taskManager = 
createTaskManager();
-               final SimpleSlot[] slots = new SimpleSlot[parallelism];
+               final LogicalSlot[] slots = new LogicalSlot[parallelism];
                @SuppressWarnings({"unchecked", "rawtypes"})
                final CompletableFuture<LogicalSlot>[] slotFutures = new 
CompletableFuture[parallelism];
 
                for (int i = 0; i < parallelism; i++) {
-                       slots[i] = createSlot(taskManager, jobId, slotOwner);
+                       slots[i] = createSingleLogicalSlot(slotOwner, 
taskManager, new SlotRequestId());
                        slotFutures[i] = new CompletableFuture<>();
                }
 
@@ -478,7 +483,7 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
 
                final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
 
-               final SimpleSlot slot = createSlot(new 
SimpleAckingTaskManagerGateway(), jobGraph.getJobID(), new DummySlotOwner());
+               final LogicalSlot slot = createSingleLogicalSlot(new 
DummySlotOwner(), new SimpleAckingTaskManagerGateway(), new SlotRequestId());
                slotProvider.addSlot(jobVertex.getID(), 0, 
CompletableFuture.completedFuture(slot));
 
                final CompletableFuture<LogicalSlot> slotFuture = new 
CompletableFuture<>();
@@ -592,23 +597,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                        NoOpPartitionTracker.INSTANCE);
        }
 
-       private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID 
jobId) {
-               return createSlot(taskManager, jobId, new TestingSlotOwner());
-       }
-
-       private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID 
jobId, SlotOwner slotOwner) {
-               TaskManagerLocation location = new TaskManagerLocation(
-                               ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 12345);
-
-               SimpleSlotContext slot = new SimpleSlotContext(
-                       new AllocationID(),
-                       location,
-                       0,
-                       taskManager);
-
-               return new SimpleSlot(slot, slotOwner, 0);
-       }
-
        @Nonnull
        static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, 
TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
                TaskManagerLocation location = new TaskManagerLocation(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index ab4048d..21bd002 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
@@ -51,6 +50,7 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -219,11 +219,12 @@ public class ExecutionPartitionLifecycleTest extends 
TestLogger {
                final SlotProvider slotProvider = new SlotProvider() {
                        @Override
                        public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, 
SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) 
{
-                               return CompletableFuture.completedFuture(new 
SimpleSlot(
-                                       new SingleSlotTestingSlotOwner(),
-                                       taskManagerLocation,
-                                       0,
-                                       taskManagerGateway));
+                               return CompletableFuture.completedFuture(
+                                       new TestingLogicalSlotBuilder()
+                                               
.setTaskManagerLocation(taskManagerLocation)
+                                               
.setTaskManagerGateway(taskManagerGateway)
+                                               .setSlotOwner(new 
SingleSlotTestingSlotOwner())
+                                               .createTestingLogicalSlot());
                        }
 
                        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 6df8366..3be8657 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -106,11 +106,7 @@ public class ExecutionTest extends TestLogger {
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
 
-               final SimpleSlot slot = new SimpleSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway());
+               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
                final LogicalSlot otherSlot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
@@ -132,6 +128,12 @@ public class ExecutionTest extends TestLogger {
                assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
        }
 
+       private TestingLogicalSlot createTestingLogicalSlot(SlotOwner 
slotOwner) {
+               return new TestingLogicalSlotBuilder()
+                       .setSlotOwner(slotOwner)
+                       .createTestingLogicalSlot();
+       }
+
        /**
         * Tests that the slot is released in case of a execution cancellation 
when having
         * a slot assigned and being in state SCHEDULED.
@@ -143,11 +145,7 @@ public class ExecutionTest extends TestLogger {
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
 
-               final SimpleSlot slot = new SimpleSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway());
+               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
                final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
                slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
@@ -193,11 +191,7 @@ public class ExecutionTest extends TestLogger {
 
                final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
 
-               final SimpleSlot slot = new SimpleSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway());
+               final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
                final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
                slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
@@ -563,13 +557,7 @@ public class ExecutionTest extends TestLogger {
 
                for (JobVertexID jobVertexId : jobVertexIds) {
                        for (int i = 0; i < parallelism; i++) {
-                               final SimpleSlot slot = new SimpleSlot(
-                                       slotOwner,
-                                       new LocalTaskManagerLocation(),
-                                       0,
-                                       new SimpleAckingTaskManagerGateway(),
-                                       null,
-                                       null);
+                               final LogicalSlot slot = 
createTestingLogicalSlot(slotOwner);
 
                                slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 3e95587..37e5183 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -37,9 +36,13 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -237,15 +240,22 @@ public class ExecutionVertexLocalityTest extends 
TestLogger {
                //  - mocking the scheduler created fragile tests that break 
whenever the scheduler is adjusted
                //  - exposing test methods in the ExecutionVertex leads to 
undesirable setters 
 
-               SlotContext slot = new SimpleSlotContext(
+               SlotContext slotContext = new SimpleSlotContext(
                        new AllocationID(),
                        location,
                        0,
                        mock(TaskManagerGateway.class));
 
-               SimpleSlot simpleSlot = new SimpleSlot(slot, 
mock(SlotOwner.class), 0);
 
-               if 
(!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
+
+               LogicalSlot slot = new SingleLogicalSlot(
+                       new SlotRequestId(),
+                       slotContext,
+                       null,
+                       Locality.LOCAL,
+                       mock(SlotOwner.class));
+
+               if 
(!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
                        throw new FlinkException("Could not assign resource.");
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
deleted file mode 100644
index dcf5c98..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the allocation, properties, and release of shared slots.
- */
-public class SharedSlotsTest extends TestLogger {
-
-       private static final Iterable<TaskManagerLocation> NO_LOCATION = 
Collections.emptySet();
-
-       @Test
-       public void allocateAndReleaseEmptySlot() {
-               try {
-                       JobVertexID vertexId = new JobVertexID();
-                       
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vertexId);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-                       
-                       assertEquals(0, assignment.getNumberOfSlots());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vertexId));
-
-                       // get a shared slot
-                       SharedSlot slot = getSharedSlot(assignment);
-                       
-                       // check that the new slot is fresh
-                       assertTrue(slot.isAlive());
-                       assertFalse(slot.isCanceled());
-                       assertFalse(slot.isReleased());
-                       assertEquals(0, slot.getNumberLeaves());
-                       assertFalse(slot.hasChildren());
-                       assertTrue(slot.isRootAndEmpty());
-                       assertNotNull(slot.toString());
-                       assertTrue(slot.getSubSlots().isEmpty());
-                       assertEquals(0, slot.getSlotNumber());
-                       assertEquals(0, slot.getRootSlotNumber());
-                       
-                       // release the slot immediately.
-                       slot.releaseSlot(null);
-
-                       assertTrue(slot.isCanceled());
-                       assertTrue(slot.isReleased());
-
-                       assertEquals(0, assignment.getNumberOfSlots());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vertexId));
-                       
-                       // we should not be able to allocate any children from 
this released slot
-                       assertNull(slot.allocateSharedSlot(new AbstractID()));
-                       assertNull(slot.allocateSubSlot(new AbstractID()));
-                       
-                       // we cannot add this slot to the assignment group
-                       
assertNull(assignment.addSharedSlotAndAllocateSubSlot(slot, Locality.NON_LOCAL, 
vertexId));
-                       assertEquals(0, assignment.getNumberOfSlots());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void allocateSimpleSlotsAndReleaseFromRoot() {
-               try {
-                       JobVertexID vid1 = new JobVertexID();
-                       JobVertexID vid2 = new JobVertexID();
-                       JobVertexID vid3 = new JobVertexID();
-                       JobVertexID vid4 = new JobVertexID();
-
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vid1, vid2, vid3, vid4);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-                       // get a shared slot
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       // allocate a series of sub slots
-
-                       SimpleSlot sub1 = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, vid1);
-                       assertNotNull(sub1);
-
-                       assertNull(sub1.getPayload());
-                       assertEquals(Locality.LOCAL, sub1.getLocality());
-                       assertEquals(1, sub1.getNumberLeaves());
-                       assertEquals(vid1, sub1.getGroupID());
-                       assertEquals(sharedSlot, sub1.getParent());
-                       assertEquals(sharedSlot, sub1.getRoot());
-                       assertEquals(0, sub1.getRootSlotNumber());
-                       assertEquals(0, sub1.getSlotNumber());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
-                       
-                       SimpleSlot sub2 = assignment.getSlotForTask(vid2, 
NO_LOCATION);
-                       assertNotNull(sub2);
-                       
-                       assertNull(sub2.getPayload());
-                       assertEquals(Locality.UNCONSTRAINED, 
sub2.getLocality());
-                       assertEquals(1, sub2.getNumberLeaves());
-                       assertEquals(vid2, sub2.getGroupID());
-                       assertEquals(sharedSlot, sub2.getParent());
-                       assertEquals(sharedSlot, sub2.getRoot());
-                       assertEquals(0, sub2.getRootSlotNumber());
-                       assertEquals(1, sub2.getSlotNumber());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
-                       
-                       SimpleSlot sub3 = assignment.getSlotForTask(vid3, 
Collections.singleton(sub2.getTaskManagerLocation()));
-                       assertNotNull(sub3);
-                       
-                       assertNull(sub3.getPayload());
-                       assertEquals(Locality.LOCAL, sub3.getLocality());
-                       assertEquals(1, sub3.getNumberLeaves());
-                       assertEquals(vid3, sub3.getGroupID());
-                       assertEquals(sharedSlot, sub3.getParent());
-                       assertEquals(sharedSlot, sub3.getRoot());
-                       assertEquals(0, sub3.getRootSlotNumber());
-                       assertEquals(2, sub3.getSlotNumber());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
-
-                       LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-                       SimpleSlot sub4 = assignment.getSlotForTask(vid4,
-                                       
Collections.singleton(taskManagerLocation));
-                       assertNotNull(sub4);
-                       
-                       assertNull(sub4.getPayload());
-                       assertEquals(Locality.NON_LOCAL, sub4.getLocality());
-                       assertEquals(1, sub4.getNumberLeaves());
-                       assertEquals(vid4, sub4.getGroupID());
-                       assertEquals(sharedSlot, sub4.getParent());
-                       assertEquals(sharedSlot, sub4.getRoot());
-                       assertEquals(0, sub4.getRootSlotNumber());
-                       assertEquals(3, sub4.getSlotNumber());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
-                       
-                       // release from the root.
-                       sharedSlot.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isReleased());
-                       assertTrue(sub1.isReleased());
-                       assertTrue(sub2.isReleased());
-                       assertTrue(sub3.isReleased());
-                       assertTrue(sub4.isReleased());
-                       
-                       assertEquals(0, sharedSlot.getNumberLeaves());
-                       assertFalse(sharedSlot.hasChildren());
-
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
-
-                       assertNull(sharedSlot.allocateSharedSlot(new 
AbstractID()));
-                       assertNull(sharedSlot.allocateSubSlot(new 
AbstractID()));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void allocateSimpleSlotsAndReleaseFromLeaves() {
-               try {
-                       JobVertexID vid1 = new JobVertexID();
-                       JobVertexID vid2 = new JobVertexID();
-                       JobVertexID vid3 = new JobVertexID();
-
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vid1, vid2, vid3);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-                       // allocate a shared slot
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       // allocate a series of sub slots
-
-                       SimpleSlot sub1 = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, 
vid1);
-                       SimpleSlot sub2 = assignment.getSlotForTask(vid2, 
NO_LOCATION);
-                       SimpleSlot sub3 = assignment.getSlotForTask(vid3, 
NO_LOCATION);
-                       
-                       assertNotNull(sub1);
-                       assertNotNull(sub2);
-                       assertNotNull(sub3);
-
-                       assertEquals(3, sharedSlot.getNumberLeaves());
-
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       // release from the leaves.
-                       
-                       sub2.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isAlive());
-                       assertTrue(sub1.isAlive());
-                       assertTrue(sub2.isReleased());
-                       assertTrue(sub3.isAlive());
-                       
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       assertEquals(2, sharedSlot.getNumberLeaves());
-
-                       
-                       sub1.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isAlive());
-                       assertTrue(sub1.isReleased());
-                       assertTrue(sub2.isReleased());
-                       assertTrue(sub3.isAlive());
-                       
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       assertEquals(1, sharedSlot.getNumberLeaves());
-
-                       sub3.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isReleased());
-                       assertTrue(sub1.isReleased());
-                       assertTrue(sub2.isReleased());
-                       assertTrue(sub3.isReleased());
-
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       assertNull(sharedSlot.allocateSharedSlot(new 
AbstractID()));
-                       assertNull(sharedSlot.allocateSubSlot(new 
AbstractID()));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void allocateAndReleaseInMixedOrder() {
-               try {
-                       JobVertexID vid1 = new JobVertexID();
-                       JobVertexID vid2 = new JobVertexID();
-                       JobVertexID vid3 = new JobVertexID();
-
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vid1, vid2, vid3);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-                       // get a shared slot
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       // allocate a series of sub slots
-
-                       SimpleSlot sub1 = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, 
vid1);
-                       SimpleSlot sub2 = assignment.getSlotForTask(vid2, 
NO_LOCATION);
-
-                       assertNotNull(sub1);
-                       assertNotNull(sub2);
-
-                       assertEquals(2, sharedSlot.getNumberLeaves());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       
-                       sub2.releaseSlot(null);
-
-                       assertEquals(1, sharedSlot.getNumberLeaves());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       
-                       SimpleSlot sub3 = assignment.getSlotForTask(vid3, 
NO_LOCATION);
-                       assertNotNull(sub3);
-                       
-                       assertEquals(2, sharedSlot.getNumberLeaves());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(1, assignment.getNumberOfSlots());
-                       
-                       sub3.releaseSlot(null);
-                       sub1.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isReleased());
-                       assertEquals(0, sharedSlot.getNumberLeaves());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid1));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid2));
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       assertNull(sharedSlot.allocateSharedSlot(new 
AbstractID()));
-                       assertNull(sharedSlot.allocateSubSlot(new 
AbstractID()));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       /**
-        * We allocate and release the structure below, starting by allocating 
a simple slot in the
-        * shared slot and finishing by releasing a simple slot.
-        * 
-        * <pre>
-        *     Shared(0)(root)
-        *        |
-        *        +-- Simple(2)(sink)
-        *        |
-        *        +-- Shared(1)(co-location-group)
-        *        |      |
-        *        |      +-- Simple(0)(tail)
-        *        |      +-- Simple(1)(head)
-        *        |
-        *        +-- Simple(0)(source)
-        * </pre>
-        */
-       @Test
-       public void testAllocateAndReleaseTwoLevels() {
-               try {
-                       JobVertexID sourceId = new JobVertexID();
-                       JobVertexID headId = new JobVertexID();
-                       JobVertexID tailId = new JobVertexID();
-                       JobVertexID sinkId = new JobVertexID();
-
-                       JobVertex headVertex = new JobVertex("head", headId);
-                       JobVertex tailVertex = new JobVertex("tail", tailId);
-                       
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(sourceId, headId, tailId, sinkId);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-                       assertEquals(0, assignment.getNumberOfSlots());
-                       
-                       CoLocationGroup coLocationGroup = new 
CoLocationGroup(headVertex, tailVertex);
-                       CoLocationConstraint constraint = 
coLocationGroup.getLocationConstraint(0);
-                       assertFalse(constraint.isAssigned());
-
-                       // allocate a shared slot
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-                       
-                       // get the first simple slot
-                       SimpleSlot sourceSlot = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, 
sourceId);
-                       
-                       assertEquals(1, sharedSlot.getNumberLeaves());
-                       
-                       // get the first slot in the nested shared slot from 
the co-location constraint
-                       SimpleSlot headSlot = 
assignment.getSlotForTask(constraint, 
Collections.<TaskManagerLocation>emptySet());
-                       assertEquals(2, sharedSlot.getNumberLeaves());
-
-                       assertNotNull(constraint.getSharedSlot());
-                       assertTrue(constraint.getSharedSlot().isAlive());
-                       assertFalse(constraint.isAssigned());
-                       
-                       // we do not immediately lock the location
-                       headSlot.releaseSlot();
-                       assertEquals(1, sharedSlot.getNumberLeaves());
-
-                       assertNotNull(constraint.getSharedSlot());
-                       assertTrue(constraint.getSharedSlot().isReleased());
-                       assertFalse(constraint.isAssigned());
-                       
-                       // re-allocate the head slot
-                       headSlot = assignment.getSlotForTask(constraint, 
Collections.<TaskManagerLocation>emptySet());
-                       
-                       constraint.lockLocation();
-                       assertNotNull(constraint.getSharedSlot());
-                       assertTrue(constraint.isAssigned());
-                       assertTrue(constraint.isAssignedAndAlive());
-                       
-                       SimpleSlot tailSlot = 
assignment.getSlotForTask(constraint, 
Collections.<TaskManagerLocation>emptySet());
-                       
-                       assertEquals(constraint.getSharedSlot(), 
headSlot.getParent());
-                       assertEquals(constraint.getSharedSlot(), 
tailSlot.getParent());
-                       
-                       SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, 
Collections.<TaskManagerLocation>emptySet());
-                       assertEquals(4, sharedSlot.getNumberLeaves());
-                       
-                       // we release our co-location constraint tasks
-                       headSlot.releaseSlot(null);
-                       tailSlot.releaseSlot(null);
-
-                       assertEquals(2, sharedSlot.getNumberLeaves());
-                       assertTrue(headSlot.isReleased());
-                       assertTrue(tailSlot.isReleased());
-                       assertTrue(constraint.isAssigned());
-                       assertFalse(constraint.isAssignedAndAlive());
-                       
-                       // we should have resources again for the co-location 
constraint
-                       assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
-                       
-                       // re-allocate head and tail from the constraint
-                       headSlot = assignment.getSlotForTask(constraint, 
NO_LOCATION);
-                       tailSlot = assignment.getSlotForTask(constraint, 
NO_LOCATION);
-                       
-                       assertEquals(4, sharedSlot.getNumberLeaves());
-                       assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
-                       
-                       // verify some basic properties of the slots
-                       assertEquals(sourceSlot.getTaskManagerID(), 
headSlot.getTaskManagerID());
-                       assertEquals(sourceSlot.getTaskManagerID(), 
tailSlot.getTaskManagerID());
-                       assertEquals(sourceSlot.getTaskManagerID(), 
sinkSlot.getTaskManagerID());
-
-                       assertEquals(sourceId, sourceSlot.getGroupID());
-                       assertEquals(sinkId, sinkSlot.getGroupID());
-                       assertNull(headSlot.getGroupID());
-                       assertNull(tailSlot.getGroupID());
-                       assertEquals(constraint.getGroupId(), 
constraint.getSharedSlot().getGroupID());
-                       
-                       // release all
-                       sourceSlot.releaseSlot(null);
-                       headSlot.releaseSlot(null);
-                       tailSlot.releaseSlot(null);
-                       sinkSlot.releaseSlot(null);
-                       
-                       assertTrue(sharedSlot.isReleased());
-                       assertTrue(sourceSlot.isReleased());
-                       assertTrue(headSlot.isReleased());
-                       assertTrue(tailSlot.isReleased());
-                       assertTrue(sinkSlot.isReleased());
-                       assertTrue(constraint.getSharedSlot().isReleased());
-                       
-                       assertTrue(constraint.isAssigned());
-                       assertFalse(constraint.isAssignedAndAlive());
-
-                       assertEquals(0, assignment.getNumberOfSlots());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       /**
-        * We allocate and the structure below and release it from the root.
-        *
-        * <pre>
-        *     Shared(0)(root)
-        *        |
-        *        +-- Simple(2)(sink)
-        *        |
-        *        +-- Shared(1)(co-location-group)
-        *        |      |
-        *        |      +-- Simple(0)(tail)
-        *        |      +-- Simple(1)(head)
-        *        |
-        *        +-- Simple(0)(source)
-        * </pre>
-        */
-       @Test
-       public void testReleaseTwoLevelsFromRoot() {
-               try {
-                       JobVertexID sourceId = new JobVertexID();
-                       JobVertexID headId = new JobVertexID();
-                       JobVertexID tailId = new JobVertexID();
-                       JobVertexID sinkId = new JobVertexID();
-
-                       JobVertex headVertex = new JobVertex("head", headId);
-                       JobVertex tailVertex = new JobVertex("tail", tailId);
-
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(sourceId, headId, tailId, sinkId);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-                       assertEquals(0, assignment.getNumberOfSlots());
-
-                       CoLocationGroup coLocationGroup = new 
CoLocationGroup(headVertex, tailVertex);
-                       CoLocationConstraint constraint = 
coLocationGroup.getLocationConstraint(0);
-                       assertFalse(constraint.isAssigned());
-
-                       // allocate a shared slot
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       // get the first simple slot
-                       SimpleSlot sourceSlot = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, 
sourceId);
-                       
-                       SimpleSlot headSlot = 
assignment.getSlotForTask(constraint, NO_LOCATION);
-                       constraint.lockLocation();
-                       SimpleSlot tailSlot = 
assignment.getSlotForTask(constraint, NO_LOCATION);
-                       
-                       SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, 
NO_LOCATION);
-                       
-                       assertEquals(4, sharedSlot.getNumberLeaves());
-
-                       // release all
-                       sourceSlot.releaseSlot(null);
-                       headSlot.releaseSlot(null);
-                       tailSlot.releaseSlot(null);
-                       sinkSlot.releaseSlot(null);
-
-                       assertTrue(sharedSlot.isReleased());
-                       assertTrue(sourceSlot.isReleased());
-                       assertTrue(headSlot.isReleased());
-                       assertTrue(tailSlot.isReleased());
-                       assertTrue(sinkSlot.isReleased());
-                       assertTrue(constraint.getSharedSlot().isReleased());
-
-                       assertTrue(constraint.isAssigned());
-                       assertFalse(constraint.isAssignedAndAlive());
-                       
-                       assertEquals(0, assignment.getNumberOfSlots());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testImmediateReleaseOneLevel() {
-               try {
-                       JobVertexID vid = new JobVertexID();
-
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vid);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       SimpleSlot sub = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, 
vid);
-                       sub.releaseSlot(null);
-                       
-                       assertTrue(sub.isReleased());
-                       assertTrue(sharedSlot.isReleased());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testImmediateReleaseTwoLevel() {
-               try {
-                       JobVertexID vid = new JobVertexID();
-                       JobVertex vertex = new JobVertex("vertex", vid);
-                       
-                       SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vid);
-                       SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-
-                       CoLocationGroup coLocationGroup = new 
CoLocationGroup(vertex);
-                       CoLocationConstraint constraint = 
coLocationGroup.getLocationConstraint(0);
-
-                       SharedSlot sharedSlot = getSharedSlot(assignment);
-
-                       SimpleSlot sub = 
assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, 
constraint);
-                       
-                       assertNull(sub.getGroupID());
-                       assertEquals(constraint.getSharedSlot(), 
sub.getParent());
-                       
-                       sub.releaseSlot(null);
-
-                       assertTrue(sub.isReleased());
-                       assertTrue(sharedSlot.isReleased());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       public SharedSlot getSharedSlot(SlotSharingGroupAssignment assignment) {
-               final TestingSlotOwner slotOwner = new TestingSlotOwner();
-               slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot 
logicalSlot) -> ((SharedSlot) logicalSlot).markReleased());
-               return new SharedSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway(),
-                       assignment);
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
deleted file mode 100644
index affe5cb..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingPayload;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SimpleSlotTest extends  TestLogger {
-
-       @Test
-       public void testStateTransitions() {
-               try {
-                       // release immediately
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.isAlive());
-
-                               slot.releaseSlot(null);
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertTrue(slot.isReleased());
-                       }
-
-                       // state transitions manually
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.isAlive());
-
-                               slot.markCancelled();
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertFalse(slot.isReleased());
-
-                               slot.markReleased();
-                               assertFalse(slot.isAlive());
-                               assertTrue(slot.isCanceled());
-                               assertTrue(slot.isReleased());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSetExecutionVertex() {
-               try {
-                       TestingPayload payload1 = new TestingPayload();
-                       TestingPayload payload2 = new TestingPayload();
-
-                       // assign to alive slot
-                       {
-                               SimpleSlot slot = getSlot();
-
-                               assertTrue(slot.tryAssignPayload(payload1));
-                               assertEquals(payload1, slot.getPayload());
-
-                               // try to add another one
-                               assertFalse(slot.tryAssignPayload(payload2));
-                               assertEquals(payload1, slot.getPayload());
-                       }
-
-                       // assign to canceled slot
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.markCancelled());
-
-                               assertFalse(slot.tryAssignPayload(payload1));
-                               assertNull(slot.getPayload());
-                       }
-
-                       // assign to released marked slot
-                       {
-                               SimpleSlot slot = getSlot();
-                               assertTrue(slot.markCancelled());
-                               assertTrue(slot.markReleased());
-
-                               assertFalse(slot.tryAssignPayload(payload1));
-                               assertNull(slot.getPayload());
-                       }
-                       
-                       // assign to released
-                       {
-                               SimpleSlot slot = getSlot();
-                               slot.releaseSlot(null);
-
-                               assertFalse(slot.tryAssignPayload(payload1));
-                               assertNull(slot.getPayload());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       public static SimpleSlot getSlot() {
-               final TestingSlotOwner slotOwner = new TestingSlotOwner();
-               slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot 
logicalSlot) -> ((SimpleSlot) logicalSlot).markReleased());
-               return new SimpleSlot(
-                       slotOwner,
-                       new LocalTaskManagerLocation(),
-                       0,
-                       new SimpleAckingTaskManagerGateway());
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index eccccf7..cad0af5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -365,7 +365,6 @@ public class ScheduleWithCoLocationHintTest extends 
SchedulerTestBase {
                s2.releaseSlot();
 
                assertEquals(2, 
testingSlotProvider.getNumberOfAvailableSlots());
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
                LogicalSlot s3 = testingSlotProvider.allocateSlot(
                                new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), 
sharingGroup.getSlotSharingGroupId(), cc1), false, 
slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
@@ -412,7 +411,6 @@ public class ScheduleWithCoLocationHintTest extends 
SchedulerTestBase {
                s2.releaseSlot();
 
                assertEquals(2, 
testingSlotProvider.getNumberOfAvailableSlots());
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
 
                LogicalSlot sa = testingSlotProvider.allocateSlot(
                                new 
ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, 
SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
@@ -487,10 +485,6 @@ public class ScheduleWithCoLocationHintTest extends 
SchedulerTestBase {
                s3.releaseSlot();
                s4.releaseSlot();
                assertEquals(2, 
testingSlotProvider.getNumberOfAvailableSlots());
-
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
        }
 
        @Test
@@ -529,10 +523,6 @@ public class ScheduleWithCoLocationHintTest extends 
SchedulerTestBase {
                s4.releaseSlot();
 
                assertEquals(2, 
testingSlotProvider.getNumberOfAvailableSlots());
-
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfSlots());
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-               assertEquals(0, 
sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
        }
 
        private static SlotProfile slotProfileForLocation(TaskManagerLocation 
location) {

Reply via email to