tillrohrmann commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r568774178
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java ########## @@ -100,17 +126,26 @@ public long getRestartDelayMS() { } } + /** + * Returns an {@code Optional} with the {@link ExecutionVertexID} of the task causing this + * failure or an empty {@code Optional} if it's a global failure. + * + * @return The {@code ExecutionVertexID} of the causing task or an empty {@code Optional} if + * it's a global failure. + */ + public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() { + return failingExecutionVertexId == null + ? Optional.empty() + : Optional.of(failingExecutionVertexId); Review comment: ```suggestion return Optional.ofNullable(failingExecutionVertexId); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java ########## @@ -121,8 +125,8 @@ public AllocationID getAssignedAllocationID() { } @Override - public String getFailureCauseAsString() { - return failureCause; + public Optional<ErrorInfo> getFailureInfo() { + return failureInfo == null ? Optional.empty() : Optional.of(failureInfo); Review comment: ```suggestion return Optional.ofNullable(failureInfo); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() { protected void failJob(Throwable cause) { incrementVersionsOfAllVertices(); executionGraph.failJob(cause); + getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); Review comment: I think we should check here that the job has indeed reached the `FAILED` state. The user could also cancel the job in between, in which case the job would not terminate as `FAILED`.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -317,13 +316,11 @@ public TaskManagerLocation getAssignedResourceLocation() { : null; } - public Throwable getFailureCause() { - return failureCause; - } - @Override - public String getFailureCauseAsString() { - return ExceptionUtils.stringifyException(getFailureCause()); + public Optional<ErrorInfo> getFailureInfo() { + return failureCause == null + ? Optional.empty() + : Optional.of(new ErrorInfo(failureCause, getStateTimestamp(FAILED))); Review comment: Does it make sense to change `failureCause` to be of type `ErrorInfo`? If yes, then we could save recreating this object multiple times. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ########## @@ -371,9 +375,12 @@ private static void compareExecution( assertEquals( runtimeExecution.getAssignedResourceLocation(), archivedExecution.getAssignedResourceLocation()); - assertEquals( - runtimeExecution.getFailureCauseAsString(), - archivedExecution.getFailureCauseAsString()); + assertThat( + runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString), + is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))); + assertThat( + runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp), + is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))); Review comment: Is this correct? The runtime side maps to `ErrorInfo::getTimestamp`, while the archived side maps to `ErrorInfo::getExceptionAsString`. Why does CI pass with this assertion if it is wrong?
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); } + @Test + public void testExceptionHistoryWithRestartableFailure() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + // initiate restartable failure + final ExecutionAttemptID restartableAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + final RuntimeException restartableException = new RuntimeException("restartable exception"); + Range<Long> updateStateTriggeringRestartTimeframe = + initiateFailure(scheduler, jobId, restartableAttemptId, restartableException); + + taskRestartExecutor.triggerNonPeriodicScheduledTask(); + + // initiate job failure + testRestartBackoffTimeStrategy.setCanRestart(false); + + final ExecutionAttemptID failingAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + final RuntimeException failingException = new RuntimeException("failing exception"); + Range<Long> updateStateTriggeringJobFailureTimeframe = + initiateFailure(scheduler, jobId, failingAttemptId, failingException); + + List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory(); + assertThat(actualExceptionHistory.size(), is(2)); + + // assert restarted attempt + ErrorInfo restartableFailure = actualExceptionHistory.get(0); + assertThat( + restartableFailure + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()), + is(restartableException)); + assertThat( + restartableFailure.getTimestamp(), + 
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); + assertThat( + restartableFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + + // assert job failure attempt + ErrorInfo globalFailure = actualExceptionHistory.get(1); + Throwable actualException = + globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); + assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class)); + assertThat(actualException.getCause(), is(failingException)); + assertThat( + globalFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); + assertThat( + globalFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); + } + + private static Range<Long> initiateFailure( + DefaultScheduler scheduler, + JobID jobId, + ExecutionAttemptID executionAttemptID, + Throwable exception) { + long start = System.currentTimeMillis(); Review comment: Yes, we are actually interested in a timestamp and not some measure of elapsed time. But for testing purposes, a manually controlled `Clock` can help to avoid instabilities caused by `System.currentTimeMillis`. The production code could then still use `System.currentTimeMillis`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java ########## @@ -64,9 +66,11 @@ * Returns the exception that caused the job to fail. This is the first root exception that was * not recoverable and triggered job failure. * - * @return failure exception as a string, or {@code "(null)"} + * @return an {@code Optional} of {@link ErrorInfo} containing the {@code Throwable} wrapped in + * a {@link SerializedThrowable} and the time it was registered if an error occurred. If no Review comment: nit: That the throwable is wrapped in a `SerializedThrowable` should be an implementation detail of the `ErrorInfo`.
I think it is not necessary to mention it here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -641,6 +647,32 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } + protected void archiveGlobalFailure(Throwable failure) { + taskFailureHistory.add(new ErrorInfo(failure, System.currentTimeMillis())); Review comment: I would suggest using `ExecutionGraph.getStatusTimestamp`. That way the timestamp will be consistent. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org