GJL commented on a change in pull request #9860: [FLINK-14331][runtime] Reset vertices right after they transition to terminated states URL: https://github.com/apache/flink/pull/9860#discussion_r333981786
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -252,17 +258,82 @@ public void failJobIfNotEnoughResources() throws Exception { findThrowableWithMessage( failureCause, "Could not allocate the required slot within slot request timeout.").isPresent()); + assertThat(jobStatus, is(equalTo(JobStatus.FAILED))); + } + + @Test + public void skipDeploymentIfVertexVersionOutdated() { + testExecutionSlotAllocator.disableAutoCompletePendingRequests(); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + final List<JobVertex> sortedJobVertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + final ExecutionVertexID sourceExecutionVertexId = new ExecutionVertexID(sortedJobVertices.get(0).getID(), 0); + final ExecutionVertexID sinkExecutionVertexId = new ExecutionVertexID(sortedJobVertices.get(1).getID(), 0); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + testExecutionSlotAllocator.completePendingRequest(sourceExecutionVertexId); + + final ArchivedExecutionVertex sourceExecutionVertex = scheduler.requestJob().getAllExecutionVertices().iterator().next(); + final ExecutionAttemptID attemptId = sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); + scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED)); + testRestartBackoffTimeStrategy.setCanRestart(false); + + testExecutionSlotAllocator.enableAutoCompletePendingRequests(); + taskRestartExecutor.triggerScheduledTasks(); + + assertThat(testExecutionVertexOperations.getDeployedVertices(), containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId)); + assertThat(scheduler.requestJob().getState(), is(equalTo(JobStatus.RUNNING))); + } + + @Test + public void vertexIsResetBeforeRestarted() throws Exception { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + + final TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory(); + final DefaultScheduler scheduler = createScheduler(jobGraph, schedulingStrategyFactory); + final TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy(); + final SchedulingTopology topology = schedulingStrategy.getSchedulingTopology(); + + startScheduling(scheduler); + + final SchedulingExecutionVertex onlySchedulingVertex = Iterables.getOnlyElement(topology.getVertices()); + final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()); + final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); + + schedulingStrategy.schedule(Collections.singleton(onlySchedulingVertex.getId())); + + scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED)); + + taskRestartExecutor.triggerScheduledTasks(); + + assertThat(schedulingStrategy.getReceivedVerticesToRestart().size(), is(equalTo(1))); + assertThat(onlySchedulingVertex.getState(), is(equalTo(ExecutionState.CREATED))); } - private void drainAllAvailableSlots() { - final int numberOfAvailableSlots = slotProvider.getNumberOfAvailableSlots(); - for (int i = 0; i < numberOfAvailableSlots; i++) { - slotProvider.allocateSlot( - new SlotRequestId(), - new ScheduledUnit(new JobVertexID(), null, null), - SlotProfile.noRequirements(), - true, - Time.milliseconds(TIMEOUT_MS)); + @Test + public void scheduleOnlyIfVertexIsCreated() throws Exception { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + + final TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory(); + final DefaultScheduler scheduler = createScheduler(jobGraph, schedulingStrategyFactory); + final TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy(); + final SchedulingTopology topology = schedulingStrategy.getSchedulingTopology(); + + startScheduling(scheduler); + + final ExecutionVertexID onlySchedulingVertexId = Iterables.getOnlyElement(topology.getVertices()).getId(); + + // The first time scheduling of a vertex will result in one task deployment + schedulingStrategy.schedule(Collections.singleton(onlySchedulingVertexId)); + assertThat(testExecutionVertexOperations.getDeployedVertices().size(), is(equalTo(1))); + assertThat(testExecutionVertexOperations.getDeployedVertices(), containsInAnyOrder(onlySchedulingVertexId)); Review comment: I think it's better to test only one thing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services