This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81c739ae462412e531216bb46bc567fce2355dd8
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Thu Jul 7 13:36:27 2022 +0800

    [FLINK-28137][runtime] Introduce SpeculativeScheduler
    
    This closes #20222.
---
 .../runtime/blocklist/BlocklistOperations.java     |  33 ++
 .../RestartPipelinedRegionFailoverStrategy.java    |  12 -
 .../apache/flink/runtime/jobgraph/JobVertex.java   |   7 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   7 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   3 +-
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 137 ++++---
 .../runtime/scheduler/DefaultSchedulerFactory.java |   4 +-
 .../flink/runtime/scheduler/ExecutionDeployer.java |   2 +-
 .../scheduler/ExecutionVertexVersioner.java        |   9 +
 .../flink/runtime/scheduler/SchedulerBase.java     |  50 +--
 .../runtime/scheduler/SchedulerNGFactory.java      |   4 +-
 .../scheduler/SimpleExecutionSlotAllocator.java    | 189 +++++++++
 .../adaptive/AdaptiveSchedulerFactory.java         |   4 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  14 +-
 .../AdaptiveBatchSchedulerFactory.java             | 139 ++++---
 .../adaptivebatch/SpeculativeScheduler.java        | 314 +++++++++++++++
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |   4 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |  47 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    |   4 +-
 .../SimpleExecutionSlotAllocatorTest.java          | 269 +++++++++++++
 .../scheduler/TestingSchedulerNGFactory.java       |   4 +-
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 426 +++++++++++++++++++++
 23 files changed, 1516 insertions(+), 170 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java
new file mode 100644
index 00000000000..f7ef00f5f01
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** Operations to perform on the blocklist. */
+public interface BlocklistOperations {
+
+    /**
+     * Add new blocked node records. If a node (identified by node id) already exists, the newly
+     * added one will be merged with the existing one.
+     *
+     * @param newNodes the new blocked node records
+     */
+    void addNewBlockedNodes(Collection<BlockedNode> newNodes);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index 4e977c1a7c8..6e7181ffd03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -31,9 +31,6 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IterableUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,10 +50,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class RestartPipelinedRegionFailoverStrategy implements 
FailoverStrategy {
 
-    /** The log object used for debugging. */
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class);
-
     /** The topology containing info about all the vertices and result partitions. */
     private final SchedulingTopology topology;
 
@@ -112,7 +105,6 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
     @Override
     public Set<ExecutionVertexID> getTasksNeedingRestart(
             ExecutionVertexID executionVertexId, Throwable cause) {
-        LOG.info("Calculating tasks to restart to recover the failed task 
{}.", executionVertexId);
 
         final SchedulingPipelinedRegion failedRegion =
                 topology.getPipelinedRegionOfVertex(executionVertexId);
@@ -149,10 +141,6 @@ public class RestartPipelinedRegionFailoverStrategy 
implements FailoverStrategy
                     
dataConsumptionException.get().getPartitionId().getPartitionId());
         }
 
-        LOG.info(
-                "{} tasks should be restarted to recover the failed task {}. ",
-                tasksToRestart.size(),
-                executionVertexId);
         return tasksToRestart;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index af4a0189f06..36d1a1e5487 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -308,8 +308,11 @@ public class JobVertex implements java.io.Serializable {
      * @param parallelism The parallelism for the task.
      */
     public void setParallelism(int parallelism) {
-        if (parallelism < 1) {
-            throw new IllegalArgumentException("The parallelism must be at 
least one.");
+        if (parallelism < 1 && parallelism != 
ExecutionConfig.PARALLELISM_DEFAULT) {
+            throw new IllegalArgumentException(
+                    "The parallelism must be at least one, or "
+                            + ExecutionConfig.PARALLELISM_DEFAULT
+                            + " (unset).");
         }
         this.parallelism = parallelism;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 0b3757a761e..f218927ebb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -112,7 +113,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNGFactory.createInstance(
                 log,
@@ -133,7 +135,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                 initializationTimestamp,
                 mainThreadExecutor,
                 fatalErrorHandler,
-                jobStatusListener);
+                jobStatusListener,
+                blocklistOperations);
     }
 
     public static DefaultSlotPoolServiceSchedulerFactory create(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1d9024f50ab..25e2949182c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -385,7 +385,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                         initializationTimestamp,
                         getMainThreadExecutor(),
                         fatalErrorHandler,
-                        jobStatusListener);
+                        jobStatusListener,
+                        blocklistHandler::addNewBlockedNodes);
 
         return scheduler;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index d97dc577b41..df8c40c86c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -85,6 +86,7 @@ public interface SlotPoolServiceSchedulerFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 521dda8b953..da6fa69b325 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -29,11 +29,8 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
@@ -45,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
@@ -53,8 +49,6 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.topology.Vertex;
-import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
@@ -76,7 +70,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** The future default scheduler. */
 public class DefaultScheduler extends SchedulerBase implements 
SchedulerOperations {
@@ -85,13 +81,13 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     private final ClassLoader userCodeLoader;
 
-    private final ExecutionSlotAllocator executionSlotAllocator;
+    protected final ExecutionSlotAllocator executionSlotAllocator;
 
     private final ExecutionFailureHandler executionFailureHandler;
 
     private final ScheduledExecutor delayExecutor;
 
-    protected final SchedulingStrategy schedulingStrategy;
+    private final SchedulingStrategy schedulingStrategy;
 
     private final ExecutionOperations executionOperations;
 
@@ -106,7 +102,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     // anymore. The reserved allocation information is needed for local 
recovery.
     private final Map<ExecutionVertexID, AllocationID> 
reservedAllocationByExecutionVertex;
 
-    private final ExecutionDeployer executionDeployer;
+    protected final ExecutionDeployer executionDeployer;
 
     protected DefaultScheduler(
             final Logger log,
@@ -204,10 +200,9 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     @Override
     protected void cancelAllPendingSlotRequestsInternal() {
-        IterableUtils.toStream(getSchedulingTopology().getVertices())
-                .map(Vertex::getId)
-                .map(this::getCurrentExecutionIdOfVertex)
-                .forEach(executionSlotAllocator::cancel);
+        getSchedulingTopology()
+                .getVertices()
+                .forEach(ev -> 
cancelAllPendingSlotRequestsForVertex(ev.getId()));
     }
 
     @Override
@@ -220,47 +215,46 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    protected void onTaskFinished(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FINISHED);
 
+        final ExecutionVertexID executionVertexId = 
execution.getVertex().getID();
         // once a task finishes, it will release the assigned allocation/slot and no longer
         // needs it. Therefore, it should stop reserving the slot so that other tasks are
         // possible to use the slot. Ideally, the `stopReserveAllocation` should happen
         // along with the release slot process. However, that process is hidden in the depth
         // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process
         // is done.
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) 
{
-            stopReserveAllocation(executionVertexId);
-        }
+        stopReserveAllocation(executionVertexId);
 
-        schedulingStrategy.onExecutionStateChange(
-                executionVertexId, taskExecutionState.getExecutionState());
-        maybeHandleTaskFailure(taskExecutionState, 
getCurrentExecutionOfVertex(executionVertexId));
+        schedulingStrategy.onExecutionStateChange(executionVertexId, 
ExecutionState.FINISHED);
     }
 
-    private void maybeHandleTaskFailure(
-            final TaskExecutionStateTransition taskExecutionState, final 
Execution execution) {
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FAILED);
+        checkState(execution.getFailureInfo().isPresent());
+
+        final Throwable error =
+                
execution.getFailureInfo().get().getException().deserializeError(userCodeLoader);
+        handleTaskFailure(
+                execution,
+                maybeTranslateToCachedIntermediateDataSetException(
+                        error, execution.getVertex().getID()));
+    }
 
-        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
-            final Throwable error = 
taskExecutionState.getError(userCodeLoader);
-            handleTaskFailure(execution, error);
-        }
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+        maybeRestartTasks(recordTaskFailure(failedExecution, error));
     }
 
