This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 21b92ac [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase 21b92ac is described below commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a Author: azagrebin <azagre...@users.noreply.github.com> AuthorDate: Thu May 23 11:35:48 2019 +0200 [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase --- .../executiongraph/ExecutionGraphRestartTest.java | 709 ++++++++++----------- 1 file changed, 331 insertions(+), 378 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 1c5b650..1e6be72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; @@ -35,9 +35,6 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import 
org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; 
@@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.After; -import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nonnull; - import java.io.IOException; -import java.net.InetAddress; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import scala.concurrent.duration.FiniteDuration; - import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; @@ -97,7 +98,10 @@ public class ExecutionGraphRestartTest extends TestLogger { private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread(); + private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = + TestingComponentMainThreadExecutorServiceAdapter.forMainThread(); + + private static final JobID TEST_JOB_ID = new JobID(); @After public void shutdown() { @@ -109,8 +113,8 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testNoManualRestart() throws Exception { NoRestartStrategy restartStrategy = new NoRestartStrategy(); - Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph eg = executionGraphInstanceTuple.f0; + ExecutionGraph eg = createSimpleExecutionGraph( + restartStrategy, new 
SimpleSlotProvider(TEST_JOB_ID, NUM_TASKS), createJobGraph()); eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); @@ -136,149 +140,128 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testRestartAutomatically() throws Exception { - Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = - createExecutionGraph(TestRestartStrategy.directExecuting()); - - ExecutionGraph eg = executionGraphInstanceTuple.f0; + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + restartAfterFailure(TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(TestRestartStrategy.directExecuting()) + .buildAndScheduleForExecution(slotPool)); + } - restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true); } @Test public void testCancelWhileRestarting() throws Exception { // We want to manually control the restart and delay - RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); - Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph executionGraph = executionGraphInstanceTuple.f0; - Instance instance = executionGraphInstanceTuple.f1; + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + .setTaskManagerLocation(taskManagerLocation) + .buildAndScheduleForExecution(slotPool); - // Kill the instance and wait for the job to restart - instance.markDead(); - Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + // Release the TaskManager and wait for the job to restart + slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception")); + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - assertEquals(JobStatus.RESTARTING, 
executionGraph.getState()); + // Canceling needs to abort the restart + executionGraph.cancel(); - // Canceling needs to abort the restart - executionGraph.cancel(); + assertEquals(JobStatus.CANCELED, executionGraph.getState()); - assertEquals(JobStatus.CANCELED, executionGraph.getState()); + // The restart has been aborted + executionGraph.restart(executionGraph.getGlobalModVersion()); - // The restart has been aborted - executionGraph.restart(executionGraph.getGlobalModVersion()); + assertEquals(JobStatus.CANCELED, executionGraph.getState()); + } - assertEquals(JobStatus.CANCELED, executionGraph.getState()); } @Test public void testFailWhileRestarting() throws Exception { - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - NUM_TASKS); - - scheduler.newInstanceAvailable(instance); - - // Blocking program - ExecutionGraph executionGraph = new ExecutionGraph( - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new JobID(), - "TestJob", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), - AkkaUtils.getDefaultTimeout(), - // We want to manually control the restart and delay - new InfiniteDelayRestartStrategy(), - scheduler); - - executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); - - JobVertex jobVertex = new JobVertex("NoOpInvokable"); - jobVertex.setInvokableClass(NoOpInvokable.class); - jobVertex.setParallelism(NUM_TASKS); - - JobGraph jobGraph = new JobGraph("TestJob", jobVertex); - - executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + 
.setTaskManagerLocation(taskManagerLocation) + .buildAndScheduleForExecution(slotPool); - assertEquals(JobStatus.CREATED, executionGraph.getState()); + // Release the TaskManager and wait for the job to restart + slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception")); - executionGraph.scheduleForExecution(); - - assertEquals(JobStatus.RUNNING, executionGraph.getState()); - - // Kill the instance and wait for the job to restart - instance.markDead(); - - assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - // If we fail when being in RESTARTING, then we should try to restart again - final long globalModVersion = executionGraph.getGlobalModVersion(); - final Exception testException = new Exception("Test exception"); - executionGraph.failGlobal(testException); + // If we fail when being in RESTARTING, then we should try to restart again + final long globalModVersion = executionGraph.getGlobalModVersion(); + final Exception testException = new Exception("Test exception"); + executionGraph.failGlobal(testException); - assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion()); - assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause + assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion()); + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause - // but it should fail when sending a SuppressRestartsException - executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception"))); + // but it should fail when sending a SuppressRestartsException + executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception"))); - 
assertEquals(JobStatus.FAILED, executionGraph.getState()); + assertEquals(JobStatus.FAILED, executionGraph.getState()); - // The restart has been aborted - executionGraph.restart(executionGraph.getGlobalModVersion()); + // The restart has been aborted + executionGraph.restart(executionGraph.getGlobalModVersion()); - assertEquals(JobStatus.FAILED, executionGraph.getState()); + assertEquals(JobStatus.FAILED, executionGraph.getState()); + } } @Test public void testCancelWhileFailing() throws Exception { - final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); - final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0; + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + .buildAndScheduleForExecution(slotPool); - assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(JobStatus.RUNNING, graph.getState()); - // switch all tasks to running - for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { - vertex.getCurrentExecutionAttempt().switchToRunning(); - } + // switch all tasks to running + for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { + vertex.getCurrentExecutionAttempt().switchToRunning(); + } - graph.failGlobal(new Exception("test")); + graph.failGlobal(new Exception("test")); - assertEquals(JobStatus.FAILING, graph.getState()); + assertEquals(JobStatus.FAILING, graph.getState()); - graph.cancel(); + graph.cancel(); - assertEquals(JobStatus.CANCELLING, graph.getState()); + assertEquals(JobStatus.CANCELLING, graph.getState()); - // let all tasks finish cancelling - completeCanceling(graph); + // let all tasks finish cancelling + completeCanceling(graph); + + assertEquals(JobStatus.CANCELED, graph.getState()); + } - assertEquals(JobStatus.CANCELED, graph.getState()); } @Test public void 
testFailWhileCanceling() throws Exception { - final RestartStrategy restartStrategy = new NoRestartStrategy(); - final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0; + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().buildAndScheduleForExecution(slotPool); + + assertEquals(JobStatus.RUNNING, graph.getState()); + switchAllTasksToRunning(graph); - assertEquals(JobStatus.RUNNING, graph.getState()); - switchAllTasksToRunning(graph); + graph.cancel(); - graph.cancel(); + assertEquals(JobStatus.CANCELLING, graph.getState()); - assertEquals(JobStatus.CANCELLING, graph.getState()); + graph.failGlobal(new Exception("test")); - graph.failGlobal(new Exception("test")); + assertEquals(JobStatus.FAILING, graph.getState()); - assertEquals(JobStatus.FAILING, graph.getState()); + // let all tasks finish cancelling + completeCanceling(graph); - // let all tasks finish cancelling - completeCanceling(graph); + assertEquals(JobStatus.FAILED, graph.getState()); + } - assertEquals(JobStatus.FAILED, graph.getState()); } private void switchAllTasksToRunning(ExecutionGraph graph) { @@ -287,23 +270,28 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testNoRestartOnSuppressException() throws Exception { - final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0; + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)) + .buildAndScheduleForExecution(slotPool); - // Fail with unrecoverable Exception - eg.getAllExecutionVertices().iterator().next().fail( - new SuppressRestartsException(new Exception("Test Exception"))); + // Fail with unrecoverable Exception + eg.getAllExecutionVertices().iterator().next().fail( + new SuppressRestartsException(new Exception("Test Exception"))); - 
assertEquals(JobStatus.FAILING, eg.getState()); + assertEquals(JobStatus.FAILING, eg.getState()); - completeCanceling(eg); + completeCanceling(eg); - eg.waitUntilTerminal(); - assertEquals(JobStatus.FAILED, eg.getState()); + eg.waitUntilTerminal(); + assertEquals(JobStatus.FAILED, eg.getState()); - RestartStrategy restartStrategy = eg.getRestartStrategy(); - assertTrue(restartStrategy instanceof FixedDelayRestartStrategy); + RestartStrategy restartStrategy = eg.getRestartStrategy(); + assertTrue(restartStrategy instanceof FixedDelayRestartStrategy); + + assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt()); + } - assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt()); } /** @@ -313,56 +301,47 @@ public class ExecutionGraphRestartTest extends TestLogger { */ @Test public void testFailingExecutionAfterRestart() throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - 2); - - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); - - TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting(); - JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class); JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); - ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler); - eg.start(mainThreadExecutor); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - assertEquals(JobStatus.CREATED, eg.getState()); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(TestRestartStrategy.directExecuting()) + .setJobGraph(jobGraph) + .setNumberOfTasks(2) + .buildAndScheduleForExecution(slotPool); - 
eg.scheduleForExecution(); - assertEquals(JobStatus.RUNNING, eg.getState()); + Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator(); - Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator(); + Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt(); + Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt(); - Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt(); - Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt(); + finishedExecution.markFinished(); - finishedExecution.markFinished(); + failedExecution.fail(new Exception("Test Exception")); + failedExecution.completeCancelling(); - failedExecution.fail(new Exception("Test Exception")); - failedExecution.completeCancelling(); - - assertEquals(JobStatus.RUNNING, eg.getState()); + assertEquals(JobStatus.RUNNING, eg.getState()); - // At this point all resources have been assigned - for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { - assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource()); - vertex.getCurrentExecutionAttempt().switchToRunning(); - } + // At this point all resources have been assigned + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource()); + vertex.getCurrentExecutionAttempt().switchToRunning(); + } - // fail old finished execution, this should not affect the execution - finishedExecution.fail(new Exception("This should have no effect")); + // fail old finished execution, this should not affect the execution + finishedExecution.fail(new Exception("This should have no effect")); - for (ExecutionVertex vertex: eg.getAllExecutionVertices()) { - vertex.getCurrentExecutionAttempt().markFinished(); - } + for (ExecutionVertex vertex: eg.getAllExecutionVertices()) { + 
vertex.getCurrentExecutionAttempt().markFinished(); + } - // the state of the finished execution should have not changed since it is terminal - assertEquals(ExecutionState.FINISHED, finishedExecution.getState()); + // the state of the finished execution should have not changed since it is terminal + assertEquals(ExecutionState.FINISHED, finishedExecution.getState()); - assertEquals(JobStatus.FINISHED, eg.getState()); + assertEquals(JobStatus.FINISHED, eg.getState()); + } } /** @@ -372,43 +351,27 @@ public class ExecutionGraphRestartTest extends TestLogger { */ @Test public void testFailExecutionAfterCancel() throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - 2); - - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); - - JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class); - - ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( - Integer.MAX_VALUE, Integer.MAX_VALUE)); - JobGraph jobGraph = new JobGraph("Test Job", vertex); - jobGraph.setExecutionConfig(executionConfig); - - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + .setJobGraph(createJobGraphToCancel()) + .setNumberOfTasks(2) + .buildAndScheduleForExecution(slotPool); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + // Fail right after cancel (for example with concurrent slot release) + eg.cancel(); - assertEquals(JobStatus.CREATED, eg.getState()); + for (ExecutionVertex v : eg.getAllExecutionVertices()) { + v.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + } - 
eg.scheduleForExecution(); - assertEquals(JobStatus.RUNNING, eg.getState()); + assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get()); - // Fail right after cancel (for example with concurrent slot release) - eg.cancel(); + Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt(); - for (ExecutionVertex v : eg.getAllExecutionVertices()) { - v.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + execution.completeCancelling(); + assertEquals(JobStatus.CANCELED, eg.getState()); } - - assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get()); - - Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt(); - - execution.completeCancelling(); - assertEquals(JobStatus.CANCELED, eg.getState()); } /** @@ -417,41 +380,25 @@ public class ExecutionGraphRestartTest extends TestLogger { */ @Test public void testFailExecutionGraphAfterCancel() throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - 2); - - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); - - JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class); - - ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( - Integer.MAX_VALUE, Integer.MAX_VALUE)); - JobGraph jobGraph = new JobGraph("Test Job", vertex); - jobGraph.setExecutionConfig(executionConfig); - - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + .setJobGraph(createJobGraphToCancel()) + .setNumberOfTasks(2) + .buildAndScheduleForExecution(slotPool); - 
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + // Fail right after cancel (for example with concurrent slot release) + eg.cancel(); + assertEquals(JobStatus.CANCELLING, eg.getState()); - assertEquals(JobStatus.CREATED, eg.getState()); + eg.failGlobal(new Exception("Test Exception")); + assertEquals(JobStatus.FAILING, eg.getState()); - eg.scheduleForExecution(); - assertEquals(JobStatus.RUNNING, eg.getState()); - - // Fail right after cancel (for example with concurrent slot release) - eg.cancel(); - assertEquals(JobStatus.CANCELLING, eg.getState()); - - eg.failGlobal(new Exception("Test Exception")); - assertEquals(JobStatus.FAILING, eg.getState()); + Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt(); - Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt(); - - execution.completeCancelling(); - assertEquals(JobStatus.RESTARTING, eg.getState()); + execution.completeCancelling(); + assertEquals(JobStatus.RESTARTING, eg.getState()); + } } /** @@ -459,55 +406,29 @@ public class ExecutionGraphRestartTest extends TestLogger { */ @Test public void testSuspendWhileRestarting() throws Exception { - - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - NUM_TASKS); - - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); - - JobVertex sender = new JobVertex("Task"); - sender.setInvokableClass(NoOpInvokable.class); - sender.setParallelism(NUM_TASKS); - - JobGraph jobGraph = new JobGraph("Pointwise job", sender); - TestRestartStrategy controllableRestartStrategy = TestRestartStrategy.manuallyTriggered(); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder() + 
.setRestartStrategy(controllableRestartStrategy) + .setTaskManagerLocation(taskManagerLocation) + .buildAndScheduleForExecution(slotPool); - ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new JobID(), - "Test job", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), - AkkaUtils.getDefaultTimeout(), - controllableRestartStrategy, - scheduler); - - eg.start(mainThreadExecutor); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - assertEquals(JobStatus.CREATED, eg.getState()); - - eg.scheduleForExecution(); - - assertEquals(JobStatus.RUNNING, eg.getState()); - - instance.markDead(); + // Release the TaskManager and wait for the job to restart + slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception")); - Assert.assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions()); + assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions()); - assertEquals(JobStatus.RESTARTING, eg.getState()); + assertEquals(JobStatus.RESTARTING, eg.getState()); - eg.suspend(new Exception("Test exception")); + eg.suspend(new Exception("Test exception")); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDED, eg.getState()); - controllableRestartStrategy.triggerAll().join(); + controllableRestartStrategy.triggerAll().join(); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + } } @Test @@ -518,7 +439,7 @@ public class ExecutionGraphRestartTest extends TestLogger { final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); final ExecutionGraph eg = createSimpleTestGraph( - new JobID(), + TEST_JOB_ID, taskManagerGateway, triggeredRestartStrategy, createNoOpVertex(parallelism)); @@ -538,15 +459,15 @@ public class ExecutionGraphRestartTest extends TestLogger { first.fail(new Exception("intended test failure 
1")); last.fail(new Exception("intended test failure 2")); - Assert.assertEquals(JobStatus.FAILING, eg.getState()); + assertEquals(JobStatus.FAILING, eg.getState()); completeCancellingForAllVertices(eg); // Now trigger the restart - Assert.assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions()); + assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions()); triggeredRestartStrategy.triggerAll().join(); - Assert.assertEquals(JobStatus.RUNNING, eg.getState()); + assertEquals(JobStatus.RUNNING, eg.getState()); switchToRunning(eg); finishAllVertices(eg); @@ -558,13 +479,12 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testGlobalFailAndRestarts() throws Exception { final int parallelism = 10; - final JobID jid = new JobID(); final JobVertex vertex = createNoOpVertex(parallelism); final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway(); - final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway); + final SlotProvider slots = new SimpleSlotProvider(TEST_JOB_ID, parallelism, taskManagerGateway); final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); - final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex); + final ExecutionGraph eg = createSimpleTestGraph(TEST_JOB_ID, slots, restartStrategy, vertex); eg.start(mainThreadExecutor); eg.setScheduleMode(ScheduleMode.EAGER); @@ -603,55 +523,57 @@ public class ExecutionGraphRestartTest extends TestLogger { // this test is inconclusive if not used with a proper multi-threaded executor assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1); - SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); final int parallelism = 20; - final Scheduler scheduler = createSchedulerWithInstances(parallelism, taskManagerGateway); - final SlotSharingGroup sharingGroup = new SlotSharingGroup(); + try 
(SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + final Scheduler scheduler = createSchedulerWithSlots(parallelism, slotPool, new LocalTaskManagerLocation()); - final JobVertex source = new JobVertex("source"); - source.setInvokableClass(NoOpInvokable.class); - source.setParallelism(parallelism); - source.setSlotSharingGroup(sharingGroup); + final SlotSharingGroup sharingGroup = new SlotSharingGroup(); - final JobVertex sink = new JobVertex("sink"); - sink.setInvokableClass(NoOpInvokable.class); - sink.setParallelism(parallelism); - sink.setSlotSharingGroup(sharingGroup); - sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + source.setParallelism(parallelism); + source.setSlotSharingGroup(sharingGroup); - TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting(); + final JobVertex sink = new JobVertex("sink"); + sink.setInvokableClass(NoOpInvokable.class); + sink.setParallelism(parallelism); + sink.setSlotSharingGroup(sharingGroup); + sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); - final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph( - new JobID(), - scheduler, - restartStrategy, - executor, - source, - sink); + TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting(); - eg.start(mainThreadExecutor); + final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph( + TEST_JOB_ID, + scheduler, + restartStrategy, + executor, + source, + sink); - eg.setScheduleMode(ScheduleMode.EAGER); - eg.scheduleForExecution(); + eg.start(mainThreadExecutor); - switchToRunning(eg); + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(); - // fail into 'RESTARTING' - eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail( - new Exception("intended test 
failure")); + switchToRunning(eg); - assertEquals(JobStatus.FAILING, eg.getState()); + // fail into 'RESTARTING' + eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail( + new Exception("intended test failure")); - completeCancellingForAllVertices(eg); + assertEquals(JobStatus.FAILING, eg.getState()); - assertEquals(JobStatus.RUNNING, eg.getState()); + completeCancellingForAllVertices(eg); - // clean termination - switchToRunning(eg); - finishAllVertices(eg); + assertEquals(JobStatus.RUNNING, eg.getState()); - assertEquals(JobStatus.FINISHED, eg.getState()); + // clean termination + switchToRunning(eg); + finishAllVertices(eg); + + assertEquals(JobStatus.FINISHED, eg.getState()); + } } @Test @@ -662,43 +584,44 @@ public class ExecutionGraphRestartTest extends TestLogger { final int numRestarts = 10; final int parallelism = 20; - TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1, taskManagerGateway); - - final SlotSharingGroup sharingGroup = new SlotSharingGroup(); + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + final Scheduler scheduler = createSchedulerWithSlots( + parallelism - 1, slotPool, new LocalTaskManagerLocation()); - final JobVertex source = new JobVertex("source"); - source.setInvokableClass(NoOpInvokable.class); - source.setParallelism(parallelism); - source.setSlotSharingGroup(sharingGroup); + final SlotSharingGroup sharingGroup = new SlotSharingGroup(); - final JobVertex sink = new JobVertex("sink"); - sink.setInvokableClass(NoOpInvokable.class); - sink.setParallelism(parallelism); - sink.setSlotSharingGroup(sharingGroup); - sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + source.setParallelism(parallelism); + 
source.setSlotSharingGroup(sharingGroup); - TestRestartStrategy restartStrategy = - new TestRestartStrategy(numRestarts, false); + final JobVertex sink = new JobVertex("sink"); + sink.setInvokableClass(NoOpInvokable.class); + sink.setParallelism(parallelism); + sink.setSlotSharingGroup(sharingGroup); + sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); + TestRestartStrategy restartStrategy = + new TestRestartStrategy(numRestarts, false); - final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph( - new JobID(), scheduler, restartStrategy, executor, source, sink); + final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph( + TEST_JOB_ID, scheduler, restartStrategy, executor, source, sink); - eg.start(mainThreadExecutor); - eg.setScheduleMode(ScheduleMode.EAGER); - eg.scheduleForExecution(); + eg.start(mainThreadExecutor); + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(); - // wait until no more changes happen - while (eg.getNumberOfFullRestarts() < numRestarts) { - Thread.sleep(1); - } + // wait until no more changes happen + while (eg.getNumberOfFullRestarts() < numRestarts) { + Thread.sleep(1); + } - assertEquals(JobStatus.FAILED, eg.getState()); + assertEquals(JobStatus.FAILED, eg.getState()); - final Throwable t = eg.getFailureCause(); - if (!(t instanceof NoResourceAvailableException)) { - ExceptionUtils.rethrowException(t, t.getMessage()); + final Throwable t = eg.getFailureCause(); + if (!(t instanceof NoResourceAvailableException)) { + ExceptionUtils.rethrowException(t, t.getMessage()); + } } } @@ -710,7 +633,8 @@ public class ExecutionGraphRestartTest extends TestLogger { public void testFailureWhileRestarting() throws Exception { final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered(); - final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> new 
CompletableFuture<>())); + final ExecutionGraph executionGraph = createSimpleExecutionGraph( + restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()), createJobGraph()); executionGraph.start(mainThreadExecutor); executionGraph.setQueuedSchedulingAllowed(true); @@ -733,66 +657,100 @@ public class ExecutionGraphRestartTest extends TestLogger { // Utilities // ------------------------------------------------------------------------ - private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) { - final Scheduler scheduler = new Scheduler(executor); - final Instance[] instances = new Instance[num]; + private static class TestingExecutionGraphBuilder { + private RestartStrategy restartStrategy = new NoRestartStrategy(); + private JobGraph jobGraph = createJobGraph(); + private int tasksNum = NUM_TASKS; + private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - for (int i = 0; i < instances.length; i++) { - instances[i] = createInstance(taskManagerGateway, 55443 + i); - scheduler.newInstanceAvailable(instances[i]); + private TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartStrategy) { + this.restartStrategy = restartStrategy; + return this; } - return scheduler; - } + private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) { + this.jobGraph = jobGraph; + return this; + } - private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) { - final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000); - final TaskManagerLocation location = new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), port); - return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1); - } + TestingExecutionGraphBuilder setNumberOfTasks(@SuppressWarnings("SameParameterValue") int tasksNum) { + this.tasksNum = tasksNum; + return this; + } - // 
------------------------------------------------------------------------ + private TestingExecutionGraphBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) { + this.taskManagerLocation = taskManagerLocation; + return this; + } - private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - NUM_TASKS); + private static TestingExecutionGraphBuilder newBuilder() { + return new TestingExecutionGraphBuilder(); + } - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); + private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception { + final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation); + final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph); - ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler); + assertEquals(JobStatus.CREATED, eg.getState()); - assertEquals(JobStatus.CREATED, eg.getState()); + eg.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, eg.getState()); - eg.scheduleForExecution(); - assertEquals(JobStatus.RUNNING, eg.getState()); - return new Tuple2<>(eg, instance); + return eg; + } } - private ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException { - JobGraph jobGraph = createJobGraph(NUM_TASKS); + private static Scheduler createSchedulerWithSlots( + int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception { - ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider); - eg.start(mainThreadExecutor); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + 
setupSlotPool(slotPool); + Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool); + scheduler.start(mainThreadExecutor); + slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - return eg; + final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS); + for (int i = 0; i < numSlots; i++) { + final AllocationID allocationId = new AllocationID(); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + slotOffers.add(slotOffer); + } + + slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers); + + return scheduler; } - @Nonnull - private static JobGraph createJobGraph(int parallelism) { - JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class); + private static void setupSlotPool(SlotPool slotPool) throws Exception { + final String jobManagerAddress = "foobar"; + final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor); + slotPool.connectToResourceManager(resourceManagerGateway); + } + private static JobGraph createJobGraph() { + JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class); return new JobGraph("Pointwise job", sender); } - private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException { - final ExecutionGraph executionGraph = new ExecutionGraph( + private static JobGraph createJobGraphToCancel() throws IOException { + JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( + Integer.MAX_VALUE, Integer.MAX_VALUE)); + JobGraph jobGraph = new JobGraph("Test Job", vertex); + 
jobGraph.setExecutionConfig(executionConfig); + return jobGraph; + } + + private static ExecutionGraph createSimpleExecutionGraph( + RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph) + throws IOException, JobException { + + ExecutionGraph executionGraph = new ExecutionGraph( TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), - new JobID(), + TEST_JOB_ID, "Test job", new Configuration(), new SerializedValue<>(new ExecutionConfig()), @@ -800,12 +758,13 @@ public class ExecutionGraphRestartTest extends TestLogger { restartStrategy, slotProvider); - executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); + executionGraph.start(mainThreadExecutor); + executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); return executionGraph; } - private void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) { + private void restartAfterFailure(ExecutionGraph eg) { eg.start(mainThreadExecutor); eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); @@ -817,12 +776,6 @@ public class ExecutionGraphRestartTest extends TestLogger { assertEquals(JobStatus.RUNNING, eg.getState()); - if (haltAfterRestart) { - haltExecution(eg); - } - } - - private static void haltExecution(ExecutionGraph eg) { finishAllVertices(eg); assertEquals(JobStatus.FINISHED, eg.getState()); }