This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b92ac  [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
21b92ac is described below

commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a
Author: azagrebin <azagre...@users.noreply.github.com>
AuthorDate: Thu May 23 11:35:48 2019 +0200

    [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
---
 .../executiongraph/ExecutionGraphRestartTest.java  | 709 ++++++++++-----------
 1 file changed, 331 insertions(+), 378 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1c5b650..1e6be72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
-import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
@@ -97,7 +98,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(4);
 
-       private TestingComponentMainThreadExecutorServiceAdapter 
mainThreadExecutor = 
TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+       private static final TestingComponentMainThreadExecutorServiceAdapter 
mainThreadExecutor =
+               
TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+       private static final JobID TEST_JOB_ID = new JobID();
 
        @After
        public void shutdown() {
@@ -109,8 +113,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
        @Test
        public void testNoManualRestart() throws Exception {
                NoRestartStrategy restartStrategy = new NoRestartStrategy();
-               Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = 
createExecutionGraph(restartStrategy);
-               ExecutionGraph eg = executionGraphInstanceTuple.f0;
+               ExecutionGraph eg = createSimpleExecutionGraph(
+                       restartStrategy, new SimpleSlotProvider(TEST_JOB_ID, 
NUM_TASKS), createJobGraph());
 
                eg.getAllExecutionVertices().iterator().next().fail(new 
Exception("Test Exception"));
 
@@ -136,149 +140,128 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        @Test
        public void testRestartAutomatically() throws Exception {
-               Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple =
-                       
createExecutionGraph(TestRestartStrategy.directExecuting());
-
-               ExecutionGraph eg = executionGraphInstanceTuple.f0;
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       
restartAfterFailure(TestingExecutionGraphBuilder.newBuilder()
+                               
.setRestartStrategy(TestRestartStrategy.directExecuting())
+                               .buildAndScheduleForExecution(slotPool));
+               }
 
-               restartAfterFailure(eg, new FiniteDuration(2, 
TimeUnit.MINUTES), true);
        }
 
        @Test
        public void testCancelWhileRestarting() throws Exception {
                // We want to manually control the restart and delay
-               RestartStrategy restartStrategy = new 
InfiniteDelayRestartStrategy();
-               Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = 
createExecutionGraph(restartStrategy);
-               ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-               Instance instance = executionGraphInstanceTuple.f1;
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+                       final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
+                               .setTaskManagerLocation(taskManagerLocation)
+                               .buildAndScheduleForExecution(slotPool);
 
-               // Kill the instance and wait for the job to restart
-               instance.markDead();
-               Assert.assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
+                       // Release the TaskManager and wait for the job to 
restart
+                       
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new 
Exception("Test Exception"));
+                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
 
-               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+                       // Canceling needs to abort the restart
+                       executionGraph.cancel();
 
-               // Canceling needs to abort the restart
-               executionGraph.cancel();
+                       assertEquals(JobStatus.CANCELED, 
executionGraph.getState());
 
-               assertEquals(JobStatus.CANCELED, executionGraph.getState());
+                       // The restart has been aborted
+                       
executionGraph.restart(executionGraph.getGlobalModVersion());
 
-               // The restart has been aborted
-               executionGraph.restart(executionGraph.getGlobalModVersion());
+                       assertEquals(JobStatus.CANCELED, 
executionGraph.getState());
+               }
 
-               assertEquals(JobStatus.CANCELED, executionGraph.getState());
        }
 
        @Test
        public void testFailWhileRestarting() throws Exception {
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       NUM_TASKS);
-
-               scheduler.newInstanceAvailable(instance);
-
-               // Blocking program
-               ExecutionGraph executionGraph = new ExecutionGraph(
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       new JobID(),
-                       "TestJob",
-                       new Configuration(),
-                       new SerializedValue<>(new ExecutionConfig()),
-                       AkkaUtils.getDefaultTimeout(),
-                       // We want to manually control the restart and delay
-                       new InfiniteDelayRestartStrategy(),
-                       scheduler);
-
-               
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               JobVertex jobVertex = new JobVertex("NoOpInvokable");
-               jobVertex.setInvokableClass(NoOpInvokable.class);
-               jobVertex.setParallelism(NUM_TASKS);
-
-               JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
-
-               
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+                       final ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
+                               .setTaskManagerLocation(taskManagerLocation)
+                               .buildAndScheduleForExecution(slotPool);
 
-               assertEquals(JobStatus.CREATED, executionGraph.getState());
+                       // Release the TaskManager and wait for the job to 
restart
+                       
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new 
Exception("Test Exception"));
 
