dmvk commented on a change in pull request #18689: URL: https://github.com/apache/flink/pull/18689#discussion_r805652431
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java ########## @@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator( operatorId, request); } + /** Transition to different state when failure occurs. Stays in the same state by default. */ + abstract void onFailure(Throwable cause); + + /** + * Transition to different state when the execution graph reaches a globally terminal state. + * + * @param globallyTerminalState globally terminal state which the execution graph reached + */ + abstract void onGloballyTerminalState(JobStatus globallyTerminalState); + + @Override + public void handleGlobalFailure(Throwable cause) { + failureCollection.add(new GlobalFailure(cause)); + onFailure(cause); + } + /** * Updates the execution graph with the given task execution state transition. * * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph * with * @return {@code true} if the update was successful; otherwise {@code false} */ - abstract boolean updateTaskExecutionState( - TaskExecutionStateTransition taskExecutionStateTransition); + boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) { + if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) { + return getExecutionGraph().updateState(taskExecutionStateTransition); + } - /** - * Callback which is called once the execution graph reaches a globally terminal state. - * - * @param globallyTerminalState globally terminal state which the execution graph reached - */ - abstract void onGloballyTerminalState(JobStatus globallyTerminalState); + // We need to collect the ExecutionVertexID before updating the state, because the Execution + // is de-registered afterwards. + // We need to use an optional here, because this method can be called even after the + // Execution is de-registered. 
+ Optional<ExecutionVertexID> idOpt = + executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID()); + final boolean successfulUpdate = + getExecutionGraph().updateState(taskExecutionStateTransition); + if (!successfulUpdate) { + return false; + } + + Throwable cause = extractErrorOrUseDefault(taskExecutionStateTransition); + + checkState(idOpt.isPresent()); + ExecutionVertexID id = idOpt.get(); + + if (getNonEmptyExecution(id).getFailureInfo().isPresent()) { + failureCollection.add(new LocalFailure(cause, id)); + } + onFailure(cause); + return true; Review comment: That's not actually doing the same thing, as you're not checking whether the transition has really happened (`newState == desiredState`). Please use the code above. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java ########## @@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator( operatorId, request); } + /** Transition to different state when failure occurs. Stays in the same state by default. */ + abstract void onFailure(Throwable cause); + + /** + * Transition to different state when the execution graph reaches a globally terminal state. + * + * @param globallyTerminalState globally terminal state which the execution graph reached + */ + abstract void onGloballyTerminalState(JobStatus globallyTerminalState); + + @Override + public void handleGlobalFailure(Throwable cause) { + failureCollection.add(new GlobalFailure(cause)); + onFailure(cause); + } + /** + * Updates the execution graph with the given task execution state transition.
* * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph * with * @return {@code true} if the update was successful; otherwise {@code false} */ - abstract boolean updateTaskExecutionState( - TaskExecutionStateTransition taskExecutionStateTransition); + boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) { + if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) { + return getExecutionGraph().updateState(taskExecutionStateTransition); + } - /** - * Callback which is called once the execution graph reaches a globally terminal state. - * - * @param globallyTerminalState globally terminal state which the execution graph reached - */ - abstract void onGloballyTerminalState(JobStatus globallyTerminalState); + // We need to collect the ExecutionVertexID before updating the state, because the Execution + // is de-registered afterwards. + // We need to use an optional here, because this method can be called even after the + // Execution is de-registered. + Optional<ExecutionVertexID> idOpt = + executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID()); + final boolean successfulUpdate = + getExecutionGraph().updateState(taskExecutionStateTransition); + if (!successfulUpdate) { + return false; + } + + Throwable cause = extractErrorOrUseDefault(taskExecutionStateTransition); + + checkState(idOpt.isPresent()); + ExecutionVertexID id = idOpt.get(); + + if (getNonEmptyExecution(id).getFailureInfo().isPresent()) { + failureCollection.add(new LocalFailure(cause, id)); + } + onFailure(cause); + return true; Review comment: That's not actually doing the same thing as you're not checking whether the transition has really happened (`newState == desiredState`). Please use the code above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org