Repository: flink
Updated Branches:
  refs/heads/master 8afadd459 -> 3ff91be1d


[FLINK-7153] Re-introduce preferred locations for scheduling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c73b2fe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c73b2fe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c73b2fe1

Branch: refs/heads/master
Commit: c73b2fe1f93f9ff2f05eb9130051729320634448
Parents: 8afadd4
Author: Till Rohrmann <[email protected]>
Authored: Mon Oct 16 14:04:13 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Nov 2 17:04:44 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 162 ++++++++++++++-----
 .../ExecutionAndAllocationFuture.java           |  45 ++++++
 .../executiongraph/ExecutionAndSlot.java        |  47 ------
 .../runtime/executiongraph/ExecutionGraph.java  | 146 ++++++-----------
 .../executiongraph/ExecutionGraphUtils.java     | 106 ------------
 .../executiongraph/ExecutionJobVertex.java      |  32 +---
 .../runtime/executiongraph/ExecutionVertex.java |  51 +++---
 .../apache/flink/runtime/instance/SlotPool.java |   9 +-
 .../instance/SlotSharingGroupAssignment.java    |  20 +--
 .../runtime/jobmanager/scheduler/Scheduler.java |  50 +++---
 .../ExecutionGraphSchedulingTest.java           | 151 +----------------
 .../executiongraph/ExecutionGraphStopTest.java  |  15 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  23 +--
 .../executiongraph/ExecutionGraphUtilsTest.java | 124 --------------
 .../ExecutionVertexCancelTest.java              |   8 +-
 .../ExecutionVertexLocalityTest.java            |  19 +--
 .../ScheduleWithCoLocationHintTest.java         |   3 +-
 .../scheduler/SchedulerSlotSharingTest.java     |   3 +-
 .../scheduler/SchedulerTestUtils.java           |  21 ++-
 19 files changed, 352 insertions(+), 683 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c1f423b..9d3e128 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -44,6 +44,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 
@@ -52,9 +54,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -126,9 +130,11 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        /** A future that completes once the Execution reaches a terminal 
ExecutionState */
        private final CompletableFuture<ExecutionState> terminationFuture;
 
+       private final CompletableFuture<TaskManagerLocation> 
taskManagerLocationFuture;
+
        private volatile ExecutionState state = CREATED;
 
-       private volatile SimpleSlot assignedResource;     // once assigned, 
never changes until the execution is archived
+       private final AtomicReference<SimpleSlot> assignedResource;
 
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
@@ -185,6 +191,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                this.partialInputChannelDeploymentDescriptors = new 
ConcurrentLinkedQueue<>();
                this.terminationFuture = new CompletableFuture<>();
+               this.taskManagerLocationFuture = new CompletableFuture<>();
+
+               this.assignedResource = new AtomicReference<>();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -220,14 +229,53 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                return globalModVersion;
        }
 