-               executionGraph.scheduleForExecution();
-
-               assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-               // Kill the instance and wait for the job to restart
-               instance.markDead();
-
-               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
 
-               // If we fail when being in RESTARTING, then we should try to 
restart again
-               final long globalModVersion = 
executionGraph.getGlobalModVersion();
-               final Exception testException = new Exception("Test exception");
-               executionGraph.failGlobal(testException);
+                       // If we fail when being in RESTARTING, then we should 
try to restart again
+                       final long globalModVersion = 
executionGraph.getGlobalModVersion();
+                       final Exception testException = new Exception("Test 
exception");
+                       executionGraph.failGlobal(testException);
 
-               assertNotEquals(globalModVersion, 
executionGraph.getGlobalModVersion());
-               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-               assertEquals(testException, executionGraph.getFailureCause()); 
// we should have updated the failure cause
+                       assertNotEquals(globalModVersion, 
executionGraph.getGlobalModVersion());
+                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
+                       assertEquals(testException, 
executionGraph.getFailureCause()); // we should have updated the failure cause
 
-               // but it should fail when sending a SuppressRestartsException
-               executionGraph.failGlobal(new SuppressRestartsException(new 
Exception("Suppress restart exception")));
+                       // but it should fail when sending a 
SuppressRestartsException
+                       executionGraph.failGlobal(new 
SuppressRestartsException(new Exception("Suppress restart exception")));
 
-               assertEquals(JobStatus.FAILED, executionGraph.getState());
+                       assertEquals(JobStatus.FAILED, 
executionGraph.getState());
 
-               // The restart has been aborted
-               executionGraph.restart(executionGraph.getGlobalModVersion());
+                       // The restart has been aborted
+                       
executionGraph.restart(executionGraph.getGlobalModVersion());
 
-               assertEquals(JobStatus.FAILED, executionGraph.getState());
+                       assertEquals(JobStatus.FAILED, 
executionGraph.getState());
+               }
        }
 
        @Test
        public void testCancelWhileFailing() throws Exception {
-               final RestartStrategy restartStrategy = new 
InfiniteDelayRestartStrategy();
-               final ExecutionGraph graph = 
createExecutionGraph(restartStrategy).f0;
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       final ExecutionGraph graph = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
+                               .buildAndScheduleForExecution(slotPool);
 
-               assertEquals(JobStatus.RUNNING, graph.getState());
+                       assertEquals(JobStatus.RUNNING, graph.getState());
 
-               // switch all tasks to running
-               for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-                       vertex.getCurrentExecutionAttempt().switchToRunning();
-               }
+                       // switch all tasks to running
+                       for (ExecutionVertex vertex : 
graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+                               
vertex.getCurrentExecutionAttempt().switchToRunning();
+                       }
 
-               graph.failGlobal(new Exception("test"));
+                       graph.failGlobal(new Exception("test"));
 
-               assertEquals(JobStatus.FAILING, graph.getState());
+                       assertEquals(JobStatus.FAILING, graph.getState());
 
-               graph.cancel();
+                       graph.cancel();
 
-               assertEquals(JobStatus.CANCELLING, graph.getState());
+                       assertEquals(JobStatus.CANCELLING, graph.getState());
 
-               // let all tasks finish cancelling
-               completeCanceling(graph);
+                       // let all tasks finish cancelling
+                       completeCanceling(graph);
+
+                       assertEquals(JobStatus.CANCELED, graph.getState());
+               }
 
