asfgit closed pull request #6961: [FLINK-9635] Fix scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the full diff is reproduced below:
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
        /** This contains desired allocation ids of the slot. */
        @Nonnull
-       private final Collection<AllocationID> priorAllocations;
+       private final Collection<AllocationID> preferredAllocations;
+
+       /** This contains all prior allocation ids from the whole execution 
graph. */
+       @Nonnull
+       private final Set<AllocationID> previousExecutionGraphAllocations;
+
+       public SlotProfile(
+               @Nonnull ResourceProfile resourceProfile,
+               @Nonnull Collection<TaskManagerLocation> preferredLocations,
+               @Nonnull Collection<AllocationID> preferredAllocations) {
+
+               this(resourceProfile, preferredLocations, preferredAllocations, 
Collections.emptySet());
+       }
 
        public SlotProfile(
                @Nonnull ResourceProfile resourceProfile,
                @Nonnull Collection<TaskManagerLocation> preferredLocations,
-               @Nonnull Collection<AllocationID> priorAllocations) {
+               @Nonnull Collection<AllocationID> preferredAllocations,
+               @Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
                this.resourceProfile = resourceProfile;
                this.preferredLocations = preferredLocations;
-               this.priorAllocations = priorAllocations;
+               this.preferredAllocations = preferredAllocations;
+               this.previousExecutionGraphAllocations = 
previousExecutionGraphAllocations;
        }
 
        /**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
         * Returns the desired allocation ids for the slot.
         */
        @Nonnull
-       public Collection<AllocationID> getPriorAllocations() {
-               return priorAllocations;
+       public Collection<AllocationID> getPreferredAllocations() {
+               return preferredAllocations;
+       }
+
+       /**
+        * Returns a set of all previous allocation ids from the execution 
graph.
+        *
+        * This is optional and can be empty if unused.
+        */
+       @Nonnull
+       public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+               return previousExecutionGraphAllocations;
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
                return scheduleForExecution(
                        resourceProvider,
                        allowQueued,
-                       LocationPreferenceConstraint.ANY);
+                       LocationPreferenceConstraint.ANY,
+                       Collections.emptySet());
        }
 
        /**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
         * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
         *               immediately deploy it.
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @param allPreviousExecutionGraphAllocationIds set with all previous 
allocation ids in the job graph.
+        *                                                 Can be empty if the 
allocation ids are not required for scheduling.
         * @return Future which is completed once the Execution has been 
deployed
         */
        public CompletableFuture<Void> scheduleForExecution(
                        SlotProvider slotProvider,
                        boolean queued,
-                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
+                       LocationPreferenceConstraint 
locationPreferenceConstraint,
+                       @Nonnull Set<AllocationID> 
allPreviousExecutionGraphAllocationIds) {
                final Time allocationTimeout = 
vertex.getExecutionGraph().getAllocationTimeout();
                try {
                        final CompletableFuture<Execution> allocationFuture = 
allocateAndAssignSlotForExecution(
                                slotProvider,
                                queued,
                                locationPreferenceConstraint,
+                               allPreviousExecutionGraphAllocationIds,
                                allocationTimeout);
 
                        // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
@@ -441,6 +448,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
         * @param slotProvider to obtain a new slot from
         * @param queued if the allocation can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @param allPreviousExecutionGraphAllocationIds set with all previous 
allocation ids in the job graph.
+        *                                                 Can be empty if the 
allocation ids are not required for scheduling.
         * @param allocationTimeout rpcTimeout for allocating a new slot
         * @return Future which is completed with this execution once the slot 
has been assigned
         *                      or with an exception if an error occurred.
@@ -450,6 +459,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
                        SlotProvider slotProvider,
                        boolean queued,
                        LocationPreferenceConstraint 
locationPreferenceConstraint,
+                       @Nonnull Set<AllocationID> 
allPreviousExecutionGraphAllocationIds,
                        Time allocationTimeout) throws 
IllegalExecutionStateException {
 
                checkNotNull(slotProvider);
@@ -495,7 +505,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
                                                        new SlotProfile(
                                                                
ResourceProfile.UNKNOWN,
                                                                
preferredLocations,
-                                                               
previousAllocationIDs),
+                                                               
previousAllocationIDs,
+                                                               
allPreviousExecutionGraphAllocationIds),
                                                        allocationTimeout));
 
                        // register call back to cancel slot request in case 