-    private void handleTaskFailure(
+    protected FailureHandlingResult recordTaskFailure(
             final Execution failedExecution, @Nullable final Throwable error) {
-        Throwable revisedError =
-                maybeTranslateToCachedIntermediateDataSetException(
-                        error, failedExecution.getVertex().getID());
         final long timestamp = System.currentTimeMillis();
-        setGlobalFailureCause(revisedError, timestamp);
-        
notifyCoordinatorsAboutTaskFailure(failedExecution.getVertex().getID(), 
revisedError);
+        setGlobalFailureCause(error, timestamp);
+        notifyCoordinatorsAboutTaskFailure(failedExecution, error);
 
-        final FailureHandlingResult failureHandlingResult =
-                executionFailureHandler.getFailureHandlingResult(
-                        failedExecution, revisedError, timestamp);
-        maybeRestartTasks(failureHandlingResult);
+        return 
executionFailureHandler.getFailureHandlingResult(failedExecution, error, 
timestamp);
     }
 
     private Throwable maybeTranslateToCachedIntermediateDataSetException(
@@ -286,10 +280,9 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     }
 
     private void notifyCoordinatorsAboutTaskFailure(
-            final ExecutionVertexID executionVertexId, @Nullable final 
Throwable error) {
-        final ExecutionJobVertex jobVertex =
-                getExecutionJobVertex(executionVertexId.getJobVertexId());
-        final int subtaskIndex = executionVertexId.getSubtaskIndex();
+            final Execution execution, @Nullable final Throwable error) {
+        final ExecutionJobVertex jobVertex = 
execution.getVertex().getJobVertex();
+        final int subtaskIndex = execution.getParallelSubtaskIndex();
 
         jobVertex.getOperatorCoordinators().forEach(c -> 
c.subtaskFailed(subtaskIndex, error));
     }
@@ -324,13 +317,24 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
                                 .values());
         final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
+        if (globalRecovery) {
+            log.info(
+                    "{} tasks will be restarted to recover from a global 
failure.",
+                    verticesToRestart.size());
+        } else {
+            
checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+            log.info(
+                    "{} tasks will be restarted to recover the failed task 
{}.",
+                    verticesToRestart.size(),
+                    
failureHandlingResult.getFailedExecution().get().getAttemptId());
+        }
+
         addVerticesToRestartPending(verticesToRestart);
 
         final CompletableFuture<?> cancelFuture = 
cancelTasksAsync(verticesToRestart);
 
         final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                FailureHandlingResultSnapshot.create(
-                        failureHandlingResult, id -> 
getExecutionVertex(id).getCurrentExecutions());
+                createFailureHandlingResultSnapshot(failureHandlingResult);
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
@@ -345,6 +349,12 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
                 TimeUnit.MILLISECONDS);
     }
 
+    protected FailureHandlingResultSnapshot 
createFailureHandlingResultSnapshot(
+            final FailureHandlingResult failureHandlingResult) {
+        return FailureHandlingResultSnapshot.create(
+                failureHandlingResult, id -> 
getExecutionVertex(id).getCurrentExecutions());
+    }
+
     private void addVerticesToRestartPending(final Set<ExecutionVertexID> 
verticesToRestart) {
         verticesWaitingForRestart.addAll(verticesToRestart);
         transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
@@ -380,9 +390,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> 
verticesToRestart) {
         // clean up all the related pending requests to avoid that immediately returned slot
         // is used to fulfill the pending requests of these tasks
-        verticesToRestart.stream()
-                .map(this::getCurrentExecutionIdOfVertex)
-                .forEach(executionSlotAllocator::cancel);
+        cancelAllPendingSlotRequestsForVertices(verticesToRestart);
 
         final List<CompletableFuture<?>> cancelFutures =
                 verticesToRestart.stream()
@@ -393,15 +401,27 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
     }
 
     private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID 
executionVertexId) {
-        final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
+        return FutureUtils.combineAll(
+                
getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
+                        .map(this::cancelExecution)
+                        .collect(Collectors.toList()));
+    }
 
-        notifyCoordinatorOfCancellation(vertex);
+    protected CompletableFuture<?> cancelExecution(final Execution execution) {
+        notifyCoordinatorOfCancellation(execution);
+        return executionOperations.cancel(execution);
+    }
 
-        return executionOperations.cancel(vertex.getCurrentExecutionAttempt());
+    private void cancelAllPendingSlotRequestsForVertices(
+            final Set<ExecutionVertexID> executionVertices) {
+        executionVertices.forEach(this::cancelAllPendingSlotRequestsForVertex);
     }
 
-    private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID 
executionVertexId) {
-        return getCurrentExecutionOfVertex(executionVertexId).getAttemptId();
+    protected void cancelAllPendingSlotRequestsForVertex(
+            final ExecutionVertexID executionVertexId) {
+        getExecutionVertex(executionVertexId)
+                .getCurrentExecutions()
+                .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
     }
 
     private Execution getCurrentExecutionOfVertex(ExecutionVertexID 
executionVertexId) {
@@ -445,23 +465,18 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         }
     }
 
-    private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
+    private void notifyCoordinatorOfCancellation(Execution execution) {
         // this method makes a best effort to filter out duplicate notifications, meaning cases
-        // where
-        // the coordinator was already notified for that specific task
+        // where the coordinator was already notified for that specific task
         // we don't notify if the task is already FAILED, CANCELLING, or CANCELED
-
-        final ExecutionState currentState = vertex.getExecutionState();
+        final ExecutionState currentState = execution.getState();
         if (currentState == ExecutionState.FAILED
                 || currentState == ExecutionState.CANCELING
                 || currentState == ExecutionState.CANCELED) {
             return;
         }
 
-        for (OperatorCoordinatorHolder coordinator :
-                vertex.getJobVertex().getOperatorCoordinators()) {
-            coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
-        }
+        notifyCoordinatorsAboutTaskFailure(execution, null);
     }
 
     private class DefaultExecutionSlotAllocationContext implements 