-               assertEquals(JobStatus.CANCELED, graph.getState());
        }
 
        @Test
        public void testFailWhileCanceling() throws Exception {
-               final RestartStrategy restartStrategy = new NoRestartStrategy();
-               final ExecutionGraph graph = 
createExecutionGraph(restartStrategy).f0;
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       final ExecutionGraph graph = 
TestingExecutionGraphBuilder.newBuilder().buildAndScheduleForExecution(slotPool);
+
+                       assertEquals(JobStatus.RUNNING, graph.getState());
+                       switchAllTasksToRunning(graph);
 
-               assertEquals(JobStatus.RUNNING, graph.getState());
-               switchAllTasksToRunning(graph);
+                       graph.cancel();
 
-               graph.cancel();
+                       assertEquals(JobStatus.CANCELLING, graph.getState());
 
-               assertEquals(JobStatus.CANCELLING, graph.getState());
+                       graph.failGlobal(new Exception("test"));
 
-               graph.failGlobal(new Exception("test"));
+                       assertEquals(JobStatus.FAILING, graph.getState());
 
-               assertEquals(JobStatus.FAILING, graph.getState());
+                       // let all tasks finish cancelling
+                       completeCanceling(graph);
 
-               // let all tasks finish cancelling
-               completeCanceling(graph);
+                       assertEquals(JobStatus.FAILED, graph.getState());
+               }
 
-               assertEquals(JobStatus.FAILED, graph.getState());
        }
 
        private void switchAllTasksToRunning(ExecutionGraph graph) {
@@ -287,23 +270,28 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        @Test
        public void testNoRestartOnSuppressException() throws Exception {
-               final ExecutionGraph eg = createExecutionGraph(new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0))
+                               .buildAndScheduleForExecution(slotPool);
 
-               // Fail with unrecoverable Exception
-               eg.getAllExecutionVertices().iterator().next().fail(
-                       new SuppressRestartsException(new Exception("Test 
Exception")));
+                       // Fail with unrecoverable Exception
+                       eg.getAllExecutionVertices().iterator().next().fail(
+                               new SuppressRestartsException(new 
Exception("Test Exception")));
 
-               assertEquals(JobStatus.FAILING, eg.getState());
+                       assertEquals(JobStatus.FAILING, eg.getState());
 
-               completeCanceling(eg);
+                       completeCanceling(eg);
 
-               eg.waitUntilTerminal();
-               assertEquals(JobStatus.FAILED, eg.getState());
+                       eg.waitUntilTerminal();
+                       assertEquals(JobStatus.FAILED, eg.getState());
 
-               RestartStrategy restartStrategy = eg.getRestartStrategy();
-               assertTrue(restartStrategy instanceof 
FixedDelayRestartStrategy);
+                       RestartStrategy restartStrategy = 
eg.getRestartStrategy();
+                       assertTrue(restartStrategy instanceof 
FixedDelayRestartStrategy);
+
+                       assertEquals(0, ((FixedDelayRestartStrategy) 
restartStrategy).getCurrentRestartAttempt());
+               }
 
-               assertEquals(0, ((FixedDelayRestartStrategy) 
restartStrategy).getCurrentRestartAttempt());
        }
 
        /**
@@ -313,56 +301,47 @@ public class ExecutionGraphRestartTest extends TestLogger {
         */
        @Test
        public void testFailingExecutionAfterRestart() throws Exception {
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       2);
-
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               scheduler.newInstanceAvailable(instance);
-
-               TestRestartStrategy restartStrategy = 
TestRestartStrategy.directExecuting();
-
                JobVertex sender = 
ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
                JobVertex receiver = 
ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
-               ExecutionGraph eg = newExecutionGraph(restartStrategy, 
scheduler);
-               eg.start(mainThreadExecutor);
-               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
-               assertEquals(JobStatus.CREATED, eg.getState());
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder()
+                               
.setRestartStrategy(TestRestartStrategy.directExecuting())
+                               .setJobGraph(jobGraph)
+                               .setNumberOfTasks(2)
+                               .buildAndScheduleForExecution(slotPool);
 
-               eg.scheduleForExecution();
-               assertEquals(JobStatus.RUNNING, eg.getState());
+                       Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
 
-               Iterator<ExecutionVertex> executionVertices = 
eg.getAllExecutionVertices().iterator();
+                       Execution finishedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
+                       Execution failedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
 
-               Execution finishedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
-               Execution failedExecution = 
executionVertices.next().getCurrentExecutionAttempt();
+                       finishedExecution.markFinished();
 
-               finishedExecution.markFinished();
+                       failedExecution.fail(new Exception("Test Exception"));
+                       failedExecution.completeCancelling();
 
-               failedExecution.fail(new Exception("Test Exception"));
-               failedExecution.completeCancelling();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
+                       assertEquals(JobStatus.RUNNING, eg.getState());
 
-               // At this point all resources have been assigned
-               for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-                       assertNotNull("No assigned resource (test 
instability).", vertex.getCurrentAssignedResource());
-                       vertex.getCurrentExecutionAttempt().switchToRunning();
-               }
+                       // At this point all resources have been assigned
+                       for (ExecutionVertex vertex : 
eg.getAllExecutionVertices()) {
+                               assertNotNull("No assigned resource (test 
instability).", vertex.getCurrentAssignedResource());
+                               
vertex.getCurrentExecutionAttempt().switchToRunning();
+                       }
 
-               // fail old finished execution, this should not affect the 
execution
-               finishedExecution.fail(new Exception("This should have no 
effect"));
+                       // fail old finished execution, this should not affect 
the execution
+                       finishedExecution.fail(new Exception("This should have 
no effect"));
 
-               for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
-                       vertex.getCurrentExecutionAttempt().markFinished();
-               }
+                       for (ExecutionVertex vertex: 
eg.getAllExecutionVertices()) {
+                               
vertex.getCurrentExecutionAttempt().markFinished();
+                       }
 
-               // the state of the finished execution should have not changed 
since it is terminal
-               assertEquals(ExecutionState.FINISHED, 
finishedExecution.getState());
+                       // the state of the finished execution should have not 
changed since it is terminal
+                       assertEquals(ExecutionState.FINISHED, 
finishedExecution.getState());
 
-               assertEquals(JobStatus.FINISHED, eg.getState());
+                       assertEquals(JobStatus.FINISHED, eg.getState());
+               }
        }
 
        /**
@@ -372,43 +351,27 @@ public class ExecutionGraphRestartTest extends TestLogger {
         */
        @Test
        public void testFailExecutionAfterCancel() throws Exception {
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       2);
-
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               scheduler.newInstanceAvailable(instance);
-
-               JobVertex vertex = 
ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                       Integer.MAX_VALUE, Integer.MAX_VALUE));
-               JobGraph jobGraph = new JobGraph("Test Job", vertex);
-               jobGraph.setExecutionConfig(executionConfig);
-
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
+                               .setJobGraph(createJobGraphToCancel())
+                               .setNumberOfTasks(2)
+                               .buildAndScheduleForExecution(slotPool);
 
