tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578407995



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -153,7 +152,7 @@
 
     private LogicalSlot assignedResource;
 
-    private Throwable failureCause; // once assigned, never changes
+    private Optional<ErrorInfo> failureCause = Optional.empty(); // once assigned, never changes

Review comment:
       The comment seems to be wrong now.
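Presumably the comment is off because the field now starts as `Optional.empty()` and stays empty when `processFail` receives no cause. A minimal sketch of one way to document that and enforce the write-once rule in code, using hypothetical stand-ins (`ErrorInfoSketch`, `ExecutionFailureSketch` and `setFailureInfo` are illustrative names, not Flink API):

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's ErrorInfo: a failure cause plus the time it was recorded. */
final class ErrorInfoSketch {
    private final Throwable exception;
    private final long timestamp;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }
}

/** Hypothetical fragment of Execution that only tracks the failure cause. */
final class ExecutionFailureSketch {
    /** Empty until the execution fails with a known cause; never changes once set. */
    private Optional<ErrorInfoSketch> failureInfo = Optional.empty();

    void setFailureInfo(ErrorInfoSketch info) {
        // Enforce the write-once rule in code instead of relying on a field comment.
        if (failureInfo.isPresent()) {
            throw new IllegalStateException("The failure cause has already been set.");
        }
        failureInfo = Optional.of(info);
    }

    Optional<ErrorInfoSketch> getFailureInfo() {
        return failureInfo;
    }
}
```

A second call to `setFailureInfo` throws, which is the guarantee the old field comment only promised.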

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -1133,7 +1128,10 @@ private void processFail(
         checkState(transitionState(current, FAILED, t));
 
         // success (in a manner of speaking)
-        this.failureCause = t;
+        if (t != null) {
+            // we only set the failureCause if an error is passed (see FLINK-21376)
+            this.failureCause = Optional.of(new ErrorInfo(t, getStateTimestamp(FAILED)));
+        }

Review comment:
       Can't we say that if `t == null`, then we create an unknown failure 
cause? `new FlinkException("Unknown cause for Execution failure. This might be 
caused by FLINK-21376")`?
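A minimal sketch of this suggestion, assuming a simplified `ErrorInfo` and a local stand-in for `FlinkException` so it compiles on its own (`FailureCauses.toErrorInfo` is a hypothetical helper, not existing Flink code):

```java
/** Hypothetical stand-in for org.apache.flink.util.FlinkException, so the sketch compiles alone. */
class StandInFlinkException extends Exception {
    StandInFlinkException(String message) {
        super(message);
    }
}

/** Hypothetical stand-in for Flink's ErrorInfo: a failure cause plus the time it was recorded. */
final class ErrorInfoSketch {
    private final Throwable exception;
    private final long timestamp;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }
}

final class FailureCauses {
    private FailureCauses() {}

    /** Builds a failure entry, substituting a placeholder exception when no cause is known. */
    static ErrorInfoSketch toErrorInfo(Throwable t, long failedTimestamp) {
        Throwable cause =
                t != null
                        ? t
                        : new StandInFlinkException(
                                "Unknown cause for Execution failure. This might be caused by FLINK-21376");
        return new ErrorInfoSketch(cause, failedTimestamp);
    }
}
```

With a placeholder cause, a `FAILED` execution always carries a failure entry, so callers would not need to handle a missing one.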

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);

Review comment:
       I think `archiveFromFailureHandlingResult` can also be called when 
handling a global failover. In this case `executionOptional` would be empty and 
as a consequence we won't record a failure cause.
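A small, self-contained illustration of that gap and of one possible fix, assuming simplified stand-ins for `FailureHandlingResult` and for the per-execution failure lookup (all class and method names here are hypothetical, apart from `getExecutionVertexIdOfFailedTask`, which mirrors the code under review):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for FailureHandlingResult: the vertex id is null for global failures. */
final class ResultSketch {
    private final String failingVertexId;
    private final Throwable error;

    ResultSketch(String failingVertexId, Throwable error) {
        this.failingVertexId = failingVertexId;
        this.error = error;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failingVertexId);
    }

    Throwable getError() {
        return error;
    }
}

/** Hypothetical archive that contrasts the reviewed lookup with a global-failure fallback. */
final class FailureArchiveSketch {
    /** Failure causes per vertex id, standing in for Execution#getFailureInfo. */
    final Map<String, Throwable> executionFailures = new HashMap<>();
    final List<Throwable> history = new ArrayList<>();

    /** Mirrors the reviewed code: without a vertex id the Optional chain is empty. */
    void archiveLocalOnly(ResultSketch result) {
        result.getExecutionVertexIdOfFailedTask()
                .map(executionFailures::get)
                .ifPresent(history::add);
    }

    /** Same lookup for task failures, but records the result's error for global ones. */
    void archiveWithGlobalFallback(ResultSketch result) {
        if (result.getExecutionVertexIdOfFailedTask().isPresent()) {
            archiveLocalOnly(result);
        } else {
            history.add(result.getError());
        }
    }
}
```

`archiveLocalOnly` behaves like the reviewed code: for a result without a vertex id, `ifPresent` silently does nothing and the global failure never reaches the history.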

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
       Why do we have to ask the execution for the failure cause? If we are 
only interested in the root cause, doesn't `failureHandlingResult` contain all 
the required information?
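If the handling result carried everything needed, the archive step could skip the `Execution` lookup entirely. A sketch under that assumption: `getTimestamp()` on the result is hypothetical (it is not part of the code under review), and all classes are simplified stand-ins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for FailureHandlingResult; getTimestamp() is an assumption. */
final class HandlingResultSketch {
    private final String failingVertexId; // null for a global failure
    private final Throwable error;
    private final long timestamp;

    HandlingResultSketch(String failingVertexId, Throwable error, long timestamp) {
        this.failingVertexId = failingVertexId;
        this.error = error;
        this.timestamp = timestamp;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failingVertexId);
    }

    Throwable getError() {
        return error;
    }

    long getTimestamp() {
        return timestamp;
    }
}

/** One exception-history entry; the vertex id is absent for global failures. */
final class HistoryEntrySketch {
    private final Throwable error;
    private final long timestamp;
    private final String failingVertexId; // null for a global failure

    HistoryEntrySketch(Throwable error, long timestamp, String failingVertexId) {
        this.error = error;
        this.timestamp = timestamp;
        this.failingVertexId = failingVertexId;
    }

    Throwable getError() {
        return error;
    }

    long getTimestamp() {
        return timestamp;
    }

    Optional<String> getFailingVertexId() {
        return Optional.ofNullable(failingVertexId);
    }
}

final class ResultOnlyArchive {
    final List<HistoryEntrySketch> history = new ArrayList<>();

    /** Builds the entry from the handling result alone; no Execution lookup is needed. */
    void archive(HandlingResultSketch result) {
        history.add(
                new HistoryEntrySketch(
                        result.getError(),
                        result.getTimestamp(),
                        result.getExecutionVertexIdOfFailedTask().orElse(null)));
    }
}
```

One code path then covers both task failures and global failovers; the trade-off is that the timestamp has to be captured when the result is created rather than read from the execution's state timestamps.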

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -38,6 +42,12 @@
     /** Delay before the restarting can be conducted. */
     private final long restartDelayMS;
 
+    /**
+     * The ExecutionVertexID referring to the ExecutionVertex the failure is originating from or
+     * {@code null} if it's a global failure.
+     */
+    @Nullable private final ExecutionVertexID failingExecutionVertexId;
+
     /** Reason why the failure is not recoverable. */

Review comment:
       The JavaDoc seems wrong. If I am not mistaken, then we will also set 
this field if the failure is recoverable, right?
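Assuming the error is indeed kept for recoverable failures as well, the field's JavaDoc could say so directly. A sketch on a trimmed-down, hypothetical stand-in (the real `FailureHandlingResult` has further fields such as `restartDelayMS`):

```java
import java.util.Objects;

/** Hypothetical, trimmed-down stand-in for FailureHandlingResult. */
final class FailureHandlingResultSketch {
    /**
     * The failure that caused this result. Set for both recoverable and unrecoverable
     * failures; {@link #canRestart()} tells the two cases apart.
     */
    private final Throwable error;

    /** Whether the failure can be recovered from by restarting tasks. */
    private final boolean restartable;

    private FailureHandlingResultSketch(Throwable error, boolean restartable) {
        this.error = Objects.requireNonNull(error);
        this.restartable = restartable;
    }

    static FailureHandlingResultSketch restartable(Throwable error) {
        return new FailureHandlingResultSketch(error, true);
    }

    static FailureHandlingResultSketch unrecoverable(Throwable error) {
        return new FailureHandlingResultSketch(error, false);
    }

    Throwable getError() {
        return error;
    }

    boolean canRestart() {
        return restartable;
    }
}
```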

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +871,87 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {

Review comment:
       I think we also need a test for global failover.
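A rough shape for such a test, written against a hypothetical `SchedulerStub` so it runs without JUnit or a Flink test harness. In `DefaultSchedulerTest` the stub would presumably be replaced by a real scheduler failed through `SchedulerNG#handleGlobalFailure`, and the assertions would read whatever exception-history accessor the PR exposes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical scheduler stand-in exposing only what the test needs. */
final class SchedulerStub {
    private final List<Throwable> exceptionHistory = new ArrayList<>();

    /** Global failover: there is no failing task, so the cause is archived directly. */
    void handleGlobalFailure(Throwable cause) {
        exceptionHistory.add(cause);
    }

    List<Throwable> getExceptionHistory() {
        return Collections.unmodifiableList(exceptionHistory);
    }
}

final class ExceptionHistoryGlobalFailoverTest {
    static void testExceptionHistoryWithGlobalFailure() {
        SchedulerStub scheduler = new SchedulerStub();
        RuntimeException expected = new RuntimeException("Expected global failure");

        scheduler.handleGlobalFailure(expected);

        List<Throwable> history = scheduler.getExceptionHistory();
        if (history.size() != 1) {
            throw new AssertionError("expected exactly one entry, got " + history.size());
        }
        if (history.get(0) != expected) {
            throw new AssertionError("archived cause differs from the global failure");
        }
    }
}
```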




