GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r327175478
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ########## @@ -75,10 +128,293 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.log = log; + + this.delayExecutor = checkNotNull(delayExecutor); + this.userCodeLoader = checkNotNull(userCodeLoader); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + + this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy); + this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph()); + this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout); + setTaskFailureListener(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, getJobGraph().getJobID())); + } + + // ------------------------------------------------------------------------ + // SchedulerNG + // ------------------------------------------------------------------------ + + @Override + protected void startSchedulingInternal() { + prepareExecutionGraphForScheduling(); + schedulingStrategy.startScheduling(); + } + + @Override + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + final Optional<ExecutionVertexID> executionVertexIdOptional = getExecutionVertexId(taskExecutionState.getID()); + if (executionVertexIdOptional.isPresent()) { + final ExecutionVertexID executionVertexId = executionVertexIdOptional.get(); + updateState(taskExecutionState); + schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState()); + maybeHandleTaskFailure(taskExecutionState, executionVertexId); + return true; + } + + return false; + } + + private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) { + if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { + final Throwable error = taskExecutionState.getError(userCodeLoader); + handleTaskFailure(executionVertexId, error); + } + } + + private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) { + final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + maybeRestartTasks(failureHandlingResult); + } + + private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) { + if (failureHandlingResult.canRestart()) { + restartTasksWithDelay(failureHandlingResult); + } else { + failJob(failureHandlingResult.getError()); + } + } + + private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { + final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart(); + + final Set<ExecutionVertexVersion> executionVertexVersions = + new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); + + delayExecutor.schedule( + () -> FutureUtils.assertNoException( + cancelFuture.handleAsync(restartTasksOrHandleError(executionVertexVersions), getMainThreadExecutor())), + failureHandlingResult.getRestartDelayMS(), + TimeUnit.MILLISECONDS); + } + + private BiFunction<Object, Throwable, Void> restartTasksOrHandleError(final Set<ExecutionVertexVersion> executionVertexVersions) { + return (Object ignored, Throwable throwable) -> { + + if (throwable == null) { + final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + schedulingStrategy.restartTasks(verticesToRestart); + } else { + failJob(throwable); Review comment: Ok, we do not have to change anything immediately. However, here is some food for thought: - If the network between JM and TM is unavailable during cancelation, the cancelation will fail, and we will transition to `FAILED`. The time period where this can happen is narrow but the scenario is not too far fetched. It should be possible to create an environment where this happens with [flink-jepsen](https://github.com/apache/flink/tree/master/flink-jepsen). - Transitioning the job to `FAILED`, which is a globally terminal state, will stop the job execution indefintely (until it is redeployed). This is not nice operationally and not even documented. - If we run into violations of invariants in the execution graph, the JVM should exit and be brought up again by an external process supervisor, in my opinion. Unability to cancel tasks does not violate invariants in the execution graph. - One could retry the task cancelation indefintely until success or accept that tasks might become orphaned (but they should terminate themselves if the connection to the JobMaster is lost). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services