-               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+                       // Fail right after cancel (for example with concurrent 
slot release)
+                       eg.cancel();
 
-               assertEquals(JobStatus.CREATED, eg.getState());
+                       for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+                               v.getCurrentExecutionAttempt().fail(new 
Exception("Test Exception"));
+                       }
 
-               eg.scheduleForExecution();
-               assertEquals(JobStatus.RUNNING, eg.getState());
+                       assertEquals(JobStatus.CANCELED, 
eg.getTerminationFuture().get());
 
-               // Fail right after cancel (for example with concurrent slot 
release)
-               eg.cancel();
+                       Execution execution = 
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
 
-               for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-                       v.getCurrentExecutionAttempt().fail(new Exception("Test 
Exception"));
+                       execution.completeCancelling();
+                       assertEquals(JobStatus.CANCELED, eg.getState());
                }
-
-               assertEquals(JobStatus.CANCELED, 
eg.getTerminationFuture().get());
-
-               Execution execution = 
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-               execution.completeCancelling();
-               assertEquals(JobStatus.CANCELED, eg.getState());
        }
 
        /**
@@ -417,41 +380,25 @@ public class ExecutionGraphRestartTest extends TestLogger {
         */
        @Test
        public void testFailExecutionGraphAfterCancel() throws Exception {
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       2);
-
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               scheduler.newInstanceAvailable(instance);
-
-               JobVertex vertex = 
ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                       Integer.MAX_VALUE, Integer.MAX_VALUE));
-               JobGraph jobGraph = new JobGraph("Test Job", vertex);
-               jobGraph.setExecutionConfig(executionConfig);
-
-               ExecutionGraph eg = newExecutionGraph(new 
InfiniteDelayRestartStrategy(), scheduler);
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new 
InfiniteDelayRestartStrategy())
+                               .setJobGraph(createJobGraphToCancel())
+                               .setNumberOfTasks(2)
+                               .buildAndScheduleForExecution(slotPool);
 