ExecutionSlotAllocationContext {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index de7d56315bd..72893347982 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -72,7 +73,8 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final FatalErrorHandler fatalErrorHandler,
-            final JobStatusListener jobStatusListener)
+            final JobStatusListener jobStatusListener,
+            final BlocklistOperations blocklistOperations)
             throws Exception {
 
         final SlotPool slotPool =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
index adb5ff76c87..a42eb97b644 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.function.BiConsumer;
 
 /** This deployer is responsible for deploying executions. */
-interface ExecutionDeployer {
+public interface ExecutionDeployer {
 
     /**
      * Allocate slots and deploy executions.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
index 42e5f3387c2..b0a0b17db0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -83,6 +83,15 @@ public class ExecutionVertexVersioner {
                 .collect(Collectors.toSet());
     }
 
+    public Map<ExecutionVertexID, ExecutionVertexVersion> 
getExecutionVertexVersions(
+            Collection<ExecutionVertexID> executionVertexIds) {
+        return executionVertexIds.stream()
+                .map(id -> new ExecutionVertexVersion(id, 
getCurrentVersion(id)))
+                .collect(
+                        Collectors.toMap(
+                                ExecutionVertexVersion::getExecutionVertexId, 
Function.identity()));
+    }
+
     ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID 
executionVertexId) {
         final long currentVersion = getCurrentVersion(executionVertexId);
         return new ExecutionVertexVersion(executionVertexId, currentVersion);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 53547ab4104..f240dce094c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -706,49 +706,41 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
     @Override
     public final boolean updateTaskExecutionState(
             final TaskExecutionStateTransition taskExecutionState) {
-        final ExecutionVertexID executionVertexId =
-                taskExecutionState.getID().getExecutionVertexId();
 
-        boolean updateSuccess = executionGraph.updateState(taskExecutionState);
-
-        if (updateSuccess) {
-            if (isNotifiable(executionVertexId, taskExecutionState)) {
-                updateTaskExecutionStateInternal(executionVertexId, 
taskExecutionState);
-            }
+        final ExecutionAttemptID attemptId = taskExecutionState.getID();
+        final Execution execution = 
executionGraph.getRegisteredExecutions().get(attemptId);
+        if (execution != null && 
executionGraph.updateState(taskExecutionState)) {
+            onTaskExecutionStateUpdate(execution, taskExecutionState);
             return true;
-        } else {
-            return false;
         }
+
+        return false;
     }
 
-    private boolean isNotifiable(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    private void onTaskExecutionStateUpdate(
+            final Execution execution, final TaskExecutionStateTransition 
taskExecutionState) {
 
-        final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexId);
+        // only notifies a state update if it's effective, namely it successfully
+        // turns the execution state to the expected value.
+        if (execution.getState() != taskExecutionState.getExecutionState()) {
+            return;
+        }
 
         // only notifies FINISHED and FAILED states which are needed at the moment.
-        // can be refined in FLINK-14233 after the legacy scheduler is removed and
-        // the actions are factored out from ExecutionGraph.
+        // can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph.
         switch (taskExecutionState.getExecutionState()) {
             case FINISHED:
-            case FAILED:
-                // only notifies a state update if it's effective, namely it 
successfully
-                // turns the execution state to the expected value.
-                if (executionVertex.getExecutionState() == 
taskExecutionState.getExecutionState()) {
-                    return true;
-                }
+                onTaskFinished(execution);
                 break;
-            default:
+            case FAILED:
+                onTaskFailed(execution);
                 break;
         }
-
-        return false;
     }
 
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {}
+    protected abstract void onTaskFinished(final Execution execution);
+
+    protected abstract void onTaskFailed(final Execution execution);
 
     @Override
     public SerializedInputSplit requestNextInputSplit(
@@ -770,7 +762,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     }
 
     @VisibleForTesting
-    Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
+    public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
         return exceptionHistory.toArrayList();
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index 27b92b3d8a8..d3809b6b353 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -61,7 +62,8 @@ public interface SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception;
 
     JobManagerOptions.SchedulerType getSchedulerType();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
new file mode 100644
index 00000000000..e5d6d8ad1e7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of {@link ExecutionSlotAllocator}. No support for slot sharing,
+ * co-location, state/input locality, nor local recovery.
+ *
+ * <p>Each execution attempt gets its own physical slot request. Requests are tracked by attempt
+ * id and by slot request id until the slot future fails or is cancelled, or until the logical
+ * slot is returned or released.
+ */
+public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
+    private final PhysicalSlotProvider slotProvider;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
+
+    /** Slot futures of the tracked requests, keyed by attempt id and by slot request id. */
+    private final DualKeyLinkedMap<
+                    ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>>
+            requestedPhysicalSlots;
+
+    /**
+     * Creates the allocator.
+     *
+     * @param slotProvider provider serving the physical slot requests, must not be null
+     * @param resourceProfileRetriever resolves the resource profile to request for an attempt,
+     *     must not be null
+     * @param slotWillBeOccupiedIndefinitely flag passed on to every physical slot request and
+     *     logical slot created by this allocator
+     */
+    SimpleExecutionSlotAllocator(
+            PhysicalSlotProvider slotProvider,
+            Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
+            boolean slotWillBeOccupiedIndefinitely) {
+        this.slotProvider = checkNotNull(slotProvider);
+        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
+        this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
+    }
+
+    /**
+     * Requests one slot per given execution attempt.
+     *
+     * @param executionAttemptIds attempts to allocate slots for
+     * @return one assignment per attempt, in the order of the given list
+     */
+    @Override
+    public List<ExecutionSlotAssignment> allocateSlotsFor(
+            List<ExecutionAttemptID> executionAttemptIds) {
+        return executionAttemptIds.stream()
+                .map(id -> new ExecutionSlotAssignment(id, allocateSlotFor(id)))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the slot future already tracked for the attempt, or issues a new physical slot
+     * request and tracks its future.
+     */
+    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) {
+        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
+            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        }
+        final SlotRequestId slotRequestId = new SlotRequestId();
+        final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
+        final SlotProfile slotProfile =
+                SlotProfile.priorAllocation(
+                        resourceProfile,
+                        resourceProfile,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptySet());
+        final PhysicalSlotRequest request =
+                new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
+        final CompletableFuture<LogicalSlot> slotFuture =
+                slotProvider
+                        .allocatePhysicalSlot(request)
+                        .thenApply(
+                                physicalSlotRequest ->
+                                        allocateLogicalSlotFromPhysicalSlot(
+                                                slotRequestId,
+                                                physicalSlotRequest.getPhysicalSlot(),
+                                                slotWillBeOccupiedIndefinitely));
+        // Track the request before attaching the failure handler: if the future has already
+        // completed exceptionally the handler runs right away and must find the entry to
+        // remove, otherwise a stale entry of a failed request would stay in the map.
+        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture);
+        slotFuture.exceptionally(
+                throwable -> {
+                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
+                    this.slotProvider.cancelSlotRequest(slotRequestId, throwable);
+                    return null;
+                });
+        return slotFuture;
+    }
+
+    /**
+     * Cancels the slot future tracked for the given attempt. Does nothing if no request is
+     * tracked for it.
+     *
+     * @param executionAttemptId attempt whose slot request should be cancelled
+     */
+    @Override
+    public void cancel(ExecutionAttemptID executionAttemptId) {
+        final CompletableFuture<LogicalSlot> slotFuture =
+                this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        if (slotFuture != null) {
+            // completes a pending future exceptionally, which triggers the failure handler
+            // registered in allocateSlotFor
+            slotFuture.cancel(false);
+        }
+    }
+
+    /** Callback handed to the logical slots as their return hook. */
+    private void returnLogicalSlot(LogicalSlot slot) {
+        releaseSlot(
+                slot,
+                new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
+    }
+
+    /** Stops tracking the request of the given slot and cancels it at the slot provider. */
+    private void releaseSlot(LogicalSlot slot, Throwable cause) {
+        requestedPhysicalSlots.removeKeyB(slot.getSlotRequestId());
+        slotProvider.cancelSlotRequest(slot.getSlotRequestId(), cause);
+    }
+
+    /**
+     * Wraps the physical slot into a {@link SingleLogicalSlot} and registers a payload on the
+     * physical slot.
+     *
+     * @throws IllegalStateException if the payload cannot be assigned to the physical slot
+     */
+    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(
+            final SlotRequestId slotRequestId,
+            final PhysicalSlot physicalSlot,
+            final boolean slotWillBeOccupiedIndefinitely) {
+
+        final SingleLogicalSlot singleLogicalSlot =
+                new SingleLogicalSlot(
+                        slotRequestId,
+                        physicalSlot,
+                        Locality.UNKNOWN,
+                        this::returnLogicalSlot,
+                        slotWillBeOccupiedIndefinitely);
+
+        final LogicalSlotHolder logicalSlotHolder = new LogicalSlotHolder(singleLogicalSlot);
+        if (physicalSlot.tryAssignPayload(logicalSlotHolder)) {
+            return singleLogicalSlot;
+        } else {
+            throw new IllegalStateException(
+                    "BUG: Unexpected physical slot payload assignment failure!");
+        }
+    }
+
+    /** Payload of a physical slot that forwards a release to the wrapped logical slot. */
+    private class LogicalSlotHolder implements PhysicalSlot.Payload {
+        private final SingleLogicalSlot logicalSlot;
+
+        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
+            this.logicalSlot = checkNotNull(logicalSlot);
+        }
+
+        @Override
+        public void release(Throwable cause) {
+            logicalSlot.release(cause);
+            // chain the original cause so that it is not lost when the request is cancelled
+            releaseSlot(
+                    logicalSlot,
+                    new FlinkException("Physical slot releases its payload.", cause));
+        }
+
+        @Override
+        public boolean willOccupySlotIndefinitely() {
+            return logicalSlot.willOccupySlotIndefinitely();
+        }
+    }
+
+    /** Factory to instantiate a {@link SimpleExecutionSlotAllocator}. */
+    public static class Factory implements ExecutionSlotAllocatorFactory {
+        private final PhysicalSlotProvider slotProvider;
+
+        private final boolean slotWillBeOccupiedIndefinitely;
+
+        /**
+         * @param slotProvider provider serving the physical slot requests, must not be null
+         * @param slotWillBeOccupiedIndefinitely flag handed to the created allocators
+         */
+        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
+            // fail fast here rather than on the first createInstance call
+            this.slotProvider = checkNotNull(slotProvider);
+            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        }
+
+        @Override
+        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
+            return new SimpleExecutionSlotAllocator(
+                    slotProvider,
+                    id -> context.getResourceProfile(id.getExecutionVertexId()),
+                    slotWillBeOccupiedIndefinitely);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 61236a1c18e..2d59d56466d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -80,7 +81,8 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         final DeclarativeSlotPool declarativeSlotPool =
                 slotPoolService
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index f744d68f28b..29d6bfa72a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -51,7 +51,6 @@ import org.apache.flink.runtime.scheduler.SchedulerOperations;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import 
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
 import 
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -145,23 +144,20 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler implements Schedule
     }
 
     @Override
