tillrohrmann commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r578407995
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -153,7 +152,7 @@
     private LogicalSlot assignedResource;
-    private Throwable failureCause; // once assigned, never changes
+    private Optional<ErrorInfo> failureCause = Optional.empty(); // once assigned, never changes

Review comment:
The comment seems to be wrong now.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -1133,7 +1128,10 @@ private void processFail(
         checkState(transitionState(current, FAILED, t));
         // success (in a manner of speaking)
-        this.failureCause = t;
+        if (t != null) {
+            // we only set the failureCause if an error is passed (see FLINK-21376)
+            this.failureCause = Optional.of(new ErrorInfo(t, getStateTimestamp(FAILED)));
+        }

Review comment:
Can't we say that if `t == null`, then we create an unknown failure cause? `new FlinkException("Unknown cause for Execution failure. This might be caused by FLINK-21376")`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);

Review comment:
I think `archiveFromFailureHandlingResult` can also be called when handling a global failover. In this case `executionOptional` would be empty and as a consequence we won't record a failure cause.
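For illustration, the fallback suggested above for the `t == null` case in `processFail` could look roughly like the following. This is a minimal sketch, not the PR's code: `ErrorInfoSketch` and `ExecutionSketch` are hypothetical stand-ins for `ErrorInfo` and `Execution`, the timestamp is passed in as a parameter, and a plain `Exception` replaces `FlinkException` so that the snippet compiles without Flink on the classpath.

```java
import java.util.Optional;

/** Hypothetical stand-in for ErrorInfo: an exception plus the time it was recorded. */
final class ErrorInfoSketch {
    final Throwable exception;
    final long timestamp;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }
}

/** Hypothetical, heavily simplified stand-in for Execution. */
final class ExecutionSketch {
    static final String UNKNOWN_CAUSE_MESSAGE =
            "Unknown cause for Execution failure. This might be caused by FLINK-21376";

    private Optional<ErrorInfoSketch> failureCause = Optional.empty();

    /** Records the failure; a null cause is replaced by a placeholder exception. */
    void processFail(Throwable t, long failedTimestamp) {
        // Assumption: the real code would create a FlinkException here.
        Throwable cause = (t != null) ? t : new Exception(UNKNOWN_CAUSE_MESSAGE);
        failureCause = Optional.of(new ErrorInfoSketch(cause, failedTimestamp));
    }

    Optional<ErrorInfoSketch> getFailureInfo() {
        return failureCause;
    }
}
```

With this shape, `getFailureInfo()` is non-empty after every call to `processFail`, so callers no longer need to handle a failed execution that has no recorded cause.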
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
Why do we have to ask the execution for the failure cause? If we are only interested in the root cause, doesn't `failureHandlingResult` contain all the required information?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -38,6 +42,12 @@
     /** Delay before the restarting can be conducted. */
     private final long restartDelayMS;
+    /**
+     * The ExecutionVertexID refering to the ExecutionVertex the failure is originating from or
+     * {@code null} if it's a global failure.
+     */
+    @Nullable private final ExecutionVertexID failingExecutionVertexId;
+
     /** Reason why the failure is not recoverable. */

Review comment:
The JavaDoc seems wrong. If I am not mistaken, then we will also set this field if the failure is recoverable, right?
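One possible way to address both `SchedulerBase` comments above is to take the root cause directly from the handling result, so that a global failover (no failing vertex) is archived as well. The sketch below only illustrates that idea under stated assumptions: `HandlingResultSketch`, `HistoryEntrySketch`, and `FailureHistorySketch` are hypothetical stand-ins, vertex IDs are plain strings, and it assumes the handling result exposes the error for recoverable and unrecoverable failures alike, which is exactly what the JavaDoc comment asks to confirm.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for FailureHandlingResult; not Flink's actual class. */
final class HandlingResultSketch {
    private final String failingVertexId; // null marks a global failure
    private final Throwable error;

    HandlingResultSketch(String failingVertexId, Throwable error) {
        this.failingVertexId = failingVertexId;
        this.error = error;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failingVertexId);
    }

    Throwable getError() {
        return error;
    }
}

/** Hypothetical history entry: root cause, timestamp, and the failing vertex if there is one. */
final class HistoryEntrySketch {
    final Throwable cause;
    final long timestamp;
    final String failingVertexId; // null for a global failure

    HistoryEntrySketch(Throwable cause, long timestamp, String failingVertexId) {
        this.cause = cause;
        this.timestamp = timestamp;
        this.failingVertexId = failingVertexId;
    }

    boolean isGlobal() {
        return failingVertexId == null;
    }
}

/** Hypothetical stand-in for the scheduler's failure history. */
final class FailureHistorySketch {
    private final List<HistoryEntrySketch> entries = new ArrayList<>();

    /** Archives the root cause for local and global failures alike. */
    void archiveFromFailureHandlingResult(HandlingResultSketch result, long timestamp) {
        entries.add(
                new HistoryEntrySketch(
                        result.getError(),
                        timestamp,
                        result.getExecutionVertexIdOfFailedTask().orElse(null)));
    }

    List<HistoryEntrySketch> getEntries() {
        return Collections.unmodifiableList(entries);
    }
}
```

Because the entry is built from the handling result alone, an empty vertex ID no longer causes the failure to be dropped; it only marks the entry as global.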
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +871,87 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {

Review comment:
I think we also need a test for global failover.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org