-               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+                       // Fail right after cancel (for example with concurrent 
slot release)
+                       eg.cancel();
+                       assertEquals(JobStatus.CANCELLING, eg.getState());
 
-               assertEquals(JobStatus.CREATED, eg.getState());
+                       eg.failGlobal(new Exception("Test Exception"));
+                       assertEquals(JobStatus.FAILING, eg.getState());
 
-               eg.scheduleForExecution();
-               assertEquals(JobStatus.RUNNING, eg.getState());
-
-               // Fail right after cancel (for example with concurrent slot 
release)
-               eg.cancel();
-               assertEquals(JobStatus.CANCELLING, eg.getState());
-
-               eg.failGlobal(new Exception("Test Exception"));
-               assertEquals(JobStatus.FAILING, eg.getState());
+                       Execution execution = 
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
 
-               Execution execution = 
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-               execution.completeCancelling();
-               assertEquals(JobStatus.RESTARTING, eg.getState());
+                       execution.completeCancelling();
+                       assertEquals(JobStatus.RESTARTING, eg.getState());
+               }
        }
 
        /**
@@ -459,55 +406,29 @@ public class ExecutionGraphRestartTest extends TestLogger {
         */
        @Test
        public void testSuspendWhileRestarting() throws Exception {
-
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       NUM_TASKS);
-
-               Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-               scheduler.newInstanceAvailable(instance);
-
-               JobVertex sender = new JobVertex("Task");
-               sender.setInvokableClass(NoOpInvokable.class);
-               sender.setParallelism(NUM_TASKS);
-
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
                TestRestartStrategy controllableRestartStrategy = TestRestartStrategy.manuallyTriggered();
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+                       ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(controllableRestartStrategy)
+                               .setTaskManagerLocation(taskManagerLocation)
+                               .buildAndScheduleForExecution(slotPool);
 
-               ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       new JobID(),
-                       "Test job",
-                       new Configuration(),
-                       new SerializedValue<>(new ExecutionConfig()),
-                       AkkaUtils.getDefaultTimeout(),
-                       controllableRestartStrategy,
-                       scheduler);
-
-               eg.start(mainThreadExecutor);
-               eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-               assertEquals(JobStatus.CREATED, eg.getState());
-
-               eg.scheduleForExecution();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
-
-               instance.markDead();
+                       // Release the TaskManager and wait for the job to restart
+                       slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
 
-               Assert.assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
+                       assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
 
-               assertEquals(JobStatus.RESTARTING, eg.getState());
+                       assertEquals(JobStatus.RESTARTING, eg.getState());
 
-               eg.suspend(new Exception("Test exception"));
+                       eg.suspend(new Exception("Test exception"));
 
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+                       assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-               controllableRestartStrategy.triggerAll().join();
+                       controllableRestartStrategy.triggerAll().join();
 
-               assertEquals(JobStatus.SUSPENDED, eg.getState());
+                       assertEquals(JobStatus.SUSPENDED, eg.getState());
+               }
        }
 
        @Test
@@ -518,7 +439,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
 
                final ExecutionGraph eg = createSimpleTestGraph(
-                       new JobID(),
+                       TEST_JOB_ID,
                        taskManagerGateway,
                        triggeredRestartStrategy,
                        createNoOpVertex(parallelism));
@@ -538,15 +459,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
                first.fail(new Exception("intended test failure 1"));
                last.fail(new Exception("intended test failure 2"));
 