-    public void startSchedulingInternal() {
+    protected void startSchedulingInternal() {
         initializeVerticesIfPossible();
 
         super.startSchedulingInternal();
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
-
+    protected void onTaskFinished(final Execution execution) {
         initializeVerticesIfPossible();
 
-        super.updateTaskExecutionStateInternal(executionVertexId, 
taskExecutionState);
+        super.onTaskFinished(execution);
     }
 
-    private void initializeVerticesIfPossible() {
+    void initializeVerticesIfPossible() {
         final List<ExecutionJobVertex> newlyInitializedJobVertices = new 
ArrayList<>();
         try {
             final long createTimestamp = System.currentTimeMillis();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 3aaad1dbd0d..621eaa9788b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -25,11 +25,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
@@ -41,8 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
-import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
-import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
@@ -55,17 +55,19 @@ import 
org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
-import 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
 import 
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
-import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -92,7 +94,8 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
 
         checkState(
@@ -108,17 +111,15 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
                                         new IllegalStateException(
                                                 "The AdaptiveBatchScheduler 
requires a SlotPool."));
 
-        final SlotSelectionStrategy slotSelectionStrategy =
-                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
-                        JobType.BATCH, jobMasterConfiguration);
-        final PhysicalSlotRequestBulkChecker bulkChecker =
-                PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
-                        slotPool, SystemClock.getInstance());
-        final PhysicalSlotProvider physicalSlotProvider =
-                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        final boolean enableSpeculativeExecution =
+                
jobMasterConfiguration.getBoolean(JobManagerOptions.SPECULATIVE_ENABLED);
+
+        final List<Consumer<ComponentMainThreadExecutor>> startUpActions = new 
ArrayList<>();
+        final Consumer<ComponentMainThreadExecutor> combinedStartUpActions =
+                m -> startUpActions.forEach(a -> a.accept(m));
+
         final ExecutionSlotAllocatorFactory allocatorFactory =
-                new SlotSharingExecutionSlotAllocatorFactory(
-                        physicalSlotProvider, false, bulkChecker, 
slotRequestTimeout);
+                createExecutionSlotAllocatorFactory(jobMasterConfiguration, 
slotPool);
 
         final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
                 
RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
@@ -147,34 +148,86 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
                         shuffleMaster,
                         partitionTracker,
                         true,
-                        new ExecutionJobVertex.Factory());
-
-        return new AdaptiveBatchScheduler(
-                log,
-                jobGraph,
-                ioExecutor,
-                jobMasterConfiguration,
-                bulkChecker::start,
-                new ScheduledExecutorServiceAdapter(futureExecutor),
-                userCodeLoader,
-                new CheckpointsCleaner(),
-                checkpointRecoveryFactory,
-                jobManagerJobMetricGroup,
-                new VertexwiseSchedulingStrategy.Factory(),
-                
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
-                restartBackoffTimeStrategy,
-                new DefaultExecutionOperations(),
-                new ExecutionVertexVersioner(),
-                allocatorFactory,
-                initializationTimestamp,
-                mainThreadExecutor,
-                jobStatusListener,
-                executionGraphFactory,
-                shuffleMaster,
-                rpcTimeout,
-                DefaultVertexParallelismDecider.from(jobMasterConfiguration),
-                DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
-                        jobMasterConfiguration));
+                        
createExecutionJobVertexFactory(enableSpeculativeExecution));
+
+        if (enableSpeculativeExecution) {
+            return new SpeculativeScheduler(
+                    log,
+                    jobGraph,
+                    ioExecutor,
+                    jobMasterConfiguration,
+                    combinedStartUpActions,
+                    new ScheduledExecutorServiceAdapter(futureExecutor),
+                    userCodeLoader,
+                    new CheckpointsCleaner(),
+                    checkpointRecoveryFactory,
+                    jobManagerJobMetricGroup,
+                    new VertexwiseSchedulingStrategy.Factory(),
+                    FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
+                            jobMasterConfiguration),
+                    restartBackoffTimeStrategy,
+                    new DefaultExecutionOperations(),
+                    new ExecutionVertexVersioner(),
+                    allocatorFactory,
+                    initializationTimestamp,
+                    mainThreadExecutor,
+                    jobStatusListener,
+                    executionGraphFactory,
+                    shuffleMaster,
+                    rpcTimeout,
+                    
DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                    
DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
+                            jobMasterConfiguration),
+                    blocklistOperations);
+        } else {
+            return new AdaptiveBatchScheduler(
+                    log,
+                    jobGraph,
+                    ioExecutor,
+                    jobMasterConfiguration,
+                    combinedStartUpActions,
+                    new ScheduledExecutorServiceAdapter(futureExecutor),
+                    userCodeLoader,
+                    new CheckpointsCleaner(),
+                    checkpointRecoveryFactory,
+                    jobManagerJobMetricGroup,
+                    new VertexwiseSchedulingStrategy.Factory(),
+                    FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
+                            jobMasterConfiguration),
+                    restartBackoffTimeStrategy,
+                    new DefaultExecutionOperations(),
+                    new ExecutionVertexVersioner(),
+                    allocatorFactory,
+                    initializationTimestamp,
+                    mainThreadExecutor,
+                    jobStatusListener,
+                    executionGraphFactory,
+                    shuffleMaster,
+                    rpcTimeout,
+                    
DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                    
DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
+                            jobMasterConfiguration));
+        }
+    }
+
+    /**
+     * Creates the factory of the slot allocator. The allocator requests its physical slots from
+     * the given slot pool through a {@link PhysicalSlotProviderImpl}.
+     *
+     * @param configuration configuration the slot selection strategy is derived from
+     * @param slotPool slot pool backing the physical slot provider
+     * @return a {@link SimpleExecutionSlotAllocator.Factory} on top of the slot pool
+     */
+    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
+            Configuration configuration, SlotPool slotPool) {
+        final SlotSelectionStrategy selectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, configuration);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(selectionStrategy, slotPool);
+
+        // 'false': slots are not declared as being occupied indefinitely
+        return new SimpleExecutionSlotAllocator.Factory(slotProvider, false);
+    }
+
+    /**
+     * Picks the {@link ExecutionJobVertex.Factory} to build the execution graph with.
+     *
+     * @param enableSpeculativeExecution whether speculative execution is enabled
+     * @return a {@link SpeculativeExecutionJobVertex.Factory} if speculative execution is
+     *     enabled, a plain {@link ExecutionJobVertex.Factory} otherwise
+     */
+    private static ExecutionJobVertex.Factory createExecutionJobVertexFactory(
+            boolean enableSpeculativeExecution) {
+        return enableSpeculativeExecution
+                ? new SpeculativeExecutionJobVertex.Factory()
+                : new ExecutionJobVertex.Factory();
     }
 
     private void checkAllExchangesBlocking(final JobGraph jobGraph) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