that the execution gets canceled
@@ -739,7 +750,8 @@ else if (numConsumers == 0) {
                                                        
consumerVertex.scheduleForExecution(
                                                                
executionGraph.getSlotProvider(),
                                                                
executionGraph.isQueuedSchedulingAllowed(),
-                                                               
LocationPreferenceConstraint.ANY); // there must be at least one known location
+                                                               
LocationPreferenceConstraint.ANY, // there must be at least one known location
+                                                               
Collections.emptySet());
                                                } catch (Throwable t) {
                                                        consumerVertex.fail(new 
IllegalStateException("Could not schedule consumer " +
                                                                        "vertex 
" + consumerVertex, t));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 76cfbe1286d..3b55e009116 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -60,6 +61,7 @@
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -91,6 +93,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -902,14 +905,14 @@ public void scheduleForExecution() throws JobException {
        private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) 
{
 
                final ArrayList<CompletableFuture<Void>> schedulingFutures = 
new ArrayList<>(numVerticesTotal);
-
                // simply take the vertices without inputs.
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                        if (ejv.getJobVertex().isInputVertex()) {
                                final CompletableFuture<Void> 
schedulingJobVertexFuture = ejv.scheduleAll(
                                        slotProvider,
                                        allowQueuedScheduling,
-                                       LocationPreferenceConstraint.ALL); // 
since it is an input vertex, the input based location preferences should be 
empty
+                                       LocationPreferenceConstraint.ALL,// 
since it is an input vertex, the input based location preferences should be 
empty
+                                       Collections.emptySet());
 
                                
schedulingFutures.add(schedulingJobVertexFuture);
                        }
@@ -939,6 +942,9 @@ public void scheduleForExecution() throws JobException {
                // collecting all the slots may resize and fail in that 
operation without slots getting lost
                final ArrayList<CompletableFuture<Execution>> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
+               final Set<AllocationID> allPreviousAllocationIds =
+                       
Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
+
                // allocate the slots (obtain all their futures
                for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                        // these calls are not blocking, they only return 
futures
@@ -946,6 +952,7 @@ public void scheduleForExecution() throws JobException {
                                slotProvider,
                                queued,
                                LocationPreferenceConstraint.ALL,
+                               allPreviousAllocationIds,
                                timeout);
 
                        allAllocationFutures.addAll(allocationFutures);
@@ -1676,6 +1683,35 @@ public void updateAccumulators(AccumulatorSnapshot 
accumulatorSnapshot) {
                }
        }
 
+       /**
+        * Computes and returns a set with the prior allocation ids from all 
execution vertices in the graph.
+        */
+       private Set<AllocationID> computeAllPriorAllocationIds() {
+               HashSet<AllocationID> allPreviousAllocationIds = new 
HashSet<>(getNumberOfExecutionJobVertices());
+               for (ExecutionVertex executionVertex : 
getAllExecutionVertices()) {
+                       AllocationID latestPriorAllocation = 
executionVertex.getLatestPriorAllocation();
+                       if (latestPriorAllocation != null) {
+                               
allPreviousAllocationIds.add(latestPriorAllocation);
+                       }
+               }
+               return allPreviousAllocationIds;
+       }
+
+       /**
+        * Returns the result of {@link #computeAllPriorAllocationIds()}, but 
only if the scheduling really requires it.
+        * Otherwise this method simply returns an empty set.
+        */
+       private Set<AllocationID> 
computeAllPriorAllocationIdsIfRequiredByScheduling() {
+               // This is a temporary optimization to avoid computing all 
previous allocations if not required
+               // This can go away when we progress with the implementation of 
the Scheduler.
+               if (slotProvider instanceof SlotPool.ProviderAndOwner
+                       && ((SlotPool.ProviderAndOwner) 
slotProvider).requiresPreviousAllocationsForScheduling()) {
+                       return computeAllPriorAllocationIds();
+               } else {
+                       return Collections.emptySet();
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Listeners & Observers
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6da1e0db892..2ab1d686410 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -54,6 +55,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -64,6 +66,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -474,12 +477,15 @@ public void 
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
         * @param slotProvider to allocate the slots from
         * @param queued if the allocations can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @param allPreviousExecutionGraphAllocationIds set with all previous 
allocation ids in the job graph.
+        *                                                 Can be empty if the 
allocation ids are not required for scheduling.
         * @return Future which is completed once all {@link Execution} could 
be deployed
         */
        public CompletableFuture<Void> scheduleAll(
                        SlotProvider slotProvider,
                        boolean queued,
-                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
+                       LocationPreferenceConstraint 
locationPreferenceConstraint,
+                       @Nonnull Set<AllocationID> 
allPreviousExecutionGraphAllocationIds) {
 
                final ExecutionVertex[] vertices = this.taskVertices;
 
@@ -487,7 +493,11 @@ public void 
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
 
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
-                       
scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, 
locationPreferenceConstraint));
+                       scheduleFutures.add(ev.scheduleForExecution(
+                               slotProvider,
+                               queued,
+                               locationPreferenceConstraint,
+                               allPreviousExecutionGraphAllocationIds));
                }
 
                return FutureUtils.waitForAll(scheduleFutures);
@@ -503,12 +513,14 @@ public void 
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
         * @param resourceProvider The resource provider from whom the slots 
are requested.
         * @param queued if the allocation can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @param allPreviousExecutionGraphAllocationIds the allocation ids of 
all previous executions in the execution job graph.
         * @param allocationTimeout timeout for allocating the individual slots
         */
        public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
                        SlotProvider resourceProvider,
                        boolean queued,
                        LocationPreferenceConstraint 
locationPreferenceConstraint,
+                       @Nonnull Set<AllocationID> 
allPreviousExecutionGraphAllocationIds,
                        Time allocationTimeout) {
                final ExecutionVertex[] vertices = this.taskVertices;
                final CompletableFuture<Execution>[] slots = new 
CompletableFuture[vertices.length];
@@ -522,6 +534,7 @@ public void 
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
                                resourceProvider,
                                queued,
                                locationPreferenceConstraint,
+                               allPreviousExecutionGraphAllocationIds,
                                allocationTimeout);
                        slots[i] = allocationFuture;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e4228011830..a0747296c53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -56,6 +56,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -617,17 +618,21 @@ public Execution resetForNewExecution(final long 
timestamp, final long originati
         * @param slotProvider to allocate the slots from
         * @param queued if the allocation can be queued
         * @param locationPreferenceConstraint constraint for the location 
preferences
+        * @param allPreviousExecutionGraphAllocationIds set with all previous 
allocation ids in the job graph.
+        *                                                 Can be empty if the 
allocation ids are not required for scheduling.
         * @return Future which is completed once the execution is deployed. 
The future
         * can also completed exceptionally.
         */
        public CompletableFuture<Void> scheduleForExecution(
                        SlotProvider slotProvider,
                        boolean queued,
-                       LocationPreferenceConstraint 
locationPreferenceConstraint) {
+                       LocationPreferenceConstraint 
locationPreferenceConstraint,
+                       @Nonnull Set<AllocationID> 
allPreviousExecutionGraphAllocationIds) {
                return this.currentExecution.scheduleForExecution(
                        slotProvider,
                        queued,
-                       locationPreferenceConstraint);
+                       locationPreferenceConstraint,
+                       allPreviousExecutionGraphAllocationIds);
        }
 
        @VisibleForTesting
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 0b00c0e039d..f3ba48e75ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -212,6 +213,15 @@ private void restart(long globalModVersionOfFailover) {
                                                        
connectedExecutionVertexes, false, false);
                                }
                                */
+
+                               HashSet<AllocationID> 
previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size());
+                               for (ExecutionVertex connectedExecutionVertex : 
connectedExecutionVertexes) {
+                                       AllocationID latestPriorAllocation = 
connectedExecutionVertex.getLatestPriorAllocation();
+                                       if (latestPriorAllocation != null) {
+                                               
previousAllocationsInRegion.add(latestPriorAllocation);
+                                       }
+                               }
+
                                //TODO, use restart strategy to schedule them.
                                //restart all connected ExecutionVertexes
                                for (ExecutionVertex ev : 
connectedExecutionVertexes) {
@@ -219,7 +229,8 @@ private void restart(long globalModVersionOfFailover) {
                                                ev.scheduleForExecution(
                                                        
executionGraph.getSlotProvider(),
                                                        
executionGraph.isQueuedSchedulingAllowed(),
-                                                       
LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover 
region might have failed concurrently
+                                                       
LocationPreferenceConstraint.ANY,
+                                                       
previousAllocationsInRegion); // some inputs not belonging to the failover 
region might have failed concurrently
                                        }
                                        catch (Throwable e) {
                                                
failover(globalModVersionOfFailover);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
index 95dd1f6f9e6..282fd2ccf4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
index 38781676842..8777edd2fe7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -18,37 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 /**
  * Interface for the context of a {@link LogicalSlot}. This context contains 
information
  * about the underlying allocated slot and how to communicate with the 
TaskManager on which
  * it was allocated.
  */
-public interface SlotContext {
-       /**
-        * Gets the id under which the slot has been allocated on the 
TaskManager. This id uniquely identifies the
-        * physical slot.
-        *
-        * @return The id under which the slot has been allocated on the 
TaskManager
-        */
-       AllocationID getAllocationId();
-
-       /**
-        * Gets the location info of the TaskManager that offers this slot.
-        *
-        * @return The location info of the TaskManager that offers this slot
-        */
-       TaskManagerLocation getTaskManagerLocation();
-
-       /**
-        * Gets the number of the slot.
-        *
-        * @return The number of the slot on the TaskManager.
-        */
-       int getPhysicalSlotNumber();
+public interface SlotContext extends SlotInfo {
 
        /**
         * Gets the actor gateway that can be used to send messages to the 
TaskManager.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
new file mode 100644
index 00000000000..fd33aacfd6c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface that provides basic information in the context of a slot.
+ */
+public interface SlotInfo {
+
+       /**
+        * Gets the id under which the slot has been allocated on the 
TaskManager. This id uniquely identifies the
+        * physical slot.
+        *
+        * @return The id under which the slot has been allocated on the 
TaskManager
+        */
+       AllocationID getAllocationId();
+
+       /**
+        * Gets the location info of the TaskManager that offers this slot.
+        *
+        * @return The location info of the TaskManager that offers this slot
+        */
+       TaskManagerLocation getTaskManagerLocation();
+
+       /**
+        * Gets the number of the slot.
+        *
+        * @return The number of the slot on the TaskManager.
+        */
+       int getPhysicalSlotNumber();
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index 75195cd9378..e4e583c09d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -92,6 +92,7 @@ public SlotID getSlotId() {
         * 
         * @return The ID under which the slot is allocated
         */
+       @Override
        public AllocationID getAllocationId() {
                return allocationId;
        }
@@ -121,6 +122,7 @@ public ResourceProfile getResourceProfile() {
         *
         * @return The location info of the TaskManager that offers this slot
         */
+       @Override
        public TaskManagerLocation getTaskManagerLocation() {
                return taskManagerLocation;
        }
@@ -132,6 +134,7 @@ public TaskManagerLocation getTaskManagerLocation() {
         *
         * @return The actor gateway that can be used to send messages to the 
TaskManager.
         */
+       @Override
        public TaskManagerGateway getTaskManagerGateway() {
                return taskManagerGateway;
        }
@@ -142,6 +145,7 @@ public TaskManagerGateway getTaskManagerGateway() {
         *
         * @return Physical slot number of the allocated slot
         */
+       @Override
        public int getPhysicalSlotNumber() {
                return physicalSlotNumber;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
index 25e884c32dd..bc90be65ac5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
@@ -21,7 +21,7 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
@@ -34,6 +34,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -54,11 +55,26 @@
        @Override
        public <IN, OUT> OUT findMatchWithLocality(
                        @Nonnull SlotProfile slotProfile,
-                       @Nonnull Stream<IN> candidates,
-                       @Nonnull Function<IN, SlotContext> contextExtractor,
+                       @Nonnull Supplier<Stream<IN>> candidates,
+                       @Nonnull Function<IN, SlotInfo> contextExtractor,
                        @Nonnull Predicate<IN> additionalRequirementsFilter,
                        @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
 
+               return doFindMatchWithLocality(
+                       slotProfile,
+                       candidates.get(),
+                       contextExtractor,
+                       additionalRequirementsFilter,
+                       resultProducer);
+       }
+
+       @Nullable
+       protected  <IN, OUT> OUT doFindMatchWithLocality(
+               @Nonnull SlotProfile slotProfile,
+               @Nonnull Stream<IN> candidates,
+               @Nonnull Function<IN, SlotInfo> contextExtractor,
+               @Nonnull Predicate<IN> additionalRequirementsFilter,
+               @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
                Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
 
                // if we have no location preferences, we can only filter by the additional requirements.
@@ -88,7 +104,7 @@
                while (iterator.hasNext()) {
                        IN candidate = iterator.next();
                        if (additionalRequirementsFilter.test(candidate)) {
-                               SlotContext slotContext = contextExtractor.apply(candidate);
+                               SlotInfo slotContext = contextExtractor.apply(candidate);
 
                                // this gets candidate is local-weigh
                                Integer localWeigh = preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(), 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
index 9b1872ec363..d2193ad7022 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
@@ -21,15 +21,17 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
        @Override
        public <IN, OUT> OUT findMatchWithLocality(
                        @Nonnull SlotProfile slotProfile,
-                       @Nonnull Stream<IN> candidates,
-                       @Nonnull Function<IN, SlotContext> contextExtractor,
+                       @Nonnull Supplier<Stream<IN>> candidates,
+                       @Nonnull Function<IN, SlotInfo> contextExtractor,
                        @Nonnull Predicate<IN> additionalRequirementsFilter,
                        @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
 
-               Collection<AllocationID> priorAllocations = slotProfile.getPriorAllocations();
+               Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
 
                if (priorAllocations.isEmpty()) {
-                       return super.findMatchWithLocality(slotProfile, candidates, contextExtractor, additionalRequirementsFilter, resultProducer);
+                       return super.findMatchWithLocality(
+                               slotProfile,
+                               candidates,
+                               contextExtractor,
+                               additionalRequirementsFilter,
+                               resultProducer);
                } else {
-                       return findPreviousAllocation(candidates, contextExtractor, additionalRequirementsFilter, resultProducer, priorAllocations);
+                       return findPreviousAllocation(
+                               slotProfile,
+                               candidates,
+                               contextExtractor,
+                               additionalRequirementsFilter,
+                               resultProducer,
+                               priorAllocations);
                }
        }
 
        @Nullable
        private <IN, OUT> OUT findPreviousAllocation(
-                       @Nonnull Stream<IN> candidates,
-                       @Nonnull Function<IN, SlotContext> contextExtractor,
+                       @Nonnull SlotProfile slotProfile,
+                       @Nonnull Supplier<Stream<IN>> candidates,
+                       @Nonnull Function<IN, SlotInfo> contextExtractor,
                        @Nonnull Predicate<IN> additionalRequirementsFilter,
                        @Nonnull BiFunction<IN, Locality, OUT> resultProducer,
-                       Collection<AllocationID> priorAllocations) {
+                       @Nonnull Collection<AllocationID> priorAllocations) {
+
                Predicate<IN> filterByAllocation =
-                       (candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+                       (IN candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
 
-               return candidates
+               OUT previousAllocationCandidate = candidates
+                       .get()
                        .filter(filterByAllocation.and(additionalRequirementsFilter))
                        .findFirst()
-                       .map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
+                       .map((IN result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
                        .orElse(null);
+
+               if (previousAllocationCandidate != null) {
+                       return previousAllocationCandidate;
+               }
+
+               Set<AllocationID> blackListedAllocationIDs = slotProfile.getPreviousExecutionGraphAllocations();
+               Stream<IN> candidateStream = candidates.get();
+               if (!blackListedAllocationIDs.isEmpty()) {
+                       candidateStream = candidateStream.filter(
+                               (IN candidate) -> !blackListedAllocationIDs.contains(
+                                       contextExtractor.apply(candidate).getAllocationId()));
+               }
+
+               return doFindMatchWithLocality(
+                       slotProfile,
+                       candidateStream,
+                       contextExtractor,
+                       additionalRequirementsFilter,
+                       resultProducer);
        }
 
        public static PreviousAllocationSchedulingStrategy getInstance() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
index fb27a214eff..89b2d05cada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -28,6 +29,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -53,8 +55,8 @@
        @Nullable
        <IN, OUT> OUT findMatchWithLocality(
                @Nonnull SlotProfile slotProfile,
-               @Nonnull Stream<IN> candidates,
-               @Nonnull Function<IN, SlotContext> contextExtractor,
+               @Nonnull Supplier<Stream<IN>> candidates,
+               @Nonnull Function<IN, SlotInfo> contextExtractor,
                @Nonnull Predicate<IN> additionalRequirementsFilter,
                @Nonnull BiFunction<IN, Locality, OUT> resultProducer);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index b53ee93e643..34d5cdc49c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -55,6 +55,7 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -171,7 +172,9 @@ public SlotPool(
                this.pendingRequests = new DualKeyMap<>(16);
                this.waitingForResourceManager = new HashMap<>(16);
 
-               this.providerAndOwner = new 
ProviderAndOwner(getSelfGateway(SlotPoolGateway.class));
+               this.providerAndOwner = new ProviderAndOwner(
+                       getSelfGateway(SlotPoolGateway.class),
+                       schedulingStrategy instanceof 
PreviousAllocationSchedulingStrategy);
 
                this.slotSharingManagers = new HashMap<>(4);
 
@@ -326,76 +329,93 @@ public void disconnectResourceManager() {
 
                log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
 
-               final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+               if (task.getSlotSharingGroupId() == null) {
+                       return allocateSingleSlot(slotRequestId, slotProfile, 
allowQueuedScheduling, allocationTimeout);
+               } else {
+                       return allocateSharedSlot(slotRequestId, task, 
slotProfile, allowQueuedScheduling, allocationTimeout);
+               }
+       }
 
-               if (slotSharingGroupId != null) {
-                       // allocate slot with slot sharing
-                       final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
-                               slotSharingGroupId,
-                               id -> new SlotSharingManager(
-                                       id,
-                                       this,
-                                       providerAndOwner));
-
-                       final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
-
-                       try {
-                               if (task.getCoLocationConstraint() != null) {
-                                       multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
-                                               task.getCoLocationConstraint(),
-                                               multiTaskSlotManager,
-                                               slotProfile,
-                                               allowQueuedScheduling,
-                                               allocationTimeout);
+       private CompletableFuture<LogicalSlot> allocateSingleSlot(
+               SlotRequestId slotRequestId,
+               SlotProfile slotProfile,
+               boolean allowQueuedScheduling,
+               Time allocationTimeout) {
+               // request an allocated slot to assign a single logical slot to
+               CompletableFuture<SlotAndLocality> slotAndLocalityFuture = 
requestAllocatedSlot(
+                       slotRequestId,
+                       slotProfile,
+                       allowQueuedScheduling,
+                       allocationTimeout);
+
+               return slotAndLocalityFuture.thenApply(
+                       (SlotAndLocality slotAndLocality) -> {
+                               final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
+
+                               final SingleLogicalSlot singleTaskSlot = new 
SingleLogicalSlot(
+                                       slotRequestId,
+                                       allocatedSlot,
+                                       null,
+                                       slotAndLocality.getLocality(),
+                                       providerAndOwner);
+
+                               if 
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+                                       return singleTaskSlot;
                                } else {
-                                       multiTaskSlotLocality = 
allocateMultiTaskSlot(
-                                               task.getJobVertexId(),
-                                               multiTaskSlotManager,
-                                               slotProfile,
-                                               allowQueuedScheduling,
-                                               allocationTimeout);
+                                       final FlinkException flinkException =
+                                               new FlinkException("Could not 
assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+                                       releaseSingleSlot(slotRequestId, 
flinkException);
+                                       throw new 
CompletionException(flinkException);
                                }
-                       } catch (NoResourceAvailableException 
noResourceException) {
-                               return 
FutureUtils.completedExceptionally(noResourceException);
-                       }
+                       });
+       }
 
-                       // sanity check
-                       
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
+       private CompletableFuture<LogicalSlot> allocateSharedSlot(
+               SlotRequestId slotRequestId,
+               ScheduledUnit task,
+               SlotProfile slotProfile,
+               boolean allowQueuedScheduling,
+               Time allocationTimeout) {
 
-                       final SlotSharingManager.SingleTaskSlot leaf = 
multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
-                               slotRequestId,
-                               task.getJobVertexId(),
-                               multiTaskSlotLocality.getLocality());
+               // allocate slot with slot sharing
+               final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+                       task.getSlotSharingGroupId(),
+                       id -> new SlotSharingManager(
+                               id,
+                               this,
+                               providerAndOwner));
 
-                       return leaf.getLogicalSlotFuture();
-               } else {
-                       // request an allocated slot to assign a single logical 
slot to
-                       CompletableFuture<SlotAndLocality> 
slotAndLocalityFuture = requestAllocatedSlot(
-                               slotRequestId,
-                               slotProfile,
-                               allowQueuedScheduling,
-                               allocationTimeout);
+               final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
+
+               try {
+                       if (task.getCoLocationConstraint() != null) {
+                               multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
+                                       task.getCoLocationConstraint(),
+                                       multiTaskSlotManager,
+                                       slotProfile,
+                                       allowQueuedScheduling,
+                                       allocationTimeout);
+                       } else {
+                               multiTaskSlotLocality = allocateMultiTaskSlot(
+                                       task.getJobVertexId(),
+                                       multiTaskSlotManager,
+                                       slotProfile,
+                                       allowQueuedScheduling,
+                                       allocationTimeout);
+                       }
+               } catch (NoResourceAvailableException noResourceException) {
+                       return 
FutureUtils.completedExceptionally(noResourceException);
+               }
 
-                       return slotAndLocalityFuture.thenApply(
-                               (SlotAndLocality slotAndLocality) -> {
-                                       final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
+               // sanity check
+               
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
 
-                                       final SingleLogicalSlot singleTaskSlot 
= new SingleLogicalSlot(
-                                               slotRequestId,
-                                               allocatedSlot,
-                                               null,
-                                               slotAndLocality.getLocality(),
-                                               providerAndOwner);
+               final SlotSharingManager.SingleTaskSlot leaf = 
multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
+                       slotRequestId,
+                       task.getJobVertexId(),
+                       multiTaskSlotLocality.getLocality());
 
-                                       if 
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
-                                               return singleTaskSlot;
-                                       } else {
-                                               final FlinkException 
flinkException = new FlinkException("Could not assign payload to allocated slot 
" + allocatedSlot.getAllocationId() + '.');
-                                               releaseSlot(slotRequestId, 
null, flinkException);
-                                               throw new 
CompletionException(flinkException);
-                                       }
-                               });
-               }
+               return leaf.getLogicalSlotFuture();
        }
 
        /**
@@ -439,12 +459,13 @@ public void disconnectResourceManager() {
                        slotProfile = new SlotProfile(
                                slotProfile.getResourceProfile(),
                                
Collections.singleton(coLocationConstraint.getLocation()),
-                               slotProfile.getPriorAllocations());
+                               slotProfile.getPreferredAllocations());
                }
 
                // get a new multi task slot
                final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality = allocateMultiTaskSlot(
-                       coLocationConstraint.getGroupId(), multiTaskSlotManager,
+                       coLocationConstraint.getGroupId(),
+                       multiTaskSlotManager,
                        slotProfile,
                        allowQueuedScheduling,
                        allocationTimeout);
@@ -549,9 +570,8 @@ public void disconnectResourceManager() {
                if (multiTaskSlotLocality != null) {
                        // prefer slot sharing group slots over unused slots
                        if (polledSlotAndLocality != null) {
-                               releaseSlot(
+                               releaseSingleSlot(
                                        allocatedSlotRequestId,
-                                       null,
                                        new FlinkException("Locality constraint 
is not better fulfilled by allocated slot."));
                        }
                        return multiTaskSlotLocality;
@@ -588,9 +608,8 @@ public void disconnectResourceManager() {
                                                                }
                                                        }
                                                } else {
-                                                       releaseSlot(
+                                                       releaseSingleSlot(
                                                                
allocatedSlotRequestId,
-                                                               null,
                                                                new 
FlinkException("Could not find task slot with " + multiTaskSlotRequestId + 
'.'));
                                                }
                                        });
@@ -742,41 +761,56 @@ private void stashRequestWaitingForResourceManager(final PendingRequest pendingR
        // ------------------------------------------------------------------------
 
        @Override
-       public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+       public CompletableFuture<Acknowledge> releaseSlot(
+               SlotRequestId slotRequestId,
+               @Nullable SlotSharingGroupId slotSharingGroupId,
+               Throwable cause) {
+
                log.debug("Releasing slot [{}] because: {}", slotRequestId, 
cause != null ? cause.getMessage() : "null");
 
                if (slotSharingGroupId != null) {
-                       final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.get(slotSharingGroupId);
+                       releaseSharedSlot(slotRequestId, slotSharingGroupId, 
cause);
+               } else {
+                       releaseSingleSlot(slotRequestId, cause);
+               }
 
-                       if (multiTaskSlotManager != null) {
-                               final SlotSharingManager.TaskSlot taskSlot = 
multiTaskSlotManager.getTaskSlot(slotRequestId);
+               return CompletableFuture.completedFuture(Acknowledge.get());
+       }
 
-                               if (taskSlot != null) {
-                                       taskSlot.release(cause);
-                               } else {
-                                       log.debug("Could not find slot [{}] in 
slot sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId);
-                               }
+       private void releaseSharedSlot(
+               SlotRequestId slotRequestId,
+               @Nonnull SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+
+               final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.get(slotSharingGroupId);
+
+               if (multiTaskSlotManager != null) {
+                       final SlotSharingManager.TaskSlot taskSlot = 
multiTaskSlotManager.getTaskSlot(slotRequestId);
+
+                       if (taskSlot != null) {
+                               taskSlot.release(cause);
                        } else {
-                               log.debug("Could not find slot sharing group 
{}. Ignoring release slot request.", slotSharingGroupId);
+                               log.debug("Could not find slot [{}] in slot 
sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId);
                        }
                } else {
-                       final PendingRequest pendingRequest = 
removePendingRequest(slotRequestId);
+                       log.debug("Could not find slot sharing group {}. 
Ignoring release slot request.", slotSharingGroupId);
+               }
+       }
 
-                       if (pendingRequest != null) {
-                               failPendingRequest(pendingRequest, new 
FlinkException("Pending slot request with " + slotRequestId + " has been 
released."));
-                       } else {
-                               final AllocatedSlot allocatedSlot = 
allocatedSlots.remove(slotRequestId);
+       private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable 
cause) {
+               final PendingRequest pendingRequest = 
removePendingRequest(slotRequestId);
 
-                               if (allocatedSlot != null) {
-                                       allocatedSlot.releasePayload(cause);
-                                       
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
-                               } else {
-                                       log.debug("There is no allocated slot 
[{}]. Ignoring the release slot request.", slotRequestId);
-                               }
+               if (pendingRequest != null) {
+                       failPendingRequest(pendingRequest, new 
FlinkException("Pending slot request with " + slotRequestId + " has been 
released."));
+               } else {
+                       final AllocatedSlot allocatedSlot = 
allocatedSlots.remove(slotRequestId);
+
+                       if (allocatedSlot != null) {
+                               allocatedSlot.releasePayload(cause);
+                               
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+                       } else {
+                               log.debug("There is no allocated slot [{}]. 
Ignoring the release slot request.", slotRequestId);
                        }
                }
-
-               return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
        /**
@@ -876,17 +910,13 @@ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
                validateRunsInMainThread();
 
                List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers 
= offers.stream().map(
-                       offer -> {
-                               CompletableFuture<Optional<SlotOffer>> 
acceptedSlotOffer = offerSlot(
-                                               taskManagerLocation,
-                                               taskManagerGateway,
-                                               offer)
-                                       .thenApply(
-                                               (acceptedSlot) -> acceptedSlot 
? Optional.of(offer) : Optional.empty()
-                                       );
-
-                               return acceptedSlotOffer;
-                       }
+                       offer -> offerSlot(
+                                       taskManagerLocation,
+                                       taskManagerGateway,
+                                       offer)
+                               .<Optional<SlotOffer>>thenApply(
+                                       (acceptedSlot) -> acceptedSlot ? 
Optional.of(offer) : Optional.empty()
+                               )
                ).collect(Collectors.toList());
 
                CompletableFuture<Collection<Optional<SlotOffer>>> 
optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
@@ -1378,11 +1408,7 @@ int size() {
 
                @VisibleForTesting
                Set<AllocatedSlot> getSlotsForTaskManager(ResourceID 
resourceId) {
-                       if 
(allocatedSlotsByTaskManager.containsKey(resourceId)) {
-                               return 
allocatedSlotsByTaskManager.get(resourceId);
-                       } else {
-                               return Collections.emptySet();
-                       }
+                       return 
allocatedSlotsByTaskManager.getOrDefault(resourceId, Collections.emptySet());
                }
        }
 
@@ -1423,18 +1449,12 @@ void add(final AllocatedSlot slot, final long timestamp) {
                                final ResourceID resourceID = 
slot.getTaskManagerLocation().getResourceID();
                                final String host = 
slot.getTaskManagerLocation().getFQDNHostname();
 
-                               Set<AllocatedSlot> slotsForTaskManager = 
availableSlotsByTaskManager.get(resourceID);
-                               if (slotsForTaskManager == null) {
-                                       slotsForTaskManager = new HashSet<>();
-                                       
availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
-                               }
+                               Set<AllocatedSlot> slotsForTaskManager =
+                                       
availableSlotsByTaskManager.computeIfAbsent(resourceID, k -> new HashSet<>());
                                slotsForTaskManager.add(slot);
 
-                               Set<AllocatedSlot> slotsForHost = 
availableSlotsByHost.get(host);
-                               if (slotsForHost == null) {
-                                       slotsForHost = new HashSet<>();
-                                       availableSlotsByHost.put(host, 
slotsForHost);
-                               }
+                               Set<AllocatedSlot> slotsForHost =
+                                       
availableSlotsByHost.computeIfAbsent(host, k -> new HashSet<>());
                                slotsForHost.add(slot);
                        }
                        else {
@@ -1475,7 +1495,7 @@ SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProf
 
                        SlotAndLocality matchingSlotAndLocality = 
schedulingStrategy.findMatchWithLocality(
                                slotProfile,
-                               slotAndTimestamps.stream(),
+                               slotAndTimestamps::stream,
                                SlotAndTimestamp::slot,
                                (SlotAndTimestamp slot) -> 
slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
                                (SlotAndTimestamp slotAndTimestamp, Locality 
locality) -> {
@@ -1582,12 +1602,18 @@ void clear() {
         * An implementation of the {@link SlotOwner} and {@link SlotProvider} 
interfaces
         * that delegates methods as RPC calls to the SlotPool's RPC gateway.
         */
-       private static class ProviderAndOwner implements SlotOwner, 
SlotProvider {
+       public static class ProviderAndOwner implements SlotOwner, SlotProvider 
{
 
                private final SlotPoolGateway gateway;
+               private final boolean requiresPreviousAllocationsForScheduling;
 
-               ProviderAndOwner(SlotPoolGateway gateway) {
+               ProviderAndOwner(SlotPoolGateway gateway, boolean 
requiresPreviousAllocationsForScheduling) {
                        this.gateway = gateway;
+                       this.requiresPreviousAllocationsForScheduling = 
requiresPreviousAllocationsForScheduling;
+               }
+
+               public boolean requiresPreviousAllocationsForScheduling() {
+                       return requiresPreviousAllocationsForScheduling;
                }
 
                @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index ef288a26469..af5582752d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -186,7 +186,7 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID 
groupId, SchedulingStrategy
                Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = 
this.resolvedRootSlots.values();
                return matcher.findMatchWithLocality(
                        slotProfile,
-                       
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+                       () -> 
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
                        (MultiTaskSlot multiTaskSlot) -> 
multiTaskSlot.getSlotContextFuture().join(),
                        (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId),
                        MultiTaskSlotLocality::of);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
index 6a826aa23e1..ee491499b77 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
@@ -124,18 +124,59 @@ public void 
matchPreviousAllocationOverridesPreferredLocation() {
        }
 
        @Test
-       public void matchPreviousLocationNotAvailable() {
+       public void matchPreviousLocationNotAvailableButByLocality() {
 
                SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml4), Collections.singletonList(aidX));
                SlotContext match = runMatching(slotProfile);
 
-               Assert.assertEquals(null, match);
+               Assert.assertEquals(ssc4, match);
+       }
+
+       @Test
+       public void matchPreviousLocationNotAvailableAndAllOthersBlacklisted() {
+               HashSet<AllocationID> blacklisted = new HashSet<>(4);
+               blacklisted.add(aid1);
+               blacklisted.add(aid2);
+               blacklisted.add(aid3);
+               blacklisted.add(aid4);
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+               SlotContext match = runMatching(slotProfile);
+
+               // there should be no valid option left and we expect null as 
return
+               Assert.assertNull(match);
+       }
+
+       @Test
+       public void matchPreviousLocationNotAvailableAndSomeOthersBlacklisted() 
{
+               HashSet<AllocationID> blacklisted = new HashSet<>(3);
+               blacklisted.add(aid1);
+               blacklisted.add(aid3);
+               blacklisted.add(aid4);
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+               SlotContext match = runMatching(slotProfile);
+
+               // we expect that the candidate that is not blacklisted is 
returned
+               Assert.assertEquals(ssc2, match);
+       }
+
+       @Test
+       public void matchPreviousLocationAvailableButAlsoBlacklisted() {
+               HashSet<AllocationID> blacklisted = new HashSet<>(4);
+               blacklisted.add(aid1);
+               blacklisted.add(aid2);
+               blacklisted.add(aid3);
+               blacklisted.add(aid4);
+               SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml3), Collections.singletonList(aid3), blacklisted);
+               SlotContext match = runMatching(slotProfile);
+
+               // available previous allocation should override blacklisting
+               Assert.assertEquals(ssc3, match);
        }
 
        private SlotContext runMatching(SlotProfile slotProfile) {
                return schedulingStrategy.findMatchWithLocality(
                        slotProfile,
-                       candidates.stream(),
+                       candidates::stream,
                        (candidate) -> candidate,
                        (candidate) -> true,
                        (candidate, locality) -> candidate);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 56fd7e12369..74724629eea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment() 
throws Exception {
                        slotProvider,
                        false,
                        LocationPreferenceConstraint.ALL,
+                       Collections.emptySet(),
                        TestingUtils.infiniteTime());
 
                assertFalse(allocationFuture.isDone());
@@ -156,6 +157,7 @@ public void 
testSlotReleaseOnExecutionCancellationInScheduled() throws Exception
                        slotProvider,
                        false,
                        LocationPreferenceConstraint.ALL,
+                       Collections.emptySet(),
                        TestingUtils.infiniteTime());
 
                assertTrue(allocationFuture.isDone());
@@ -205,6 +207,7 @@ public void 
testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
                        slotProvider,
                        false,
                        LocationPreferenceConstraint.ALL,
+                       Collections.emptySet(),
                        TestingUtils.infiniteTime());
 
                assertTrue(allocationFuture.isDone());
@@ -254,6 +257,7 @@ public void 
testSlotAllocationCancellationWhenExecutionCancelled() throws Except
                        slotProvider,
                        false,
                        LocationPreferenceConstraint.ALL,
+                       Collections.emptySet(),
                        TestingUtils.infiniteTime());
 
                assertThat(allocationFuture.isDone(), is(false));
@@ -357,7 +361,7 @@ public void 
testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
 
                ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
 
-               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY).get();
+               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
 
                Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
 
@@ -417,7 +421,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() 
throws Exception {
                assertThat(execution.getTaskRestore(), is(notNullValue()));
 
                // schedule the execution vertex and wait for its deployment
-               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY).get();
+               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
 
                assertThat(execution.getTaskRestore(), is(nullValue()));
        }
@@ -479,7 +483,8 @@ public void testEagerSchedulingFailureReturnsSlot() throws 
Exception {
                        final CompletableFuture<Void> schedulingFuture = 
execution.scheduleForExecution(
                                slotProvider,
                                false,
-                               LocationPreferenceConstraint.ANY);
+                               LocationPreferenceConstraint.ANY,
+                               Collections.emptySet());
 
                        try {
                                schedulingFuture.get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index cd613f0f50a..41894767b6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -40,6 +40,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import scala.concurrent.ExecutionContext;
 
@@ -454,7 +455,7 @@ public void testScheduleOrDeployAfterCancel() {
                        // it can occur as the result of races
                        {
                                Scheduler scheduler = mock(Scheduler.class);
-                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
 
                                assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
                        }
@@ -493,7 +494,7 @@ public void testActionsWhileCancelling() {
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
                                Scheduler scheduler = mock(Scheduler.class);
-                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+                               vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
                        }
                        catch (Exception e) {
                                fail("should not throw an exception");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 51d1827e668..6bfcb7ff5d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -36,6 +36,7 @@
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -71,7 +72,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
 
                        // will have failed
                        assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
@@ -103,7 +104,7 @@ public void testSlotReleasedWhenScheduledQueued() {
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, true, 
LocationPreferenceConstraint.ALL);
+                       vertex.scheduleForExecution(scheduler, true, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
 
                        // future has not yet a slot
                        assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
@@ -138,7 +139,7 @@ public void testScheduleToDeploying() {
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
                        // try to deploy to the slot
-                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+                       vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
                        assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
                }
                catch (Exception e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index c411393990c..a53debfd0b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -49,6 +49,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -157,7 +158,7 @@ public void testMultiRegionsFailover() throws Exception {
 
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev11).getState());
 
-               ev21.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL);
+               ev21.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
                ev21.getCurrentExecutionAttempt().fail(new Exception("New 
fail"));
                assertEquals(JobStatus.CANCELLING, 
strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev22).getState());
@@ -170,7 +171,7 @@ public void testMultiRegionsFailover() throws Exception {
 
                ev11.getCurrentExecutionAttempt().markFinished();
                ev21.getCurrentExecutionAttempt().markFinished();
-               ev22.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL);
+               ev22.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL, Collections.emptySet());
                ev22.getCurrentExecutionAttempt().markFinished();
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev22).getState());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
index 0344d71879b..fb28c2c75a4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -41,7 +41,6 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
@@ -50,6 +49,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -72,13 +72,26 @@ public void testDisablingLocalRecovery() throws Exception {
 
        /**
         * Tests that if local recovery is enabled we won't spread
-        * out tasks when recovering.
+        * out tasks when recovering for global failover.
         */
        @Test
-       @Ignore("The test should not pass until FLINK-9635 has been fixed")
-       public void testLocalRecovery() throws Exception {
+       public void testLocalRecoveryFull() throws Exception {
+               testLocalRecoveryInternal("full");
+       }
+
+       /**
+        * Tests that if local recovery is enabled we won't spread
+        * out tasks when recovering for regional failover.
+        */
+       @Test
+       public void testLocalRecoveryRegion() throws Exception {
+               testLocalRecoveryInternal("region");
+       }
+
+       private void testLocalRecoveryInternal(String failoverStrategyValue) 
throws Exception {
                final Configuration configuration = new Configuration();
                configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(), 
failoverStrategyValue);
 
                executeSchedulingTest(configuration);
        }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to