-               Assert.assertEquals(JobStatus.FAILING, eg.getState());
+               assertEquals(JobStatus.FAILING, eg.getState());
 
                completeCancellingForAllVertices(eg);
 
                // Now trigger the restart
-               Assert.assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
+               assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
                triggeredRestartStrategy.triggerAll().join();
 
-               Assert.assertEquals(JobStatus.RUNNING, eg.getState());
+               assertEquals(JobStatus.RUNNING, eg.getState());
 
                switchToRunning(eg);
                finishAllVertices(eg);
@@ -558,13 +479,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
        @Test
        public void testGlobalFailAndRestarts() throws Exception {
                final int parallelism = 10;
-               final JobID jid = new JobID();
                final JobVertex vertex = createNoOpVertex(parallelism);
                final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway();
-               final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway);
+               final SlotProvider slots = new SimpleSlotProvider(TEST_JOB_ID, parallelism, taskManagerGateway);
                final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
 
-               final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+               final ExecutionGraph eg = createSimpleTestGraph(TEST_JOB_ID, slots, restartStrategy, vertex);
                eg.start(mainThreadExecutor);
 
                eg.setScheduleMode(ScheduleMode.EAGER);
@@ -603,55 +523,57 @@ public class ExecutionGraphRestartTest extends TestLogger {
                // this test is inconclusive if not used with a proper multi-threaded executor
                assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
 
-               SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
                final int parallelism = 20;
-               final Scheduler scheduler = createSchedulerWithInstances(parallelism, taskManagerGateway);
 
-               final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       final Scheduler scheduler = createSchedulerWithSlots(parallelism, slotPool, new LocalTaskManagerLocation());
 
-               final JobVertex source = new JobVertex("source");
-               source.setInvokableClass(NoOpInvokable.class);
-               source.setParallelism(parallelism);
-               source.setSlotSharingGroup(sharingGroup);
+                       final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-               final JobVertex sink = new JobVertex("sink");
-               sink.setInvokableClass(NoOpInvokable.class);
-               sink.setParallelism(parallelism);
-               sink.setSlotSharingGroup(sharingGroup);
-               sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+                       final JobVertex source = new JobVertex("source");
+                       source.setInvokableClass(NoOpInvokable.class);
+                       source.setParallelism(parallelism);
+                       source.setSlotSharingGroup(sharingGroup);
 
-               TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
+                       final JobVertex sink = new JobVertex("sink");
+                       sink.setInvokableClass(NoOpInvokable.class);
+                       sink.setParallelism(parallelism);
+                       sink.setSlotSharingGroup(sharingGroup);
+                       sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
-               final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
-                       new JobID(),
-                       scheduler,
-                       restartStrategy,
-                       executor,
-                       source,
-                       sink);
+                       TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
 
-               eg.start(mainThreadExecutor);
+                       final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+                               TEST_JOB_ID,
+                               scheduler,
+                               restartStrategy,
+                               executor,
+                               source,
+                               sink);
 
-               eg.setScheduleMode(ScheduleMode.EAGER);
-               eg.scheduleForExecution();
+                       eg.start(mainThreadExecutor);
 
-               switchToRunning(eg);
+                       eg.setScheduleMode(ScheduleMode.EAGER);
+                       eg.scheduleForExecution();
 
-               // fail into 'RESTARTING'
-               eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
-                       new Exception("intended test failure"));
+                       switchToRunning(eg);
 
-               assertEquals(JobStatus.FAILING, eg.getState());
+                       // fail into 'RESTARTING'
+                       eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+                               new Exception("intended test failure"));
 
-               completeCancellingForAllVertices(eg);
+                       assertEquals(JobStatus.FAILING, eg.getState());
 
-               assertEquals(JobStatus.RUNNING, eg.getState());
+                       completeCancellingForAllVertices(eg);
 
-               // clean termination
-               switchToRunning(eg);
-               finishAllVertices(eg);
+                       assertEquals(JobStatus.RUNNING, eg.getState());
 
-               assertEquals(JobStatus.FINISHED, eg.getState());
+                       // clean termination
+                       switchToRunning(eg);
+                       finishAllVertices(eg);
+
+                       assertEquals(JobStatus.FINISHED, eg.getState());
+               }
        }
 
        @Test
