dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r806187996



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +327,73 @@ void deliverOperatorEventToCoordinator(
                 operatorId, request);
     }
 
+    /** Transition to different state when failure occurs. Stays in the same state by default. */
+    abstract void onFailure(Throwable cause);
+
+    /**
+     * Transition to different state when the execution graph reaches a globally terminal state.
+     *
+     * @param globallyTerminalState globally terminal state which the execution graph reached
+     */
+    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        failureCollection.add(ExceptionHistoryEntry.createGlobal(cause));
+        onFailure(cause);
+    }
+
     /**
      * Updates the execution graph with the given task execution state transition.
      *
      * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph
      *     with
      * @return {@code true} if the update was successful; otherwise {@code false}
      */
-    abstract boolean updateTaskExecutionState(
-            TaskExecutionStateTransition taskExecutionStateTransition);
+    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
+        final Optional<ExecutionVertexID> maybeExecutionVertexId =
+                executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+        final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState();
+        boolean successfulUpdate = getExecutionGraph().updateState(taskExecutionStateTransition);
+        // We only handle failures for the actual transition into the FAILED state.
+        if (successfulUpdate && desiredState == ExecutionState.FAILED) {
+            // We're sure that the executionVertexId has been found, because we've been able to
+            // update the execution graph.
+            final ExecutionVertexID executionVertexId =
+                    maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
+            final Execution execution =
+                    executionGraph
+                            .findExecution(executionVertexId)
+                            .orElseThrow(NoSuchElementException::new);
+            if (execution.getFailureInfo().isPresent()) {
+                // Only collect failures that are recorded in the Execution
+                failureCollection.add(
+                        ExceptionHistoryEntry.create(execution, execution.getVertexWithAttempt()));
+            }
+            onFailure(extractErrorOrUseDefault(taskExecutionStateTransition));
+        }
+        return successfulUpdate;
+    }
 
-    /**
-     * Callback which is called once the execution graph reaches a globally terminal state.
-     *
-     * @param globallyTerminalState globally terminal state which the execution graph reached
-     */
-    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+    private Throwable extractErrorOrUseDefault(
+            TaskExecutionStateTransition taskExecutionStateTransition) {
+        return ErrorInfo.handleMissingThrowable(
+                taskExecutionStateTransition.getError(userCodeClassLoader));
+    }
+
+    List<ExceptionHistoryEntry> getFailures() {
+        return failureCollection;
+    }
+
+    private Optional<RootExceptionHistoryEntry> convertFailures(
+            List<ExceptionHistoryEntry> failureCollection) {
+        if (failureCollection.isEmpty()) {

Review comment:
       That sounds weird; all interactions should happen in the main thread. Do you have a test that reproduces this?
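The reviewer's argument rests on thread confinement: if every read and write of `failureCollection` happens on the main thread, a plain `ArrayList` needs no locking, and an access from any other thread is a bug to reproduce rather than a race to paper over with synchronization. Below is a minimal, self-contained sketch of that idea, assuming a single-threaded `ExecutorService` stands in for the main thread and failures are plain strings. `MainThreadConfinementSketch` and `MainThreadConfinedFailures` are hypothetical names for illustration only; they are not Flink classes, and the sketch does not reproduce Flink's own main-thread executor machinery.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadConfinementSketch {

    /**
     * Hypothetical failure collection confined to one "main" thread. It is deliberately
     * unsynchronized; every access checks the calling thread instead of taking a lock.
     */
    static final class MainThreadConfinedFailures {
        private final Thread mainThread;
        private final List<String> failures = new ArrayList<>();

        MainThreadConfinedFailures(Thread mainThread) {
            this.mainThread = mainThread;
        }

        // Fails fast if called from any thread other than the owning main thread.
        private void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new IllegalStateException(
                        "Accessed from " + Thread.currentThread().getName() + ", not the main thread");
            }
        }

        void add(String failure) {
            assertRunningInMainThread();
            failures.add(failure);
        }

        List<String> getFailures() {
            assertRunningInMainThread();
            // Return a read-only snapshot so callers cannot mutate the confined list.
            return Collections.unmodifiableList(new ArrayList<>(failures));
        }
    }

    /** Records every input through the main-thread executor and returns the collected entries. */
    static List<String> recordViaMainThread(List<String> inputs) throws Exception {
        // A single-threaded executor plays the role of the main thread (an assumption of this sketch).
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            Thread mainThread = mainThreadExecutor.submit(Thread::currentThread).get();
            MainThreadConfinedFailures failures = new MainThreadConfinedFailures(mainThread);
            for (String input : inputs) {
                // Each write is scheduled on the main thread; get() waits for it to complete.
                mainThreadExecutor.submit(() -> failures.add(input)).get();
            }
            return mainThreadExecutor.submit(failures::getFailures).get();
        } finally {
            mainThreadExecutor.shutdown();
        }
    }

    /** Returns true if a write from the calling (non-main) thread is rejected. */
    static boolean rejectsForeignThreadAccess() throws Exception {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            Thread mainThread = mainThreadExecutor.submit(Thread::currentThread).get();
            MainThreadConfinedFailures failures = new MainThreadConfinedFailures(mainThread);
            try {
                // The caller's thread is not the executor thread, so this must throw.
                failures.add("written from the caller's thread");
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } finally {
            mainThreadExecutor.shutdown();
        }
    }
}
```

Calling `recordViaMainThread(List.of("task A failed"))` funnels each write through the executor and returns the collected entries, while `rejectsForeignThreadAccess()` returns `true` because the caller is not running on the executor's thread. Failing fast like this is one way a test could surface the off-thread access the reviewer is asking about, instead of hiding it behind a synchronized list.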




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.