+       public CompletableFuture<TaskManagerLocation> 
getTaskManagerLocationFuture() {
+               return taskManagerLocationFuture;
+       }
+
        public SimpleSlot getAssignedResource() {
-               return assignedResource;
+               return assignedResource.get();
+       }
+
+       /**
+        * Tries to assign the given slot to the execution. The assignment 
works only if the
+        * Execution is in state SCHEDULED or CREATED. Returns true if the resource could 
be assigned.
+        *
+        * @param slot to assign to this execution
+        * @return true if the slot could be assigned to the execution, 
otherwise false
+        */
+       boolean tryAssignResource(final SimpleSlot slot) {
+               Preconditions.checkNotNull(slot);
+
+               // only allow to set the assigned resource in state SCHEDULED 
or CREATED
+               // note: we also accept resource assignment when being in state 
CREATED for testing purposes
+               if (state == SCHEDULED || state == CREATED) {
+                       if (assignedResource.compareAndSet(null, slot)) {
+                               // check for concurrent modification (e.g. 
cancelling call)
+                               if (state == SCHEDULED || state == CREATED) {
+                                       
Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The 
TaskManagerLocationFuture should not be set if we haven't assigned a resource 
yet.");
+                                       
taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
+
+                                       return true;
+                               } else {
+                                       // free assigned resource and return 
false
+                                       assignedResource.set(null);
+                                       return false;
+                               }
+                       } else {
+                               // the execution already has another slot assigned
+                               return false;
+                       }
+               } else {
+                       // do not allow resource assignment if we are not in 
state SCHEDULED or CREATED
+                       return false;
+               }
        }
 
        @Override
        public TaskManagerLocation getAssignedResourceLocation() {
                // returns non-null only when a location is already assigned
-               return assignedResource != null ? 
assignedResource.getTaskManagerLocation() : null;
+               return assignedResource.get() != null ? 
assignedResource.get().getTaskManagerLocation() : null;
        }
 
        public Throwable getFailureCause() {
@@ -301,27 +349,23 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         */
        public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) {
                try {
-                       final CompletableFuture<SimpleSlot> 
slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
+                       final CompletableFuture<Execution> allocationFuture = 
allocateAndAssignSlotForExecution(slotProvider, queued);
 
                        // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
                        // that we directly deploy the tasks if the slot 
allocation future is completed. This is
                        // necessary for immediate deployment.
-                       final CompletableFuture<Void> deploymentFuture = 
slotAllocationFuture.handle(
-                               (simpleSlot, throwable) ->  {
-                                       if (simpleSlot != null) {
+                       final CompletableFuture<Void> deploymentFuture = 
allocationFuture.handle(
+                               (Execution ignored, Throwable throwable) ->  {
+                                       if (throwable != null) {
+                                               
markFailed(ExceptionUtils.stripCompletionException(throwable));
+                                       }
+                                       else {
                                                try {
-                                                       
deployToSlot(simpleSlot);
+                                                       deploy();
                                                } catch (Throwable t) {
-                                                       try {
-                                                               
simpleSlot.releaseSlot();
-                                                       } finally {
-                                                               markFailed(t);
-                                                       }
+                                                       
markFailed(ExceptionUtils.stripCompletionException(t));
                                                }
                                        }
-                                       else {
-                                               markFailed(throwable);
-                                       }
                                        return null;
                                }
                        );
@@ -338,8 +382,16 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       public CompletableFuture<SimpleSlot> 
allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
-                       throws IllegalExecutionStateException {
+       /**
+        * Allocates and assigns a slot obtained from the slot provider to the 
execution.
+        *
+        * @param slotProvider to obtain a new slot from
+        * @param queued if the allocation can be queued
+        * @return Future which is completed with this execution once the slot 
has been assigned
+        *                      or with an exception if an error occurred.
+        * @throws IllegalExecutionStateException if this method has been 
called while not being in the CREATED state
+        */
+       public CompletableFuture<Execution> 
allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) 
throws IllegalExecutionStateException {
 
                checkNotNull(slotProvider);
 
@@ -359,7 +411,18 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        new ScheduledUnit(this, sharingGroup) :
                                        new ScheduledUnit(this, sharingGroup, 
locationConstraint);
 
-                       return slotProvider.allocateSlot(toSchedule, queued);
+                       CompletableFuture<SimpleSlot> slotFuture = 
slotProvider.allocateSlot(toSchedule, queued);
+
+                       return slotFuture.thenApply(slot -> {
+                               if (tryAssignResource(slot)) {
+                                       return this;
+                               } else {
+                                       // release the slot
+                                       slot.releaseSlot();
+
+                                       throw new CompletionException(new 
FlinkException("Could not assign slot " + slot + " to execution " + this + " 
because it has already been assigned "));
+                               }
+                       });
                }
                else {
                        // call race, already deployed, or already done
@@ -367,8 +430,15 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       public void deployToSlot(final SimpleSlot slot) throws JobException {
-               checkNotNull(slot);
+       /**
+        * Deploys the execution to the previously assigned resource.
+        *
+        * @throws JobException if the execution cannot be deployed to the 
assigned resource
+        */
+       public void deploy() throws JobException {
+               final SimpleSlot slot  = assignedResource.get();
+
+               checkNotNull(slot, "In order to deploy the execution we first 
have to assign a resource via tryAssignResource.");
 
                // Check if the TaskManager died in the meantime
                // This only speeds up the response to TaskManagers failing 
concurrently to deployments.
@@ -397,7 +467,6 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        if (!slot.setExecutedVertex(this)) {
                                throw new JobException("Could not assign the 
ExecutionVertex to the slot " + slot);
                        }
-                       this.assignedResource = slot;
 
                        // race double check, did we fail/cancel and do we need 
to release the slot?
                        if (this.state != DEPLOYING) {
@@ -447,7 +516,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * Sends stop RPC call.
         */
        public void stop() {
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -504,10 +573,16 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        // we skip the canceling state. set the 
timestamp, for a consistent appearance
                                        markTimestamp(CANCELING, 
getStateTimestamp(CANCELED));
 
+                                       // cancel the future in order to fail 
depending scheduling operations
+                                       taskManagerLocationFuture.cancel(false);
+
                                        try {
                                                
vertex.getExecutionGraph().deregisterExecution(this);
-                                               if (assignedResource != null) {
-                                                       
assignedResource.releaseSlot();
+
+                                               final SimpleSlot slot = 
assignedResource.get();
+
+                                               if (slot != null) {
+                                                       slot.releaseSlot();
                                                }
                                        }
                                        finally {
@@ -673,7 +748,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        int maxStrackTraceDepth,
                        Time timeout) {
 
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -697,7 +772,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param timestamp of the completed checkpoint
         */
        public void notifyCheckpointComplete(long checkpointId, long timestamp) 
{
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -717,7 +792,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param checkpointOptions of the checkpoint to trigger
         */
        public void triggerCheckpoint(long checkpointId, long timestamp, 
CheckpointOptions checkpointOptions) {
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -775,7 +850,12 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                                
updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-                                               assignedResource.releaseSlot();
+                                               final SimpleSlot slot = 
assignedResource.get();
+
+                                               if (slot != null) {
+                                                       slot.releaseSlot();
+                                               }
+
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                        }
                                        finally {
@@ -828,7 +908,12 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                if (transitionState(current, CANCELED)) {
                                        try {
-                                               assignedResource.releaseSlot();
+                                               final SimpleSlot slot = 
assignedResource.get();
+
+                                               if (slot != null) {
+                                                       slot.releaseSlot();
+                                               }
+
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                        }
                                        finally {
@@ -920,8 +1005,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                updateAccumulatorsAndMetrics(userAccumulators, 
metrics);
 
                                try {
-                                       if (assignedResource != null) {
-                                               assignedResource.releaseSlot();
+                                       final SimpleSlot slot = 
assignedResource.get();
+                                       if (slot != null) {
+                                               slot.releaseSlot();
                                        }
                                        
vertex.getExecutionGraph().deregisterExecution(this);
                                }
@@ -936,7 +1022,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        }
 
                                        try {
-                                               if (assignedResource != null) {
+                                               if (assignedResource.get() != 
null) {
                                                        sendCancelRpcCall();
                                                }
                                        } catch (Throwable tt) {
@@ -1003,7 +1089,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
         */
        private void sendCancelRpcCall() {
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1024,7 +1110,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        private void sendFailIntermediateResultPartitionsRpcCall() {
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1042,7 +1128,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private void sendUpdatePartitionInfoRpcCall(
                        final Iterable<PartitionInfo> partitionInfos) {
 
-               final SimpleSlot slot = assignedResource;
+               final SimpleSlot slot = assignedResource.get();
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
@@ -1162,8 +1248,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        
        @Override
        public String toString() {
+               final SimpleSlot slot = assignedResource.get();
+
                return String.format("Attempt #%d (%s) @ %s - [%s]", 
attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
-                               (assignedResource == null ? "(unassigned)" : 
assignedResource.toString()), state);
+                               (slot == null ? "(unassigned)" : slot), state);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
new file mode 100644
index 0000000..1022dbc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with an allocation future.
+ */
+public class ExecutionAndAllocationFuture {
+
+       public final Execution executionAttempt;
+
+       public final CompletableFuture<Void> allocationFuture;
+
+       public ExecutionAndAllocationFuture(Execution executionAttempt, 
CompletableFuture<Void> allocationFuture) {
+               this.executionAttempt = checkNotNull(executionAttempt);
+               this.allocationFuture = checkNotNull(allocationFuture);
+       }
+
+       // 
-----------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return super.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
deleted file mode 100644
index 123ff0c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pair of an {@link Execution} together with a slot future.
- */
-public class ExecutionAndSlot {
-
-       public final Execution executionAttempt;
-
-       public final CompletableFuture<SimpleSlot> slotFuture;
-
-       public ExecutionAndSlot(Execution executionAttempt, 
CompletableFuture<SimpleSlot> slotFuture) {
-               this.executionAttempt = checkNotNull(executionAttempt);
-               this.slotFuture = checkNotNull(slotFuture);
-       }
-
-       // 
-----------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return super.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dca6c44..62c6e99 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -50,7 +50,6 @@ import 
org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -90,7 +89,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -878,113 +876,67 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                // that way we do not have any operation that can fail between 
allocating the slots
                // and adding them to the list. If we had a failure in between 
there, that would
                // cause the slots to get lost
-               final ArrayList<ExecutionAndSlot[]> resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
                final boolean queued = allowQueuedScheduling;
 
-               // we use this flag to handle failures in a 'finally' clause
-               // that allows us to not go through clumsy cast-and-rethrow 
logic
-               boolean successful = false;
+               // collecting all the slots may resize and fail in that 
operation without slots getting lost
+               final ArrayList<CompletableFuture<Execution>> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-               try {
-                       // collecting all the slots may resize and fail in that 
operation without slots getting lost
-                       final ArrayList<CompletableFuture<SimpleSlot>> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+               // allocate the slots (obtain all their futures)
+               for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+                       // these calls are not blocking, they only return 
futures
+                       Collection<CompletableFuture<Execution>> 
allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
 
-                       // allocate the slots (obtain all their futures
-                       for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
-                               // these calls are not blocking, they only 
return futures
-                               ExecutionAndSlot[] slots = 
ejv.allocateResourcesForAll(slotProvider, queued);
+                       allAllocationFutures.addAll(allocationFutures);
+               }
 
-                               // we need to first add the slots to this list, 
to be safe on release
-                               resources.add(slots);
+               // this future is complete once all slot futures are complete.
+               // the future fails once one slot future fails.
+               final ConjunctFuture<Collection<Execution>> 
allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures);
 
-                               for (ExecutionAndSlot ens : slots) {
-                                       slotFutures.add(ens.slotFuture);
-                               }
+               // make sure that we fail if the allocation timeout was exceeded
+               final ScheduledFuture<?> timeoutCancelHandle = 
futureExecutor.schedule(new Runnable() {
+                       @Override
+                       public void run() {
+                               // When the timeout triggers, we try to 
complete the conjunct future with an exception.
+                               // Note that this is a no-op if the future is 
already completed
+                               int numTotal = 
allAllocationsComplete.getNumFuturesTotal();
+                               int numComplete = 
allAllocationsComplete.getNumFuturesCompleted();
+                               String message = "Could not allocate all 
requires slots within timeout of " +
+                                               timeout + ". Slots required: " 
+ numTotal + ", slots allocated: " + numComplete;
+
+                               
allAllocationsComplete.completeExceptionally(new 
NoResourceAvailableException(message));
                        }
+               }, timeout.getSize(), timeout.getUnit());
 
-                       // this future is complete once all slot futures are 
complete.
-                       // the future fails once one slot future fails.
-                       final ConjunctFuture<Void> allAllocationsComplete = 
FutureUtils.waitForAll(slotFutures);
-
-                       // make sure that we fail if the allocation timeout was 
exceeded
-                       final ScheduledFuture<?> timeoutCancelHandle = 
futureExecutor.schedule(new Runnable() {
-                               @Override
-                               public void run() {
-                                       // When the timeout triggers, we try to 
complete the conjunct future with an exception.
-                                       // Note that this is a no-op if the 
future is already completed
-                                       int numTotal = 
allAllocationsComplete.getNumFuturesTotal();
-                                       int numComplete = 
allAllocationsComplete.getNumFuturesCompleted();
-                                       String message = "Could not allocate 
all requires slots within timeout of " +
-                                                       timeout + ". Slots 
required: " + numTotal + ", slots allocated: " + numComplete;
-
-                                       
allAllocationsComplete.completeExceptionally(new 
NoResourceAvailableException(message));
-                               }
-                       }, timeout.getSize(), timeout.getUnit());
-
-
-                       allAllocationsComplete.handleAsync(
-                               (Void slots, Throwable throwable) -> {
-                                       try {
-                                               // we do not need the 
cancellation timeout any more
-                                               
timeoutCancelHandle.cancel(false);
-
-                                               if (throwable == null) {
-                                                       // successfully 
obtained all slots, now deploy
-
-                                                       for (ExecutionAndSlot[] 
jobVertexTasks : resources) {
-                                                               for 
(ExecutionAndSlot execAndSlot : jobVertexTasks) {
-
-                                                                       // the 
futures must all be ready - this is simply a sanity check
-                                                                       final 
SimpleSlot slot;
-                                                                       try {
-                                                                               
slot = execAndSlot.slotFuture.getNow(null);
-                                                                               
checkNotNull(slot);
-                                                                       }
-                                                                       catch 
(CompletionException | NullPointerException e) {
-                                                                               
throw new IllegalStateException("SlotFuture is incomplete " +
-                                                                               
                "or erroneous even though all futures completed", e);
-                                                                       }
-
-                                                                       // 
actual deployment
-                                                                       
execAndSlot.executionAttempt.deployToSlot(slot);
-                                                               }
-                                                       }
-                                               }
-                                               else {
-                                                       // let the exception 
handler deal with this
-                                                       throw throwable;
-                                               }
-                                       }
-                                       catch (Throwable t) {
-                                               // we catch everything here to 
make sure cleanup happens and the
-                                               // ExecutionGraph notices the 
error
 
-                                               // we need to to release all 
slots before going into recovery!
-                                               try {
-                                                       
ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-                                               }
-                                               finally {
-                                                       failGlobal(t);
+               allAllocationsComplete.handleAsync(
+                       (Collection<Execution> executions, Throwable throwable) 
-> {
+                               try {
+                                       // we do not need the cancellation 
timeout any more
+                                       timeoutCancelHandle.cancel(false);
+
+                                       if (throwable == null) {
+                                               // successfully obtained all 
slots, now deploy
+                                               for (Execution execution : 
executions) {
+                                                       execution.deploy();
                                                }
                                        }
+                                       else {
+                                               // let the exception handler 
deal with this
+                                               throw throwable;
+                                       }
+                               }
+                               catch (Throwable t) {
+                                       // we catch everything here to make 
sure cleanup happens and the
+                                       // ExecutionGraph notices the error
+                                       
failGlobal(ExceptionUtils.stripCompletionException(t));
+                               }
 
-                                       // Wouldn't it be nice if we could 
return an actual Void object?
-                                       // return (Void) 
Unsafe.getUnsafe().allocateInstance(Void.class);
-                                       return null;
-                               },
-                               futureExecutor);
-
-                       // from now on, slots will be rescued by the futures 
and their completion, or by the timeout
-                       successful = true;
-               }
-               finally {
-                       if (!successful) {
-                               // we come here only if the 'try' block 
finished with an exception
-                               // we release the slots (possibly failing some 
executions on the way) and
-                               // let the exception bubble up
-                               
ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-                       }
-               }
+                               // Wouldn't it be nice if we could return an 
actual Void object?
+                               // return (Void) 
Unsafe.getUnsafe().allocateInstance(Void.class);
+                               return null;
+                       },
+                       futureExecutor);
        }
 
        public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
deleted file mode 100644
index f1d793d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
-
-/**
- * Utilities for dealing with the execution graphs and scheduling.
- */
-public class ExecutionGraphUtils {
-
-       /**
-        * Releases the slot represented by the given future. If the future is 
complete, the
-        * slot is immediately released. Otherwise, the slot is released as 
soon as the future
-        * is completed.
-        * 
-        * <p>Note that releasing the slot means cancelling any task execution 
currently
-        * associated with that slot.
-        * 
-        * @param slotFuture The future for the slot to release.
-        */
-       public static void releaseSlotFuture(CompletableFuture<SimpleSlot> 
slotFuture) {
-               slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply);
-       }
-
-       /**
-        * Releases the all the slots in the list of arrays of {@code 
ExecutionAndSlot}.
-        * For each future in that collection holds: If the future is complete, 
its slot is
-        * immediately released. Otherwise, the slot is released as soon as the 
future
-        * is completed.
-        * 
-        * <p>This methods never throws any exceptions (except for fatal 
exceptions) and continues
-        * to release the remaining slots if one slot release failed.
-        *
-        * <p>Note that releasing the slot means cancelling any task execution 
currently
-        * associated with that slot.
-        * 
-        * @param resources The collection of ExecutionAndSlot whose slots 
should be released.
-        */
-       public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> 
resources) {
-               try {
-                       for (ExecutionAndSlot[] jobVertexResources : resources) 
{
-                               if (jobVertexResources != null) {
-                                       for (ExecutionAndSlot execAndSlot : 
jobVertexResources) {
-                                               if (execAndSlot != null) {
-                                                       try {
-                                                               
releaseSlotFuture(execAndSlot.slotFuture);
-                                                       }
-                                                       catch (Throwable t) {
-                                                               
ExceptionUtils.rethrowIfFatalError(t);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               catch (Throwable t) {
-                       ExceptionUtils.rethrowIfFatalError(t);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A function to be applied into a future, releasing the slot 
immediately upon completion.
-        * Completion here refers to both the successful and exceptional 
completion.
-        */
-       private static final class ReleaseSlotFunction implements 
BiFunction<SimpleSlot, Throwable, Void> {
-
-               static final ReleaseSlotFunction INSTANCE = new 
ReleaseSlotFunction();
-
-               @Override
-               public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-                       if (simpleSlot != null) {
-                               simpleSlot.releaseSlot();
-                       }
-                       return null;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /** Utility class is not meant to be instantiated */
-       private ExecutionGraphUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 90224b0..98191d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -57,6 +56,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -475,37 +475,21 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
         * 
         * @param resourceProvider The resource provider from whom the slots 
are requested.
         */
-       public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider 
resourceProvider, boolean queued) {
+       public Collection<CompletableFuture<Execution>> 
allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
                final ExecutionVertex[] vertices = this.taskVertices;
-               final ExecutionAndSlot[] slots = new 
ExecutionAndSlot[vertices.length];
+               final CompletableFuture<Execution>[] slots = new 
CompletableFuture[vertices.length];
 
                // try to acquire a slot future for each execution.
                // we store the execution with the future just to be on the 
safe side
                for (int i = 0; i < vertices.length; i++) {
-
-                       // we use this flag to handle failures in a 'finally' 
clause
-                       // that allows us to not go through clumsy 
cast-and-rethrow logic
-                       boolean successful = false;
-
-                       try {
-                               // allocate the next slot (future)
-                               final Execution exec = 
vertices[i].getCurrentExecutionAttempt();
-                               final CompletableFuture<SimpleSlot> future = 
exec.allocateSlotForExecution(resourceProvider, queued);
-                               slots[i] = new ExecutionAndSlot(exec, future);
-                               successful = true;
-                       }
-                       finally {
-                               if (!successful) {
-                                       // this is the case if an exception was 
thrown
-                                       for (int k = 0; k < i; k++) {
-                                               
ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
-                                       }
-                               }
-                       }
+                       // allocate the next slot (future)
+                       final Execution exec = 
vertices[i].getCurrentExecutionAttempt();
+                       final CompletableFuture<Execution> allocationFuture = 
exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+                       slots[i] = allocationFuture;
                }
 
                // all good, we acquired all slots
-               return slots;
+               return Arrays.asList(slots);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6b9d481..e87a5a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -266,6 +267,10 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                return currentExecution.getFailureCause();
        }
 
+       public CompletableFuture<TaskManagerLocation> 
getCurrentTaskManagerLocationFuture() {
+               return currentExecution.getTaskManagerLocationFuture();
+       }
+
        public SimpleSlot getCurrentAssignedResource() {
                return currentExecution.getAssignedResource();
        }
@@ -445,8 +450,8 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
         * @see #getPreferredLocationsBasedOnState()
         * @see #getPreferredLocationsBasedOnInputs() 
         */
-       public Iterable<TaskManagerLocation> getPreferredLocations() {
-               Iterable<TaskManagerLocation> basedOnState = 
getPreferredLocationsBasedOnState();
+       public Collection<CompletableFuture<TaskManagerLocation>> 
getPreferredLocations() {
+               Collection<CompletableFuture<TaskManagerLocation>> basedOnState 
= getPreferredLocationsBasedOnState();
                return basedOnState != null ? basedOnState : 
getPreferredLocationsBasedOnInputs();
        }
        
@@ -454,13 +459,13 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
         * Gets the preferred location to execute the current task execution 
attempt, based on the state
         * that the execution attempt will resume.
         * 
-        * @return A size-one iterable with the location preference, or null, 
if there is no
+        * @return A size-one collection with the location preference, or null, 
if there is no
         *         location preference based on the state.
         */
-       public Iterable<TaskManagerLocation> 
getPreferredLocationsBasedOnState() {
+       public Collection<CompletableFuture<TaskManagerLocation>> 
getPreferredLocationsBasedOnState() {
                TaskManagerLocation priorLocation;
                if (currentExecution.getTaskStateSnapshot() != null && 
(priorLocation = getLatestPriorLocation()) != null) {
-                       return Collections.singleton(priorLocation);
+                       return 
Collections.singleton(CompletableFuture.completedFuture(priorLocation));
                }
                else {
                        return null;
@@ -476,14 +481,13 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
         * @return The preferred locations based in input streams, or an empty 
iterable,
         *         if there is no input-based preference.
         */
-       public Iterable<TaskManagerLocation> 
getPreferredLocationsBasedOnInputs() {
+       public Collection<CompletableFuture<TaskManagerLocation>> 
getPreferredLocationsBasedOnInputs() {
                // otherwise, base the preferred locations on the input 
connections
                if (inputEdges == null) {
                        return Collections.emptySet();
                }
                else {
-                       Set<TaskManagerLocation> locations = new HashSet<>();
-                       Set<TaskManagerLocation> inputLocations = new 
HashSet<>();
+                       Set<CompletableFuture<TaskManagerLocation>> 
inputLocations = new HashSet<>(4);
 
                        // go over all inputs
                        for (int i = 0; i < inputEdges.length; i++) {
@@ -493,28 +497,17 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                        // go over all input sources
                                        for (int k = 0; k < sources.length; 
k++) {
                                                // look-up assigned slot of 
input source
-                                               SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
-                                               if (sourceSlot != null) {
-                                                       // add input location
-                                                       
inputLocations.add(sourceSlot.getTaskManagerLocation());
-                                                       // inputs which have 
too many distinct sources are not considered
-                                                       if 
(inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                                                               
inputLocations.clear();
-                                                               break;
-                                                       }
+                                               
CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = 
sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+                                               
inputLocations.add(taskManagerLocationFuture);
+
+                                               if (inputLocations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+                                                       return 
Collections.emptyList();
                                                }
                                        }
                                }
-                               // keep the locations of the input with the 
least preferred locations
-                               if (locations.isEmpty() || // nothing assigned 
yet
-                                               (!inputLocations.isEmpty() && 
inputLocations.size() < locations.size())) {
-                                       // current input has fewer preferred 
locations
-                                       locations.clear();
-                                       locations.addAll(inputLocations);
-                               }
                        }
 
-                       return locations.isEmpty() ? 
Collections.<TaskManagerLocation>emptyList() : locations;
+                       return inputLocations.isEmpty() ? 
Collections.emptyList() : inputLocations;
                }
        }
 
@@ -598,8 +591,14 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                return this.currentExecution.scheduleForExecution(slotProvider, 
queued);
        }
 
+       @VisibleForTesting
        public void deployToSlot(SimpleSlot slot) throws JobException {
-               this.currentExecution.deployToSlot(slot);
+               if (this.currentExecution.tryAssignResource(slot)) {
+                       this.currentExecution.deploy();
+               } else {
+                       throw new IllegalStateException("Could not assign 
resource " + slot + " to current execution " +
+                               currentExecution + '.');
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6397043..fcf7d40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1003,10 +1003,13 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
                @Override
                public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit 
task, boolean allowQueued) {
-                       Iterable<TaskManagerLocation> locationPreferences = 
-                                       
task.getTaskToExecute().getVertex().getPreferredLocations();
+                       Collection<CompletableFuture<TaskManagerLocation>> 
locationPreferenceFutures =
+                               
task.getTaskToExecute().getVertex().getPreferredLocations();
 
-                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN, locationPreferences, timeout);
+                       CompletableFuture<Collection<TaskManagerLocation>> 
locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
+
+                       return locationPreferencesFuture.thenCompose(
+                               locationPreferences -> 
gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, 
timeout));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 88fbc10..a071e50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -266,18 +266,12 @@ public class SlotSharingGroupAssignment {
         * slots if no local slot is available. The method returns null, when 
this sharing group has
         * no slot is available for the given JobVertexID. 
         *
-        * @param vertex The vertex to allocate a slot for.
+        * @param vertexID The ID of the job vertex to allocate a slot for.
+        * @param locationPreferences The preferred locations for the slot.
         *
         * @return A slot to execute the given ExecutionVertex in, or null, if 
none is available.
         */
-       public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-               return getSlotForTask(vertex.getJobvertexId(), 
vertex.getPreferredLocationsBasedOnInputs());
-       }
-
-       /**
-        * 
-        */
-       SimpleSlot getSlotForTask(JobVertexID vertexID, 
Iterable<TaskManagerLocation> locationPreferences) {
+       public SimpleSlot getSlotForTask(JobVertexID vertexID, 
Iterable<TaskManagerLocation> locationPreferences) {
                synchronized (lock) {
                        Tuple2<SharedSlot, Locality> p = 
getSlotForTaskInternal(vertexID, locationPreferences, false);
 
@@ -306,17 +300,13 @@ public class SlotSharingGroupAssignment {
         * shared slot and returns it. If no suitable shared slot could be 
found, this method
         * returns null.</p>
         * 
-        * @param vertex The execution vertex to find a slot for.
         * @param constraint The co-location constraint for the placement of 
the execution vertex.
+        * @param locationPreferences The preferred locations for the slot.
         * 
         * @return A simple slot allocate within a suitable shared slot, or 
{@code null}, if no suitable
         *         shared slot is available.
         */
-       public SimpleSlot getSlotForTask(ExecutionVertex vertex, 
CoLocationConstraint constraint) {
-               return getSlotForTask(constraint, 
vertex.getPreferredLocationsBasedOnInputs());
-       }
-       
-       SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<TaskManagerLocation> locationPreferences) {
+       public SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<TaskManagerLocation> locationPreferences) {
                synchronized (lock) {
                        if (constraint.isAssignedAndAlive()) {
                                // the shared slot of the co-location group is 
initialized and set we allocate a sub-slot

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 5a7e819..9b1ffbe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -134,31 +136,36 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
        @Override
        public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, 
boolean allowQueued) {
-               try {
-                       final Object ret = scheduleTask(task, allowQueued);
+               Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = 
task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
 
-                       if (ret instanceof SimpleSlot) {
-                               return 
CompletableFuture.completedFuture((SimpleSlot) ret);
-                       }
-                       else if (ret instanceof CompletableFuture) {
-                               @SuppressWarnings("unchecked")
-                               CompletableFuture<SimpleSlot> typed = 
(CompletableFuture<SimpleSlot>) ret;
-                               return typed;
-                       }
-                       else {
-                               // this should never happen, simply guard this 
case with an exception
-                               throw new RuntimeException();
+               CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+
+               return preferredLocationsFuture.thenCompose(
+                       preferredLocations -> {
+                               try {
+                                       final Object ret = scheduleTask(task, 
allowQueued, preferredLocations);
+
+                                       if (ret instanceof SimpleSlot) {
+                                               return 
CompletableFuture.completedFuture((SimpleSlot) ret);
+                                       } else if (ret instanceof 
CompletableFuture) {
+                                               @SuppressWarnings("unchecked")
+                                               CompletableFuture<SimpleSlot> 
typed = (CompletableFuture<SimpleSlot>) ret;
+                                               return typed;
+                                       } else {
+                                               // this should never happen, 
simply guard this case with an exception
+                                               throw new RuntimeException();
+                                       }
+                               } catch (NoResourceAvailableException e) {
+                                       throw new CompletionException(e);
+                               }
                        }
-               }
-               catch (NoResourceAvailableException e) {
-                       return FutureUtils.completedExceptionally(e);
-               }
+               );
        }
 
        /**
         * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
         */
-       private Object scheduleTask(ScheduledUnit task, boolean 
queueIfNoResource) throws NoResourceAvailableException {
+       private Object scheduleTask(ScheduledUnit task, boolean 
queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws 
NoResourceAvailableException {
                if (task == null) {
                        throw new NullPointerException();
                }
@@ -168,7 +175,6 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
 
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
                
-               final Iterable<TaskManagerLocation> preferredLocations = 
vertex.getPreferredLocationsBasedOnInputs();
                final boolean forceExternalLocation = false &&
                                                                        
preferredLocations != null && preferredLocations.iterator().hasNext();
        
@@ -197,10 +203,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                // get a slot from the group, if the group has 
one for us (and can fulfill the constraint)
                                final SimpleSlot slotFromGroup;
                                if (constraint == null) {
-                                       slotFromGroup = 
assignment.getSlotForTask(vertex);
+                                       slotFromGroup = 
assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
                                }
                                else {
-                                       slotFromGroup = 
assignment.getSlotForTask(vertex, constraint);
+                                       slotFromGroup = 
assignment.getSlotForTask(constraint, preferredLocations);
                                }
 
                                SimpleSlot newSlot = null;
@@ -234,7 +240,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                                                localOnly = true;
                                        }
                                        else {
-                                               locations = 
vertex.getPreferredLocationsBasedOnInputs();
+                                               locations = preferredLocations;
                                                localOnly = 
forceExternalLocation;
                                        }
                                        

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b90c306..69a679a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -49,21 +48,22 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
-
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.verification.Timeout;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -399,141 +399,6 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                }
        }
 
-       /**
-        * Tests that the {@link 
ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
-        * releases partially acquired resources upon exception.
-        */
-       @Test
-       public void 
testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
-               final int parallelism = 8;
-
-               final JobVertex vertex = new JobVertex("vertex");
-               vertex.setParallelism(parallelism);
-               vertex.setInvokableClass(NoOpInvokable.class);
-
-               final JobID jobId = new JobID();
-               final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
-
-               // set up some available slots and some slot owner that accepts 
released slots back
-               final List<SimpleSlot> returnedSlots = new ArrayList<>();
-               final SlotOwner recycler = new SlotOwner() {
-                       @Override
-                       public boolean returnAllocatedSlot(Slot slot) {
-                               returnedSlots.add((SimpleSlot) slot);
-                               return true;
-                       }
-               };
-
-               // slot provider that hand out parallelism / 3 slots, then 
throws an exception
-               final SlotProvider slotProvider = mock(SlotProvider.class);
-
-               final TaskManagerGateway taskManager = 
mock(TaskManagerGateway.class);
-               final List<SimpleSlot> availableSlots = new 
ArrayList<>(Arrays.asList(
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler)));
-
-               when(slotProvider.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
-                       (InvocationOnMock invocation) -> {
-                                       if (availableSlots.isEmpty()) {
-                                               throw new 
TestRuntimeException();
-                                       } else {
-                                               return 
CompletableFuture.completedFuture(availableSlots.remove(0));
-                                       }
-                               });
-
-               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
-               final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
-
-               // acquire resources and check that all are back after the 
failure
-
-               final int numSlotsToExpectBack = availableSlots.size();
-
-               try {
-                       ejv.allocateResourcesForAll(slotProvider, false);
-                       fail("should have failed with an exception");
-               }
-               catch (TestRuntimeException e) {
-                       // expected
-               }
-
-               assertEquals(numSlotsToExpectBack, returnedSlots.size());
-       }
-
-       /**
-        * Tests that the {@link ExecutionGraph#scheduleForExecution()} method
-        * releases partially acquired resources upon exception.
-        */
-       @Test
-       public void testExecutionGraphScheduleReleasesResourcesOnException() 
throws Exception {
-
-               //                                            [pipelined]
-               //  we construct a simple graph    (source) ----------------> 
(target)
-
-               final int parallelism = 3;
-
-               final JobVertex sourceVertex = new JobVertex("source");
-               sourceVertex.setParallelism(parallelism);
-               sourceVertex.setInvokableClass(NoOpInvokable.class);
-
-               final JobVertex targetVertex = new JobVertex("target");
-               targetVertex.setParallelism(parallelism);
-               targetVertex.setInvokableClass(NoOpInvokable.class);
-
-               targetVertex.connectNewDataSetAsInput(sourceVertex, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-               final JobID jobId = new JobID();
-               final JobGraph jobGraph = new JobGraph(jobId, "test", 
sourceVertex, targetVertex);
-
-               // set up some available slots and some slot owner that accepts 
released slots back
-               final List<SimpleSlot> returnedSlots = new ArrayList<>();
-               final SlotOwner recycler = new SlotOwner() {
-                       @Override
-                       public boolean returnAllocatedSlot(Slot slot) {
-                               returnedSlots.add((SimpleSlot) slot);
-                               return true;
-                       }
-               };
-
-               final TaskManagerGateway taskManager = 
mock(TaskManagerGateway.class);
-               final List<SimpleSlot> availableSlots = new 
ArrayList<>(Arrays.asList(
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler),
-                       createSlot(taskManager, jobId, recycler)));
-
-
-               // slot provider that hand out parallelism / 3 slots, then 
throws an exception
-               final SlotProvider slotProvider = mock(SlotProvider.class);
-
-               when(slotProvider.allocateSlot(any(ScheduledUnit.class), 
anyBoolean())).then(
-                       (InvocationOnMock invocation) -> {
-                                       if (availableSlots.isEmpty()) {
-                                               throw new 
TestRuntimeException();
-                                       } else {
-                                               return 
CompletableFuture.completedFuture(availableSlots.remove(0));
-                                       }
-                               });
-
-               final ExecutionGraph eg = createExecutionGraph(jobGraph, 
slotProvider);
-
-               // acquire resources and check that all are back after the 
failure
-
-               final int numSlotsToExpectBack = availableSlots.size();
-
-               try {
-                       eg.setScheduleMode(ScheduleMode.EAGER);
-                       eg.scheduleForExecution();
-                       fail("should have failed with an exception");
-               }
-               catch (TestRuntimeException e) {
-                       // expected
-               }
-
-               assertEquals(numSlotsToExpectBack, returnedSlots.size());
-       }
-
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index de9081b..4ce3f9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -112,25 +112,29 @@ public class ExecutionGraphStopTest extends TestLogger {
                // deploy source 1
                for (ExecutionVertex ev : 
eg.getJobVertex(source1.getID()).getTaskVertices()) {
                        SimpleSlot slot = 
ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-                       ev.getCurrentExecutionAttempt().deployToSlot(slot);
+                       ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+                       ev.getCurrentExecutionAttempt().deploy();
                }
 
                // deploy source 2
                for (ExecutionVertex ev : 
eg.getJobVertex(source2.getID()).getTaskVertices()) {
                        SimpleSlot slot = 
ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-                       ev.getCurrentExecutionAttempt().deployToSlot(slot);
+                       ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+                       ev.getCurrentExecutionAttempt().deploy();
                }
 
                // deploy non-source 1
                for (ExecutionVertex ev : 
eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
                        SimpleSlot slot = 
ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-                       ev.getCurrentExecutionAttempt().deployToSlot(slot);
+                       ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+                       ev.getCurrentExecutionAttempt().deploy();
                }
 
                // deploy non-source 2
                for (ExecutionVertex ev : 
eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
                        SimpleSlot slot = 
ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-                       ev.getCurrentExecutionAttempt().deployToSlot(slot);
+                       ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+                       ev.getCurrentExecutionAttempt().deploy();
                }
 
                eg.stop();
@@ -162,7 +166,8 @@ public class ExecutionGraphStopTest extends TestLogger {
 
                final SimpleSlot slot = 
ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
 
-               exec.deployToSlot(slot);
+               exec.tryAssignResource(slot);
+               exec.deploy();
                exec.switchToRunning();
                assertEquals(ExecutionState.RUNNING, exec.getState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b534ade..5feeabc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.Status;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -63,20 +61,20 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 
+import akka.actor.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -231,15 +229,10 @@ public class ExecutionGraphTestUtils {
        }
        
        public static void setVertexResource(ExecutionVertex vertex, SimpleSlot 
slot) {
-               try {
-                       Execution exec = vertex.getCurrentExecutionAttempt();
-                       
-                       Field f = 
Execution.class.getDeclaredField("assignedResource");
-                       f.setAccessible(true);
-                       f.set(exec, slot);
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Modifying the slot failed", 
e);
+               Execution exec = vertex.getCurrentExecutionAttempt();
+
+               if(!exec.tryAssignResource(slot)) {
+                       throw new RuntimeException("Could not assign 
resource.");
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
deleted file mode 100644
index c616501..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the utility methods in the class {@link ExecutionGraphUtils}.
- */
-public class ExecutionGraphUtilsTest {
-
-       @Test
-       public void testReleaseSlots() {
-               final JobID jid = new JobID();
-               final SlotOwner owner = mock(SlotOwner.class);
-
-               final SimpleSlot slot1 = new 
SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-               final SimpleSlot slot2 = new 
SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-               final SimpleSlot slot3 = new 
SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-
-               final CompletableFuture<SimpleSlot> incompleteFuture = new 
CompletableFuture<>();
-
-               final CompletableFuture<SimpleSlot> completeFuture = new 
CompletableFuture<>();
-               completeFuture.complete(slot2);
-
-               final CompletableFuture<SimpleSlot> disposedSlotFuture = new 
CompletableFuture<>();
-               slot3.releaseSlot();
-               disposedSlotFuture.complete(slot3);
-
-               // release all futures
-               ExecutionGraphUtils.releaseSlotFuture(incompleteFuture);
-               ExecutionGraphUtils.releaseSlotFuture(completeFuture);
-               ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture);
-
-               // only now complete the incomplete future
-               incompleteFuture.complete(slot1);
-
-               // verify that each slot was returned once to the owner
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-       }
-
-       @Test
-       public void testReleaseSlotsWithNulls() {
-               final JobID jid = new JobID();
-               final SlotOwner owner = mock(SlotOwner.class);
-
-               final Execution mockExecution = mock(Execution.class);
-
-               final SimpleSlot slot1 = new 
SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-               final SimpleSlot slot2 = new 
SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-               final SimpleSlot slot3 = new 
SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-               final SimpleSlot slot4 = new 
SimpleSlot(createAllocatedSlot(jid, 3), owner, 3);
-               final SimpleSlot slot5 = new 
SimpleSlot(createAllocatedSlot(jid, 4), owner, 4);
-
-               ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
-                               null,
-                               new ExecutionAndSlot(mockExecution, 
CompletableFuture.completedFuture(slot1)),
-                               null,
-                               new ExecutionAndSlot(mockExecution, 
CompletableFuture.completedFuture(slot2)),
-                               null
-               };
-
-               ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
-                               new ExecutionAndSlot(mockExecution, 
CompletableFuture.completedFuture(slot3)),
-                               new ExecutionAndSlot(mockExecution, 
CompletableFuture.completedFuture(slot4)),
-                               new ExecutionAndSlot(mockExecution, 
CompletableFuture.completedFuture(slot5))
-               };
-
-               List<ExecutionAndSlot[]> resources = Arrays.asList(null, 
slots1, new ExecutionAndSlot[0], null, slots2);
-
-               ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot4));
-               verify(owner, times(1)).returnAllocatedSlot(eq(slot5));
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
-               TaskManagerLocation loc = new TaskManagerLocation(
-                               ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 10000 + num);
-       
-               return new AllocatedSlot(new AllocationID(), jid, loc, num,
-                               ResourceProfile.UNKNOWN, 
mock(TaskManagerGateway.class));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 44e1794..9908dae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -263,8 +263,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        Instance instance = getInstance(new 
ActorTaskManagerGateway(actorGateway));
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
 
                        assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
@@ -303,8 +303,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        Instance instance = getInstance(new 
ActorTaskManagerGateway(actorGateway));
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
 
                        assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
@@ -351,8 +351,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        Instance instance = getInstance(new 
ActorTaskManagerGateway(actorGateway));
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
 
                        assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
@@ -384,8 +384,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        Instance instance = getInstance(new 
ActorTaskManagerGateway(gateway));
                        SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                       setVertexState(vertex, ExecutionState.RUNNING);
                        setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
 
                        assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 5f12646..15d021a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -50,6 +51,7 @@ import org.junit.Test;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -92,10 +94,10 @@ public class ExecutionVertexLocalityTest extends TestLogger 
{
                 // validate that each target vertex has exactly one preferred location, namely locations[i]
                for (int i = 0; i < parallelism; i++) {
                        ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+                       Iterator<CompletableFuture<TaskManagerLocation>> 
preference = target.getPreferredLocations().iterator();
 
                        assertTrue(preference.hasNext());
-                       assertEquals(locations[i], preference.next());
+                       assertEquals(locations[i], preference.next().get());
                        assertFalse(preference.hasNext());
                }
        }
@@ -122,7 +124,7 @@ public class ExecutionVertexLocalityTest extends TestLogger 
{
                for (int i = 0; i < parallelism; i++) {
                        ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
 
-                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+                       Iterator<CompletableFuture<TaskManagerLocation>> 
preference = target.getPreferredLocations().iterator();
                        assertFalse(preference.hasNext());
                }
        }
@@ -178,10 +180,10 @@ public class ExecutionVertexLocalityTest extends 
TestLogger {
                // validate that the target vertices have the state's location 
as the location preference
                for (int i = 0; i < parallelism; i++) {
                        ExecutionVertex target = 
graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-                       Iterator<TaskManagerLocation> preference = 
target.getPreferredLocations().iterator();
+                       Iterator<CompletableFuture<TaskManagerLocation>> 
preference = target.getPreferredLocations().iterator();
 
                        assertTrue(preference.hasNext());
-                       assertEquals(locations[i], preference.next());
+                       assertEquals(locations[i], preference.next().get());
                        assertFalse(preference.hasNext());
                }
        }
@@ -236,10 +238,9 @@ public class ExecutionVertexLocalityTest extends 
TestLogger {
 
                SimpleSlot simpleSlot = new SimpleSlot(slot, 
mock(SlotOwner.class), 0);
 
-               final Field locationField = 
Execution.class.getDeclaredField("assignedResource");
-               locationField.setAccessible(true);
-
-               locationField.set(vertex.getCurrentExecutionAttempt(), 
simpleSlot);
+               if 
(!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
+                       throw new FlinkException("Could not assign resource.");
+               }
        }
 
        private void setState(Execution execution, ExecutionState state) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index afb9dac..9f4a675 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -33,12 +33,13 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.util.concurrent.ExecutionException;
 
-public class ScheduleWithCoLocationHintTest {
+public class ScheduleWithCoLocationHintTest extends TestLogger {
 
        @Test
        public void scheduleAllSharedAndCoLocated() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index c049593..1f88dd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
-public class SchedulerSlotSharingTest {
+public class SchedulerSlotSharingTest extends TestLogger {
 
        @Test
        public void scheduleSingleVertexType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 4312b0f..c7d0f09 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -25,9 +25,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -105,8 +107,13 @@ public class SchedulerTestUtils {
        
        public static Execution getTestVertex(Iterable<TaskManagerLocation> 
preferredLocations) {
                ExecutionVertex vertex = mock(ExecutionVertex.class);
-               
-               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
+
+               Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = new ArrayList<>(4);
+
+               for (TaskManagerLocation preferredLocation : 
preferredLocations) {
+                       
preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
+               }
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.toString()).thenReturn("TEST-VERTEX");
                
@@ -119,7 +126,7 @@ public class SchedulerTestUtils {
        public static Execution getTestVertex(JobVertexID jid, int taskIndex, 
int numTasks) {
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                
-               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList());
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.getJobvertexId()).thenReturn(jid);
                when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +146,13 @@ public class SchedulerTestUtils {
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
+               Collection<CompletableFuture<TaskManagerLocation>> 
preferrecLocationFutures = new ArrayList<>(locations.length);
+
+               for (TaskManagerLocation location : locations) {
+                       
preferrecLocationFutures.add(CompletableFuture.completedFuture(location));
+               }
+
+               
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures);
                when(vertex.getJobId()).thenReturn(new JobID());
                when(vertex.getJobvertexId()).thenReturn(jid);
                when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);

Reply via email to