@@ -662,43 +584,44 @@ public class ExecutionGraphRestartTest extends TestLogger {
                final int numRestarts = 10;
                final int parallelism = 20;
 
-               TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
-               final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1, taskManagerGateway);
-
-               final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+               try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+                       final Scheduler scheduler = createSchedulerWithSlots(
+                               parallelism - 1, slotPool, new LocalTaskManagerLocation());
 
-               final JobVertex source = new JobVertex("source");
-               source.setInvokableClass(NoOpInvokable.class);
-               source.setParallelism(parallelism);
-               source.setSlotSharingGroup(sharingGroup);
+                       final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-               final JobVertex sink = new JobVertex("sink");
-               sink.setInvokableClass(NoOpInvokable.class);
-               sink.setParallelism(parallelism);
-               sink.setSlotSharingGroup(sharingGroup);
-               sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+                       final JobVertex source = new JobVertex("source");
+                       source.setInvokableClass(NoOpInvokable.class);
+                       source.setParallelism(parallelism);
+                       source.setSlotSharingGroup(sharingGroup);
 
-               TestRestartStrategy restartStrategy =
-                       new TestRestartStrategy(numRestarts, false);
+                       final JobVertex sink = new JobVertex("sink");
+                       sink.setInvokableClass(NoOpInvokable.class);
+                       sink.setParallelism(parallelism);
+                       sink.setSlotSharingGroup(sharingGroup);
+                       sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
+                       TestRestartStrategy restartStrategy =
+                               new TestRestartStrategy(numRestarts, false);
 
-               final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
-                       new JobID(), scheduler, restartStrategy, executor, source, sink);
+                       final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+                               TEST_JOB_ID, scheduler, restartStrategy, executor, source, sink);
 
-               eg.start(mainThreadExecutor);
-               eg.setScheduleMode(ScheduleMode.EAGER);
-               eg.scheduleForExecution();
+                       eg.start(mainThreadExecutor);
+                       eg.setScheduleMode(ScheduleMode.EAGER);
+                       eg.scheduleForExecution();
 
-               // wait until no more changes happen
-               while (eg.getNumberOfFullRestarts() < numRestarts) {
-                       Thread.sleep(1);
-               }
+                       // wait until no more changes happen
+                       while (eg.getNumberOfFullRestarts() < numRestarts) {
+                               Thread.sleep(1);
+                       }
 
-               assertEquals(JobStatus.FAILED, eg.getState());
+                       assertEquals(JobStatus.FAILED, eg.getState());
 
-               final Throwable t = eg.getFailureCause();
-               if (!(t instanceof NoResourceAvailableException)) {
-                       ExceptionUtils.rethrowException(t, t.getMessage());
+                       final Throwable t = eg.getFailureCause();
+                       if (!(t instanceof NoResourceAvailableException)) {
+                               ExceptionUtils.rethrowException(t, t.getMessage());
+                       }
                }
        }
 
@@ -710,7 +633,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
        public void testFailureWhileRestarting() throws Exception {
 
                final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
-               final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+               final ExecutionGraph executionGraph = createSimpleExecutionGraph(
+                       restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()), createJobGraph());
 
                executionGraph.start(mainThreadExecutor);
                executionGraph.setQueuedSchedulingAllowed(true);
@@ -733,66 +657,100 @@ public class ExecutionGraphRestartTest extends TestLogger {
        //  Utilities
        // ------------------------------------------------------------------------
 
-       private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) {
-               final Scheduler scheduler = new Scheduler(executor);
-               final Instance[] instances = new Instance[num];
+       private static class TestingExecutionGraphBuilder {
+               private RestartStrategy restartStrategy = new NoRestartStrategy();
+               private JobGraph jobGraph = createJobGraph();
+               private int tasksNum = NUM_TASKS;
+               private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 
-               for (int i = 0; i < instances.length; i++) {
-                       instances[i] = createInstance(taskManagerGateway, 55443 + i);
-                       scheduler.newInstanceAvailable(instances[i]);
+               private TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartStrategy) {
+                       this.restartStrategy = restartStrategy;
+                       return this;
                }
 
-               return scheduler;
-       }
+               private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) {
+                       this.jobGraph = jobGraph;
+                       return this;
+               }
 
