azagrebin commented on a change in pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226#discussion_r279345597
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ########## @@ -136,149 +162,163 @@ private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer<Execut @Test public void testRestartAutomatically() throws Exception { - Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = - createExecutionGraph(TestRestartStrategy.directExecuting()); + try (SlotPool slotPool = new SlotPoolImpl(jobId)) { + ExecutionGraph eg = createExecutionGraph(TestRestartStrategy.directExecuting(), slotPool); - ExecutionGraph eg = executionGraphInstanceTuple.f0; + restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true); + } - restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true); } @Test public void testCancelWhileRestarting() throws Exception { // We want to manually control the restart and delay RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); - Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph executionGraph = executionGraphInstanceTuple.f0; - Instance instance = executionGraphInstanceTuple.f1; + try (SlotPool slotPool = new SlotPoolImpl(jobId)) { + ExecutionGraph executionGraph = createExecutionGraph(restartStrategy, slotPool); + + // Release the TaskManager and wait for the job to restart + slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception")); + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - // Kill the instance and wait for the job to restart - instance.markDead(); - Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + // Canceling needs to abort the restart + executionGraph.cancel(); - // Canceling needs to abort the restart - executionGraph.cancel(); + assertEquals(JobStatus.CANCELED, executionGraph.getState()); - assertEquals(JobStatus.CANCELED, executionGraph.getState()); + // The restart has been aborted + executionGraph.restart(executionGraph.getGlobalModVersion()); - // The restart has been aborted - executionGraph.restart(executionGraph.getGlobalModVersion()); + assertEquals(JobStatus.CANCELED, executionGraph.getState()); + } - assertEquals(JobStatus.CANCELED, executionGraph.getState()); } @Test public void testFailWhileRestarting() throws Exception { - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + try (SlotPool slotPool = new SlotPoolImpl(jobId)) { + setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); Review comment: The beginning looks like just calling `createExecutionGraph` with custom `JobGraph`. If `createExecutionGraph` accepted `JobGraph`, this code could be deduplicated. The same for other tests with similar code duplication. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services