metaswirl commented on a change in pull request #18689: URL: https://github.com/apache/flink/pull/18689#discussion_r808850854
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java ########## @@ -306,22 +327,87 @@ void deliverOperatorEventToCoordinator( operatorId, request); } + /** Transition to different state when failure occurs. Stays in the same state by default. */ + abstract void onFailure(Throwable cause); + + <T extends StateTransitions.ToRestarting & StateTransitions.ToFailing> void restartOrFail( + FailureResult failureResult, T context) { + if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); + context.goToRestarting( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + failureResult.getBackoffTime(), + getFailures()); + } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); + context.goToFailing( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + failureResult.getFailureCause(), + getFailures()); + } + } + + /** + * Transition to different state when the execution graph reaches a globally terminal state. + * + * @param globallyTerminalState globally terminal state which the execution graph reached + */ + abstract void onGloballyTerminalState(JobStatus globallyTerminalState); + + @Override + public void handleGlobalFailure(Throwable cause) { + failureCollection.add(ExceptionHistoryEntry.createGlobal(cause)); + onFailure(cause); + } + /** * Updates the execution graph with the given task execution state transition. 
* * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph * with * @return {@code true} if the update was successful; otherwise {@code false} */ - abstract boolean updateTaskExecutionState( - TaskExecutionStateTransition taskExecutionStateTransition); + boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) { + // collect before updateState, as updateState may deregister the execution + final Optional<AccessExecution> maybeExecution = + executionGraph.findExecution(taskExecutionStateTransition.getID()); + final Optional<String> maybeTaskName = + executionGraph.findVertexWithAttempt(taskExecutionStateTransition.getID()); + + final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState(); + boolean successfulUpdate = getExecutionGraph().updateState(taskExecutionStateTransition); + if (successfulUpdate && desiredState == ExecutionState.FAILED) { + final AccessExecution execution = + maybeExecution.orElseThrow(NoSuchElementException::new); + final String taskName = maybeTaskName.orElseThrow(NoSuchElementException::new); + final ExecutionState currentState = execution.getState(); + if (currentState == desiredState) { + failureCollection.add(ExceptionHistoryEntry.create(execution, taskName)); Review comment: The failureCollection contains all exceptions that were thrown in the interval from the initial failure to the job restart or cancellation. Usually this should be no more than one exception. Local failures are bounded by the number of tasks (actually the number is currently limited to 1). I don't know whether/how the number of global failures is bounded, but all failures would have to arrive in this short time interval. Given these restrictions, I don't think that having this collection unbounded is really dangerous. If you want to change it anyway, should we add a new config parameter for this? Where should we add this, also to the `WebOptions`? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org