-       private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) {
-               final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
-               final TaskManagerLocation location = new TaskManagerLocation(
-                       ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
-               return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1);
-       }
+               TestingExecutionGraphBuilder setNumberOfTasks(@SuppressWarnings("SameParameterValue") int tasksNum) {
+                       this.tasksNum = tasksNum;
+                       return this;
+               }
 
-       // ------------------------------------------------------------------------
+               private TestingExecutionGraphBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+                       this.taskManagerLocation = taskManagerLocation;
+                       return this;
+               }
 
-       private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       NUM_TASKS);
+               private static TestingExecutionGraphBuilder newBuilder() {
+                       return new TestingExecutionGraphBuilder();
+               }
 
-               Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-               scheduler.newInstanceAvailable(instance);
+               private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception {
+                       final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation);
+                       final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph);
 
-               ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler);
+                       assertEquals(JobStatus.CREATED, eg.getState());
 
-               assertEquals(JobStatus.CREATED, eg.getState());
+                       eg.scheduleForExecution();
+                       assertEquals(JobStatus.RUNNING, eg.getState());
 
-               eg.scheduleForExecution();
-               assertEquals(JobStatus.RUNNING, eg.getState());
-               return new Tuple2<>(eg, instance);
+                       return eg;
+               }
        }
 
-       private ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException {
-               JobGraph jobGraph = createJobGraph(NUM_TASKS);
+       private static Scheduler createSchedulerWithSlots(
+               int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
 
-               ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider);
-               eg.start(mainThreadExecutor);
-               eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+               final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+               setupSlotPool(slotPool);
+               Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool);
+               scheduler.start(mainThreadExecutor);
+               slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-               return eg;
+               final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS);
+               for (int i = 0; i < numSlots; i++) {
+                       final AllocationID allocationId = new AllocationID();
+                       final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+                       slotOffers.add(slotOffer);
+               }
+
+               slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+               return scheduler;
        }
 
-       @Nonnull
-       private static JobGraph createJobGraph(int parallelism) {
-               JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class);
+       private static void setupSlotPool(SlotPool slotPool) throws Exception {
+               final String jobManagerAddress = "foobar";
+               final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+               slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor);
+               slotPool.connectToResourceManager(resourceManagerGateway);
+       }
 
+       private static JobGraph createJobGraph() {
+               JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
                return new JobGraph("Pointwise job", sender);
        }
 
-       private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
-               final ExecutionGraph executionGraph = new ExecutionGraph(
+       private static JobGraph createJobGraphToCancel() throws IOException {
+               JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+                       Integer.MAX_VALUE, Integer.MAX_VALUE));
+               JobGraph jobGraph = new JobGraph("Test Job", vertex);
+               jobGraph.setExecutionConfig(executionConfig);
+               return jobGraph;
+       }
+
+       private static ExecutionGraph createSimpleExecutionGraph(
+               RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph)
+               throws IOException, JobException {
+
+               ExecutionGraph executionGraph = new ExecutionGraph(
                        TestingUtils.defaultExecutor(),
                        TestingUtils.defaultExecutor(),
-                       new JobID(),
+                       TEST_JOB_ID,
                        "Test job",
                        new Configuration(),
                        new SerializedValue<>(new ExecutionConfig()),
@@ -800,12 +758,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        restartStrategy,
                        slotProvider);
 
-               executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+               executionGraph.start(mainThreadExecutor);
+               executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                return executionGraph;
        }
 
-       private void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) {
+       private void restartAfterFailure(ExecutionGraph eg) {
                eg.start(mainThreadExecutor);
                eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
 
@@ -817,12 +776,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.RUNNING, eg.getState());
 
-               if (haltAfterRestart) {
-                       haltExecution(eg);
-               }
-       }
-
-       private static void haltExecution(ExecutionGraph eg) {
                finishAllVertices(eg);
                assertEquals(JobStatus.FINISHED, eg.getState());
        }

Reply via email to