new file mode 100644
index 00000000000..919ec05cf1d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import 
org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import 
org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A batch scheduler that adds speculative execution on top of {@link AdaptiveBatchScheduler}.
+ *
+ * <p>The scheduler registers itself as the listener of a {@link SlowTaskDetector}. Whenever slow
+ * tasks are reported, the nodes they run on are added to the blocklist and additional execution
+ * attempts are created and deployed for the affected execution vertices, as long as the number
+ * of concurrent attempts of a vertex stays below the configured maximum. Vertices which contain
+ * sources or sinks are skipped.
+ */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    /** Upper bound of concurrently existing execution attempts of one execution vertex. */
+    private final int maxConcurrentExecutions;
+
+    /** Time span for which a node is blocked once a slow task was detected on it. */
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistOperations blocklistOperations;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    /**
+     * Creates the scheduler.
+     *
+     * <p>All arguments except {@code blocklistOperations} are forwarded unchanged to {@link
+     * AdaptiveBatchScheduler}. The maximum number of concurrent executions and the node blocking
+     * duration are read from {@code jobMasterConfiguration}.
+     *
+     * @param blocklistOperations used to block the nodes of slow tasks, must not be null
+     * @throws Exception if the super class constructor fails
+     */
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistOperations blocklistOperations)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistOperations = checkNotNull(blocklistOperations);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    /** Starts scheduling and afterwards starts the slow task detector with this listener. */
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    /** Stops the slow task detector before closing the scheduler. */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        slowTaskDetector.stop();
+        return super.closeAsync();
+    }
+
+    /**
+     * Returns the execution vertex with the given id as a {@link SpeculativeExecutionVertex}.
+     *
+     * <p>NOTE(review): the unchecked cast assumes the execution graph was built with speculative
+     * execution vertices - confirm against the execution graph factory in use.
+     */
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        super.onTaskFinished(execution);
+    }
+
+    /**
+     * Cancels the pending slot requests and all not yet FINISHED executions of the given vertex.
+     *
+     * @param executionVertexId id of the execution vertex whose attempts are canceled
+     * @return a future combining the cancellation futures of the individual executions
+     */
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        // cancel all the related pending requests to avoid that slots returned by the canceled
+        // vertices are used to fulfill these pending requests
+        // do not cancel the FINISHED execution
+        cancelAllPendingSlotRequestsForVertex(executionVertexId);
+        return FutureUtils.combineAll(
+                getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
+                        .filter(e -> e.getState() != ExecutionState.FINISHED)
+                        .map(this::cancelExecution)
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(execution.getAttemptId());
+
+        super.onTaskFailed(execution);
+    }
+
+    /**
+     * Handles the failure of one execution attempt.
+     *
+     * <p>The regular failure handling of the super class is used if no remaining attempt of the
+     * vertex can still finish, or if the error contains a {@link PartitionException}. Otherwise
+     * the failure is treated as local to the failed attempt.
+     */
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex is not possible to finish or a PartitionException occurred,
+        // trigger an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // this is just a local failure and the execution vertex will not be fully restarted
+            handleLocalExecutionAttemptFailure(failedExecution, error);
+        }
+    }
+
+    /**
+     * Handles a failure which only affects the given attempt: its slot request is canceled and
+     * the failure is recorded. If the recorded failure can be restarted from, it is archived;
+     * otherwise the job is failed.
+     */
+    private void handleLocalExecutionAttemptFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+        executionSlotAllocator.cancel(failedExecution.getAttemptId());
+
+        final FailureHandlingResult failureHandlingResult =
+                recordTaskFailure(failedExecution, error);
+        if (failureHandlingResult.canRestart()) {
+            archiveFromFailureHandlingResult(
+                    createFailureHandlingResultSnapshot(failureHandlingResult));
+        } else {
+            failJob(error, failureHandlingResult.getTimestamp());
+        }
+    }
+
+    /**
+     * Checks whether at least one current execution of the vertex is in a state from which it
+     * may still finish (CREATED, SCHEDULED, DEPLOYING, INITIALIZING or RUNNING).
+     *
+     * @throws IllegalStateException if any current execution is already FINISHED
+     */
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should
+            // fail after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    /**
+     * Reacts to detected slow tasks: blocks the nodes they run on and deploys new speculative
+     * executions for the slow vertices.
+     *
+     * @param slowTasks the slow execution attempts, grouped by their execution vertex
+     */
+    @Override
+    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        // one timestamp for the whole notification; it is used for the blocklist end time and
+        // as the creation timestamp of all new speculative executions
+        final long currentTimestamp = System.currentTimeMillis();
+
+        // add slow nodes to blocklist before scheduling new speculative executions
+        final long blockedEndTimestamp = currentTimestamp + blockSlowNodeDuration.toMillis();
+        final Collection<BlockedNode> nodesToBlock =
+                getSlowNodeIds(slowTasks).stream()
+                        .map(
+                                nodeId ->
+                                        new BlockedNode(
+                                                nodeId,
+                                                "Node is detected to be slow.",
+                                                blockedEndTimestamp))
+                        .collect(Collectors.toList());
+        blocklistOperations.addNewBlockedNodes(nodesToBlock);
+
+        final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+        final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+            final SpeculativeExecutionVertex executionVertex =
+                    getExecutionVertex(executionVertexId);
+
+            if (executionVertex.containsSources() || executionVertex.containsSinks()) {
+                continue;
+            }
+
+            final int currentConcurrentExecutions = executionVertex.getCurrentExecutions().size();
+            final int newSpeculativeExecutionsToDeploy =
+                    maxConcurrentExecutions - currentConcurrentExecutions;
+            if (newSpeculativeExecutionsToDeploy > 0) {
+                log.info(
+                        "{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.",
+                        executionVertex.getTaskNameWithSubtaskIndex(),
+                        executionVertex.getID(),
+                        newSpeculativeExecutionsToDeploy);
+
+                verticesToDeploy.add(executionVertexId);
+                // the stream index only counts the attempts; it must not be handed over as the
+                // argument of createNewSpeculativeExecution, which expects a timestamp
+                IntStream.range(0, newSpeculativeExecutionsToDeploy)
+                        .mapToObj(
+                                ignored ->
+                                        executionVertex.createNewSpeculativeExecution(
+                                                currentTimestamp))
+                        .forEach(newSpeculativeExecutions::add);
+            }
+        }
+
+        executionDeployer.allocateSlotsAndDeploy(
+                newSpeculativeExecutions,
+                executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
+    }
+
+    /**
+     * Collects the ids of the nodes on which the given slow execution attempts are located.
+     *
+     * <p>NOTE(review): assumes every reported attempt is still registered in the execution graph
+     * and has an assigned resource location; otherwise this throws a NullPointerException.
+     */
+    private Set<String> getSlowNodeIds(
+            Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        final Set<ExecutionAttemptID> slowExecutions =
+                slowTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+
+        return slowExecutions.stream()
+                .map(id -> getExecutionGraph().getRegisteredExecutions().get(id))
+                .map(Execution::getAssignedResourceLocation)
+                .map(TaskManagerLocation::getNodeId)
+                .collect(Collectors.toSet());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 07fc6ed2e3c..ee542eb0822 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -119,7 +120,8 @@ public class JobMasterSchedulerTest extends TestLogger {
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
                 FatalErrorHandler fatalErrorHandler,
-                JobStatusListener jobStatusListener) {
+                JobStatusListener jobStatusListener,
+                BlocklistOperations blocklistOperations) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
                             () -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
index 4c99e8ce5ec..7629e76a157 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
@@ -40,6 +42,7 @@ import 
org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
 import 
org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
@@ -57,7 +60,7 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 
-/** A builder to create {@link DefaultScheduler} instances for testing. */
+/** A builder to create {@link DefaultScheduler} or its subclass instances for 
testing. */
 public class DefaultSchedulerBuilder {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSchedulerBuilder.class);
 
@@ -95,6 +98,7 @@ public class DefaultSchedulerBuilder {
     private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
     private int defaultMaxParallelism =
             
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+    private BlocklistOperations blocklistOperations = ignore -> {};
 
     public DefaultSchedulerBuilder(
             JobGraph jobGraph,
@@ -245,6 +249,11 @@ public class DefaultSchedulerBuilder {
         return this;
     }
 
+    /**
+     * Sets the {@link BlocklistOperations} which {@code buildSpeculativeScheduler()} passes to
+     * the created scheduler.
+     *
+     * @param blocklistOperations the blocklist operations to use
+     * @return this builder
+     */
+    public DefaultSchedulerBuilder setBlocklistOperations(BlocklistOperations 
blocklistOperations) {
+        this.blocklistOperations = blocklistOperations;
+        return this;
+    }
+
     public DefaultScheduler build() throws Exception {
         return new DefaultScheduler(
                 log,
@@ -301,7 +310,41 @@ public class DefaultSchedulerBuilder {
                 defaultMaxParallelism);
     }
 
+    /**
+     * Builds a {@link SpeculativeScheduler} from the values configured on this builder.
+     *
+     * <p>The scheduler always uses a {@link VertexwiseSchedulingStrategy.Factory}, a no-op
+     * start-up action and an execution graph factory that is created for a dynamic graph with a
+     * {@link SpeculativeExecutionJobVertex.Factory}.
+     *
+     * @return the created scheduler
+     * @throws Exception if the scheduler constructor fails
+     */
+    public SpeculativeScheduler buildSpeculativeScheduler() throws Exception {
+        return new SpeculativeScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                // start-up action: nothing to do
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                // initialization timestamp
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                // isDynamicGraph = true, job vertices are created by the speculative factory
+                createExecutionGraphFactory(true, new 
SpeculativeExecutionJobVertex.Factory()),
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism,
+                blocklistOperations);
+    }
+
     private ExecutionGraphFactory createExecutionGraphFactory(boolean 
isDynamicGraph) {
+        // delegate to the two-argument overload with the plain ExecutionJobVertex factory
+        return createExecutionGraphFactory(isDynamicGraph, new 
ExecutionJobVertex.Factory());
+    }
+
+    private ExecutionGraphFactory createExecutionGraphFactory(
+            boolean isDynamicGraph, ExecutionJobVertex.Factory 
executionJobVertexFactory) {
         return new DefaultExecutionGraphFactory(
                 jobMasterConfiguration,
                 userCodeLoader,
@@ -314,6 +357,6 @@ public class DefaultSchedulerBuilder {
                 shuffleMaster,
                 partitionTracker,
                 isDynamicGraph,
-                new ExecutionJobVertex.Factory());
+                executionJobVertexFactory);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index d78610c25aa..d3bad8be708 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1690,7 +1690,7 @@ public class DefaultSchedulerTest extends TestLogger {
         schedulerClosed.get();
     }
 
-    private static TaskExecutionState createFailedTaskExecutionState(
+    public static TaskExecutionState createFailedTaskExecutionState(
             ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
                 executionAttemptID, ExecutionState.FAILED, new 
Exception("Expected failure cause"));
@@ -1745,7 +1745,7 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.getJobTerminationFuture().get(TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
     }
 
-    private static JobGraph singleNonParallelJobVertexJobGraph() {
+    public static JobGraph singleNonParallelJobVertexJobGraph() {
         return singleJobVertexJobGraph(1);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
new file mode 100644
index 00000000000..1fd6ca850f6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingPayload;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test suite for {@link SimpleExecutionSlotAllocator}. */
+class SimpleExecutionSlotAllocatorTest {
+
+    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3, 5);
+    private static final ExecutionAttemptID EXECUTION_ATTEMPT_ID = createExecutionAttemptId();
+
+    /** A single allocation issues one physical request carrying the expected profile. */
+    @Test
+    void testSlotAllocation() {
+        final AllocationContext ctx = new AllocationContext();
+        final TestingPhysicalSlotProvider provider = ctx.getSlotProvider();
+
+        final CompletableFuture<LogicalSlot> logicalSlotFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        assertThat(logicalSlotFuture).isCompleted();
+        assertThat(provider.getRequests()).hasSize(1);
+
+        final PhysicalSlotRequest onlyRequest = provider.getRequests().values().iterator().next();
+        assertThat(onlyRequest.getSlotProfile().getPhysicalSlotResourceProfile())
+                .isEqualTo(RESOURCE_PROFILE);
+    }
+
+    /** Allocating twice for the same attempt yields the same logical slot. */
+    @Test
+    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws Exception {
+        final AllocationContext ctx = new AllocationContext();
+
+        final CompletableFuture<LogicalSlot> firstFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final CompletableFuture<LogicalSlot> secondFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        assertThat(firstFuture.get()).isSameAs(secondFuture.get());
+    }
+
+    /** A failing physical slot request fails the logical slot future. */
+    @Test
+    void testFailedPhysicalSlotRequestFailsLogicalSlotFuture() {
+        final AllocationContext ctx =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(),
+                        false);
+        final TestingPhysicalSlotProvider provider = ctx.getSlotProvider();
+
+        final CompletableFuture<LogicalSlot> logicalSlotFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final SlotRequestId requestId = provider.getFirstRequestOrFail().getSlotRequestId();
+
+        assertThat(logicalSlotFuture).isNotDone();
+        provider.getResponses().get(requestId).completeExceptionally(new Throwable());
+        assertThat(logicalSlotFuture).isCompletedExceptionally();
+
+        // next allocation allocates new slot
+        ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        assertThat(provider.getRequests()).hasSize(2);
+    }
+
+    @Test
+    void testSlotWillBeOccupiedIndefinitelyFalse() throws Exception {
+        testSlotWillBeOccupiedIndefinitely(false);
+    }
+
+    @Test
+    void testSlotWillBeOccupiedIndefinitelyTrue() throws Exception {
+        testSlotWillBeOccupiedIndefinitely(true);
+    }
+
+    /** Verifies that the flag is propagated to the physical request and to the slot payload. */
+    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely)
+            throws Exception {
+        final AllocationContext ctx =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(),
+                        slotWillBeOccupiedIndefinitely);
+        final TestingPhysicalSlotProvider provider = ctx.getSlotProvider();
+        ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        final PhysicalSlotRequest request = provider.getFirstRequestOrFail();
+        assertThat(request.willSlotBeOccupiedIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
+
+        final TestingPhysicalSlot slot =
+                provider.getResponses().get(request.getSlotRequestId()).get();
+        assertThat(slot.getPayload()).isNotNull();
+        assertThat(slot.getPayload().willOccupySlotIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
+    }
+
+    @Test
+    void testLogicalSlotReleasingCancelsPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                true, true, (ctx, future) -> future.get().releaseSlot(null));
+    }
+
+    @Test
+    void testLogicalSlotCancellationCancelsPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                false,
+                true,
+                (ctx, future) -> {
+                    assertThatThrownBy(
+                                    () -> {
+                                        ctx.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
+                                        future.get();
+                                    })
+                            .as("The logical future must finish with a cancellation exception.")
+                            .isInstanceOf(CancellationException.class);
+                });
+    }
+
+    @Test
+    void testCompletedLogicalSlotCancellationDoesNotCancelPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                true,
+                false,
+                (ctx, future) -> {
+                    ctx.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
+                    future.get();
+                });
+    }
+
+    /**
+     * Allocates a slot, runs the given cancel/release action and checks whether the physical slot
+     * request was canceled and whether a follow-up allocation issues a new physical request.
+     */
+    private static void testLogicalSlotRequestCancellationOrRelease(
+            final boolean autoCompletePhysicalSlotFuture,
+            final boolean expectPhysicalSlotRequestCanceled,
+            final BiConsumerWithException<
+                            AllocationContext, CompletableFuture<LogicalSlot>, Exception>
+                    cancelOrReleaseAction)
+            throws Exception {
+
+        final TestingPhysicalSlotProvider provider =
+                autoCompletePhysicalSlotFuture
+                        ? TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()
+                        : TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
+        final AllocationContext ctx = new AllocationContext(provider, false);
+
+        final CompletableFuture<LogicalSlot> logicalSlotFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        cancelOrReleaseAction.accept(ctx, logicalSlotFuture);
+
+        final SlotRequestId requestId =
+                ctx.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
+        assertThat(ctx.getSlotProvider().getCancellations().containsKey(requestId))
+                .isEqualTo(expectPhysicalSlotRequestCanceled);
+
+        ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final int expectedRequestCount = expectPhysicalSlotRequestCanceled ? 2 : 1;
+        assertThat(ctx.getSlotProvider().getRequests()).hasSize(expectedRequestCount);
+    }
+
+    /** Releasing the physical slot payload releases the logical slot and cancels the request. */
+    @Test
+    void testPhysicalSlotReleasesLogicalSlots() throws Exception {
+        final AllocationContext ctx = new AllocationContext();
+        final TestingPhysicalSlotProvider provider = ctx.getSlotProvider();
+
+        final CompletableFuture<LogicalSlot> logicalSlotFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final TestingPayload testingPayload = new TestingPayload();
+        logicalSlotFuture.thenAccept(slot -> slot.tryAssignPayload(testingPayload));
+        final SlotRequestId requestId = provider.getFirstRequestOrFail().getSlotRequestId();
+        final TestingPhysicalSlot physicalSlot = provider.getFirstResponseOrFail().get();
+
+        assertThat(testingPayload.getTerminalStateFuture()).isNotDone();
+        assertThat(physicalSlot.getPayload()).isNotNull();
+
+        physicalSlot.getPayload().release(new Throwable());
+        assertThat(testingPayload.getTerminalStateFuture()).isDone();
+        assertThat(provider.getCancellations()).containsKey(requestId);
+
+        ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        // there should be one more physical slot allocation, as the first allocation should be
+        // removed after releasing all logical slots
+        assertThat(provider.getRequests()).hasSize(2);
+    }
+
+    /** A physical slot creation failure fails the logical slot and cancels all requests. */
+    @Test
+    void testFailLogicalSlotIfPhysicalSlotIsFails() {
+        final AllocationContext ctx =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(
+                                new FlinkException("test failure")),
+                        false);
+        final TestingPhysicalSlotProvider provider = ctx.getSlotProvider();
+
+        final CompletableFuture<LogicalSlot> logicalSlotFuture =
+                ctx.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        assertThat(logicalSlotFuture).isCompletedExceptionally();
+        assertThat(provider.getCancellations().keySet())
+                .isEqualTo(provider.getRequests().keySet());
+    }
+
+    /** Bundles a testing slot provider with the allocator under test. */
+    private static class AllocationContext {
+        private final TestingPhysicalSlotProvider slotProvider;
+        private final boolean slotWillBeOccupiedIndefinitely;
+        private final SimpleExecutionSlotAllocator allocator;
+
+        public AllocationContext() {
+            this(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), false);
+        }
+
+        public AllocationContext(
+                TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
+            this.slotProvider = slotProvider;
+            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+            // every execution attempt is mapped to the same fixed resource profile
+            this.allocator =
+                    new SimpleExecutionSlotAllocator(
+                            slotProvider,
+                            attemptId -> RESOURCE_PROFILE,
+                            slotWillBeOccupiedIndefinitely);
+        }
+
+        /** Requests a slot for one attempt and returns the future of its logical slot. */
+        private CompletableFuture<LogicalSlot> allocateSlotsFor(
+                ExecutionAttemptID executionAttemptId) {
+            return allocator
+                    .allocateSlotsFor(Collections.singletonList(executionAttemptId))
+                    .get(0)
+                    .getLogicalSlotFuture();
+        }
+
+        public TestingPhysicalSlotProvider getSlotProvider() {
+            return slotProvider;
+        }
+
+        public boolean isSlotWillBeOccupiedIndefinitely() {
+            return slotWillBeOccupiedIndefinitely;
+        }
+
+        public SimpleExecutionSlotAllocator getAllocator() {
+            return allocator;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 40a122e46f5..6233a42a805 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -69,7 +70,8 @@ public class TestingSchedulerNGFactory implements 
SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNG;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
new file mode 100644
index 00000000000..75456468447
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerTest.createFailedTaskExecutionState;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerTest.singleNonParallelJobVertexJobGraph;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SpeculativeScheduler}. */
+class SpeculativeSchedulerTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private ScheduledExecutorService futureExecutor;
+
+    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
+    private TestExecutionOperationsDecorator testExecutionOperations;
+    private TestBlocklistOperations testBlocklistOperations;
+    private TestRestartBackoffTimeStrategy restartStrategy;
+
+    /** Creates fresh executors, recording decorators and the restart strategy for each test. */
+    @BeforeEach
+    void setUp() {
+        futureExecutor = new DirectScheduledExecutorService();
+        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+
+        // decorate the default operations so that tests can inspect the deployed executions
+        final DefaultExecutionOperations defaultOperations = new DefaultExecutionOperations();
+        testExecutionOperations = new TestExecutionOperationsDecorator(defaultOperations);
+
+        testBlocklistOperations = new TestBlocklistOperations();
+        restartStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+    }
+
+    /** Shuts the future executor down gracefully (10 second timeout) if one was created. */
+    @AfterEach
+    void tearDown() {
+        if (futureExecutor == null) {
+            return;
+        }
+        ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, futureExecutor);
+    }
+
+    /** Starting the scheduler on the single-vertex job deploys exactly one execution. */
+    @Test
+    void testStartScheduling() {
+        createSchedulerAndStartScheduling();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);
+    }
+
+    /**
+     * A slow-task notification deploys a second execution and reports the node of the slow
+     * attempt to the blocklist operations.
+     */
+    @Test
+    void testNotifySlowTasks() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution originalAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        // only the original attempt is deployed before the notification
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);
+
+        notifySlowTask(scheduler, originalAttempt);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+        assertThat(testBlocklistOperations.getAllBlockedNodeIds())
+                .containsExactly(originalAttempt.getAssignedResourceLocation().getNodeId());
+    }
+
+    /**
+     * Re-notifying an already speculated attempt must not deploy anything new until one of the
+     * existing attempts has failed.
+     */
+    @Test
+    void testNotifyDuplicatedSlowTasks() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex vertex = getOnlyExecutionVertex(scheduler);
+        final Execution firstAttempt = vertex.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        // a repeated notification for the same attempt leaves the deployment count untouched
+        notifySlowTask(scheduler, firstAttempt);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        // failing the second attempt makes room for a new speculative execution
+        final TaskExecutionState secondAttemptFailure =
+                createFailedTaskExecutionState(getExecution(vertex, 1).getAttemptId());
+        scheduler.updateTaskExecutionState(secondAttemptFailure);
+
+        // the next notification therefore deploys a third execution
+        notifySlowTask(scheduler, firstAttempt);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    /** Once both attempts have failed, a third execution is deployed after the restart fires. */
+    @Test
+    void testRestartVertexIfAllSpeculativeExecutionFailed() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex vertex = getOnlyExecutionVertex(scheduler);
+        final Execution firstAttempt = vertex.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        // resolve both attempt ids up front, before any of the attempts is failed
+        final ExecutionAttemptID firstId = firstAttempt.getAttemptId();
+        final ExecutionAttemptID secondId = getExecution(vertex, 1).getAttemptId();
+
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(firstId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(secondId));
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    /** Failing only the first of two attempts must not lead to an additional deployment. */
+    @Test
+    void testNoRestartIfNotAllSpeculativeExecutionFailed() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution firstAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+
+        final TaskExecutionState firstAttemptFailure =
+                createFailedTaskExecutionState(firstAttempt.getAttemptId());
+        scheduler.updateTaskExecutionState(firstAttemptFailure);
+        taskRestartExecutor.triggerScheduledTasks();
+
+        // still just the original and the speculative execution
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+    }
+
+    /**
+     * A {@link PartitionNotFoundException} on one attempt puts the other attempt into {@code
+     * CANCELING}; after cancellation completes and the restart fires, a third execution is
+     * deployed.
+     */
+    @Test
+    void testRestartVertexIfPartitionExceptionHappened() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex vertex = getOnlyExecutionVertex(scheduler);
+        final Execution firstAttempt = vertex.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+        final Execution secondAttempt = getExecution(vertex, 1);
+
+        final TaskExecutionState partitionFailure =
+                new TaskExecutionState(
+                        firstAttempt.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new PartitionNotFoundException(new ResultPartitionID()));
+        scheduler.updateTaskExecutionState(partitionFailure);
+
+        assertThat(secondAttempt.getState()).isEqualTo(ExecutionState.CANCELING);
+
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    /** When the first attempt finishes, the second attempt is moved to {@code CANCELING}. */
+    @Test
+    void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex vertex = getOnlyExecutionVertex(scheduler);
+        final Execution firstAttempt = vertex.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+        final Execution secondAttempt = getExecution(vertex, 1);
+
+        final TaskExecutionState firstAttemptFinished =
+                new TaskExecutionState(firstAttempt.getAttemptId(), ExecutionState.FINISHED);
+        scheduler.updateTaskExecutionState(firstAttemptFinished);
+
+        assertThat(secondAttempt.getState()).isEqualTo(ExecutionState.CANCELING);
+    }
+
+    /**
+     * After a restart caused by a {@link PartitionNotFoundException}, the exception history holds
+     * one entry whose failing task name is that of the attempt that reported the failure.
+     */
+    @Test
+    void testExceptionHistoryIfPartitionExceptionHappened() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution firstAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+
+        // A partition exception can result in a restart of the whole execution vertex.
+        final TaskExecutionState partitionFailure =
+                new TaskExecutionState(
+                        firstAttempt.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new PartitionNotFoundException(new ResultPartitionID()));
+        scheduler.updateTaskExecutionState(partitionFailure);
+
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(scheduler.getExceptionHistory()).hasSize(1);
+
+        final RootExceptionHistoryEntry onlyEntry =
+                scheduler.getExceptionHistory().iterator().next();
+        // Before the restart the current execution attempt should be the second attempt, but the
+        // root failure must be attributed to the first attempt.
+        assertThat(onlyEntry.getFailingTaskName()).isEqualTo(firstAttempt.getVertexWithAttempt());
+    }
+
+    /**
+     * The failure of a single attempt is recorded in the execution graph's failure info and in
+     * the exception history.
+     */
+    @Test
+    void testLocalExecutionAttemptFailureIsCorrectlyRecorded() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution firstAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+
+        // The execution vertex will not be restarted if we only fail the first attempt, but the
+        // failure should still be recorded in the execution graph and in the exception history.
+        final TaskExecutionState failedState =
+                createFailedTaskExecutionState(firstAttempt.getAttemptId());
+        scheduler.updateTaskExecutionState(failedState);
+
+        final ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader();
+        assertThat(scheduler.getExecutionGraph().getFailureInfo()).isNotNull();
+
+        final String reportedMessage = failedState.getError(classLoader).getMessage();
+        assertThat(scheduler.getExecutionGraph().getFailureInfo().getExceptionAsString())
+                .contains(reportedMessage);
+
+        assertThat(scheduler.getExceptionHistory()).hasSize(1);
+
+        final RootExceptionHistoryEntry onlyEntry =
+                scheduler.getExceptionHistory().iterator().next();
+        assertThat(onlyEntry.getFailingTaskName()).isEqualTo(firstAttempt.getVertexWithAttempt());
+    }
+
+    /**
+     * A {@link SuppressRestartsException} reported by one attempt moves the job to {@code
+     * FAILING}.
+     */
+    @Test
+    void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution firstAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+
+        final Exception forcedFailure = new Exception("Forced failure for testing.");
+        final TaskExecutionState unrecoverableState =
+                new TaskExecutionState(
+                        firstAttempt.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new SuppressRestartsException(forcedFailure));
+        scheduler.updateTaskExecutionState(unrecoverableState);
+
+        assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
+    }
+
+    /**
+     * With {@code setCanRestart(false)} on the restart strategy, a single attempt failure moves
+     * the job to {@code FAILING}.
+     */
+    @Test
+    void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
+        // must be configured before the scheduler is created and started
+        restartStrategy.setCanRestart(false);
+
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final Execution firstAttempt =
+                getOnlyExecutionVertex(scheduler).getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, firstAttempt);
+
+        final TaskExecutionState firstAttemptFailure =
+                createFailedTaskExecutionState(firstAttempt.getAttemptId());
+        scheduler.updateTaskExecutionState(firstAttemptFailure);
+
+        assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
+    }
+
+    /**
+     * Runs a two-vertex batch job (source with parallelism 1, sink with parallelism -1) and checks
+     * that speculation works for both vertices while the sink parallelism is only decided (to 3,
+     * by the configured decider) once a source attempt has finished.
+     *
+     * @throws Exception if building the scheduler fails
+     */
+    @Test
+    void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
+        final JobVertex source = createNoOpVertex("source", 1);
+        final JobVertex sink = createNoOpVertex("sink", -1);
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
+
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forMainThread();
+        final SpeculativeScheduler scheduler =
+                createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setVertexParallelismDecider(ignore -> 3)
+                        .buildSpeculativeScheduler();
+        mainThreadExecutor.execute(scheduler::startScheduling);
+
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        final ExecutionJobVertex sourceExecutionJobVertex = graph.getJobVertex(source.getID());
+        final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
+
+        final ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0];
+        assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(1);
+
+        // trigger source vertex speculation
+        final Execution sourceAttempt1 = sourceExecutionVertex.getCurrentExecutionAttempt();
+        notifySlowTask(scheduler, sourceAttempt1);
+        assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(2);
+
+        // the sink parallelism is still undecided while the source is running
+        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
+
+        // Finishing any source execution attempt will finish the source execution vertex, and then
+        // finish the job vertex.
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        sourceAttempt1.getAttemptId(),
+                        ExecutionState.FINISHED,
+                        null,
+                        null,
+                        new IOMetrics(0, 0, 0, 0, 0, 0, 0)));
+        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(3);
+
+        // trigger sink vertex speculation
+        final ExecutionVertex sinkExecutionVertex = sinkExecutionJobVertex.getTaskVertices()[0];
+        final Execution sinkAttempt1 = sinkExecutionVertex.getCurrentExecutionAttempt();
+        notifySlowTask(scheduler, sinkAttempt1);
+        assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
+    }
+
+    /**
+     * Looks up the current execution of the given vertex that carries the given attempt number.
+     *
+     * @param executionVertex vertex whose current executions are searched
+     * @param attemptNumber attempt number of the wanted execution
+     * @return the first current execution whose attempt number equals {@code attemptNumber}
+     * @throws AssertionError if no current execution has that attempt number, so the calling test
+     *     fails with a message naming the missing attempt
+     */
+    private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) {
+        return executionVertex.getCurrentExecutions().stream()
+                .filter(e -> e.getAttemptNumber() == attemptNumber)
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new AssertionError(
+                                        "No current execution has attempt number "
+                                                + attemptNumber));
+    }
+
+    /**
+     * Returns the single execution vertex of the scheduler's execution graph, relying on {@code
+     * Iterables.getOnlyElement} to reject graphs that do not hold exactly one vertex.
+     */
+    private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler scheduler) {
+        final Iterable<? extends ExecutionVertex> allVertices =
+                scheduler.getExecutionGraph().getAllExecutionVertices();
+        return Iterables.getOnlyElement(allVertices);
+    }
+
+    /** Creates and starts a scheduler for {@code singleNonParallelJobVertexJobGraph()}. */
+    private SpeculativeScheduler createSchedulerAndStartScheduling() {
+        final JobGraph singleVertexJobGraph = singleNonParallelJobVertexJobGraph();
+        return createSchedulerAndStartScheduling(singleVertexJobGraph);
+    }
+
+    /**
+     * Creates a scheduler for the given job graph and starts scheduling through the main thread
+     * executor.
+     *
+     * @param jobGraph job to schedule
+     * @return the started scheduler
+     * @throws RuntimeException wrapping any exception raised while creating or starting it
+     */
+    private SpeculativeScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) {
+        final ComponentMainThreadExecutor mainThread =
+                ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+        final SpeculativeScheduler scheduler;
+        try {
+            scheduler = createScheduler(jobGraph, mainThread);
+            mainThread.execute(scheduler::startScheduling);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return scheduler;
+    }
+
+    /**
+     * Builds a {@link SpeculativeScheduler} from the builder that is preconfigured with this
+     * test's fixtures.
+     *
+     * @throws Exception propagated from {@code buildSpeculativeScheduler()}
+     */
+    private SpeculativeScheduler createScheduler(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor)
+            throws Exception {
+        final DefaultSchedulerBuilder builder =
+                createSchedulerBuilder(jobGraph, mainThreadExecutor);
+        return builder.buildSpeculativeScheduler();
+    }
+
+    /**
+     * Creates a scheduler builder wired with the recording blocklist/execution operations, the
+     * executors and the restart strategy set up for the current test.
+     *
+     * @param jobGraph job the scheduler will run
+     * @param mainThreadExecutor main thread executor passed to the builder
+     * @return the configured builder, ready for further customization
+     */
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
+        final DefaultSchedulerBuilder baseBuilder =
+                new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor());
+
+        return baseBuilder
+                .setBlocklistOperations(testBlocklistOperations)
+                .setExecutionOperations(testExecutionOperations)
+                .setFutureExecutor(futureExecutor)
+                .setDelayExecutor(taskRestartExecutor)
+                .setRestartBackoffTimeStrategy(restartStrategy);
+    }
+
+    /**
+     * Reports a single execution attempt as slow, keyed by the id of the vertex it belongs to.
+     *
+     * @param scheduler scheduler receiving the notification
+     * @param slowTask attempt to report as slow
+     */
+    private static void notifySlowTask(
+            final SpeculativeScheduler scheduler, final Execution slowTask) {
+        final Set<ExecutionAttemptID> slowAttemptIds =
+                Collections.singleton(slowTask.getAttemptId());
+        scheduler.notifySlowTasks(ImmutableMap.of(slowTask.getVertex().getID(), slowAttemptIds));
+    }
+
+    /** {@link BlocklistOperations} that simply records every node it is asked to block. */
+    private static class TestBlocklistOperations implements BlocklistOperations {
+        private final List<BlockedNode> recordedNodes = new ArrayList<>();
+
+        @Override
+        public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+            recordedNodes.addAll(newNodes);
+        }
+
+        /** @return the distinct node ids of all nodes recorded so far */
+        public Set<String> getAllBlockedNodeIds() {
+            return recordedNodes.stream()
+                    .map(node -> node.getNodeId())
+                    .collect(Collectors.toSet());
+        }
+    }
+}

Reply via email to