tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r568774178



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +126,26 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns an {@code Optional} with the {@link ExecutionVertexID} of the task causing this
+     * failure or an empty {@code Optional} if it's a global failure.
+     *
+     * @return The {@code ExecutionVertexID} of the causing task or an empty {@code Optional} if
+     *     it's a global failure.
+     */
+    public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
+        return failingExecutionVertexId == null
+                ? Optional.empty()
+                : Optional.of(failingExecutionVertexId);

Review comment:
       ```suggestion
           return Optional.ofNullable(failingExecutionVertexId);
   ```
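The suggestion relies on `Optional.ofNullable`, which returns `Optional.empty()` for a `null` argument and `Optional.of(value)` otherwise, so it collapses the ternary into one call. The same applies to the `ArchivedExecution.getFailureInfo()` suggestion below. A minimal sketch comparing both forms; `NullableFieldHolder` is a hypothetical stand-in for a class with a nullable field like `failingExecutionVertexId`, not Flink code.

```java
import java.util.Optional;

/** Hypothetical holder mirroring a nullable field such as failingExecutionVertexId. */
final class NullableFieldHolder {
    // May be null, e.g. for a global failure without a causing task.
    private final String failingId;

    NullableFieldHolder(String failingId) {
        this.failingId = failingId;
    }

    /** Verbose form used in the diff. */
    Optional<String> viaTernary() {
        return failingId == null ? Optional.empty() : Optional.of(failingId);
    }

    /** Suggested form: same result in a single call. */
    Optional<String> viaOfNullable() {
        return Optional.ofNullable(failingId);
    }
}
```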

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
##########
@@ -121,8 +125,8 @@ public AllocationID getAssignedAllocationID() {
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return failureCause;
+    public Optional<ErrorInfo> getFailureInfo() {
+        return failureInfo == null ? Optional.empty() : Optional.of(failureInfo);

Review comment:
       ```suggestion
           return Optional.ofNullable(failureInfo);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));

Review comment:
       I think we should check here that we are indeed in the `FAILED` state. 
What can also happen is that the user cancels the job in between.
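A minimal sketch of the suggested check, assuming the archiving callback is attached to a future that completes with the job's terminal status (the diff's `getTerminationFuture()` in `SchedulerBase` discards that status via `thenApply(FunctionUtils.nullFn())`, so the real change would presumably hook into the execution graph's termination future instead). `TerminalStatus` and `GlobalFailureArchiver` are hypothetical stand-ins, not Flink's `JobStatus` or scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical subset of terminal job states. */
enum TerminalStatus { FAILED, CANCELED, FINISHED }

/** Hypothetical stand-in for the scheduler; only models the archiving decision. */
final class GlobalFailureArchiver {
    final List<Throwable> archivedFailures = new ArrayList<>();

    /**
     * Archives the global failure only if the job really ended in FAILED,
     * e.g. not CANCELED because a user cancelled the job in between.
     */
    void failJob(Throwable cause, CompletableFuture<TerminalStatus> terminationFuture) {
        terminationFuture.thenAccept(
                status -> {
                    if (status == TerminalStatus.FAILED) {
                        archivedFailures.add(cause);
                    }
                });
    }
}
```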

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -317,13 +316,11 @@ public TaskManagerLocation getAssignedResourceLocation() {
                 : null;
     }
 
-    public Throwable getFailureCause() {
-        return failureCause;
-    }
-
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
+    public Optional<ErrorInfo> getFailureInfo() {
+        return failureCause == null
+                ? Optional.empty()
+                : Optional.of(new ErrorInfo(failureCause, getStateTimestamp(FAILED)));

Review comment:
       Does it make sense to change `failureCause` to be of type `ErrorInfo`? 
If yes, then we could save recreating this object multiple times.
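To illustrate the idea: if the field holds the `ErrorInfo`-like object itself, it is built once on the transition to `FAILED` and the getter just wraps the stored instance. A minimal sketch; `SimpleErrorInfo` is a simplified, hypothetical stand-in for Flink's `ErrorInfo` (which stores the exception in serialized form), and `SketchExecution`/`markFailed` are hypothetical names.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for ErrorInfo: an exception plus a timestamp. */
final class SimpleErrorInfo {
    private final Throwable exception;
    private final long timestamp;

    SimpleErrorInfo(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }

    Throwable getException() { return exception; }

    long getTimestamp() { return timestamp; }
}

/** Hypothetical execution that keeps the failure as an ErrorInfo-like object. */
final class SketchExecution {
    // Replaces a plain Throwable field; stays null until the execution fails.
    private SimpleErrorInfo failureInfo;

    /** Called once on the transition to FAILED, so the object is created a single time. */
    void markFailed(Throwable cause, long failedTimestamp) {
        failureInfo = new SimpleErrorInfo(cause, failedTimestamp);
    }

    /** Returns the stored instance instead of allocating a new one per call. */
    Optional<SimpleErrorInfo> getFailureInfo() {
        return Optional.ofNullable(failureInfo);
    }
}
```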

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
##########
@@ -371,9 +375,12 @@ private static void compareExecution(
         assertEquals(
                 runtimeExecution.getAssignedResourceLocation(),
                 archivedExecution.getAssignedResourceLocation());
-        assertEquals(
-                runtimeExecution.getFailureCauseAsString(),
-                archivedExecution.getFailureCauseAsString());
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));

Review comment:
       Is this correct? Why does CI pass with this assertion if it is wrong?
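One plausible explanation, assuming none of the executions compared in this test has actually failed: both `getFailureInfo()` calls then return `Optional.empty()`, `map` never invokes either getter, and two empty `Optional`s are equal, so the assertion holds regardless of which getter is mapped. It presumably also compiles because both sides can be inferred as `Optional<Object>`. The intended check would map both sides with `ErrorInfo::getTimestamp`. A self-contained sketch of the pitfall, using a hypothetical `FailureInfoSketch` instead of Flink's `ErrorInfo` and plain `Optional.equals` instead of hamcrest:

```java
import java.util.Optional;

/** Hypothetical, simplified failure info used only to illustrate the assertion issue. */
final class FailureInfoSketch {
    private final String exceptionAsString;
    private final long timestamp;

    FailureInfoSketch(String exceptionAsString, long timestamp) {
        this.exceptionAsString = exceptionAsString;
        this.timestamp = timestamp;
    }

    String getExceptionAsString() { return exceptionAsString; }

    long getTimestamp() { return timestamp; }
}

final class AssertionPitfall {
    /** Mirrors the diff: one side maps to the timestamp, the other to the string. */
    static boolean buggyComparison(
            Optional<FailureInfoSketch> runtime, Optional<FailureInfoSketch> archived) {
        // Both sides widen to Optional<Object>, so this compiles.
        Optional<Object> left = runtime.map(FailureInfoSketch::getTimestamp);
        Optional<Object> right = archived.map(FailureInfoSketch::getExceptionAsString);
        return left.equals(right);
    }

    /** Intended comparison: timestamp against timestamp. */
    static boolean fixedComparison(
            Optional<FailureInfoSketch> runtime, Optional<FailureInfoSketch> archived) {
        return runtime.map(FailureInfoSketch::getTimestamp)
                .equals(archived.map(FailureInfoSketch::getTimestamp));
    }
}
```

With a failed execution on both sides, the buggy comparison returns `false` (a `Long` never equals a `String`), so a test exercising an actual failure would expose it.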

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
       Yes, we are actually interested in a timestamp and not some measure of 
passed time. But for testing purposes, a manually controlled `Clock` can help to 
avoid instabilities caused by `System.currentTimeMillis`. The production code 
could then still use `System.currentTimeMillis`.
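A minimal sketch of the injected-clock pattern, assuming the component under test accepts a clock instead of calling `System.currentTimeMillis()` directly. To stay self-contained it uses `java.time.Clock`, whose `millis()` is documented as equivalent to `System.currentTimeMillis()` for the system clock; Flink's own clock utilities may be the better fit in the actual test. `ManualTestClock` and `FailureRecorder` are hypothetical names.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical test clock whose time only moves when the test advances it. */
final class ManualTestClock extends Clock {
    private long currentMillis;

    ManualTestClock(long startMillis) { this.currentMillis = startMillis; }

    void advanceMillis(long delta) { currentMillis += delta; }

    @Override
    public ZoneId getZone() { return ZoneOffset.UTC; }

    // The zone is irrelevant for epoch-millisecond timestamps, so it is ignored here.
    @Override
    public Clock withZone(ZoneId zone) { return this; }

    @Override
    public Instant instant() { return Instant.ofEpochMilli(currentMillis); }
}

/** Hypothetical component that timestamps failures via an injected clock. */
final class FailureRecorder {
    private final Clock clock;
    final List<Long> timestamps = new ArrayList<>();

    /** Production code could pass Clock.systemUTC(); tests pass a ManualTestClock. */
    FailureRecorder(Clock clock) { this.clock = clock; }

    void recordFailure() { timestamps.add(clock.millis()); }
}
```

With a manual clock the test can assert exact timestamps instead of a `[start, end]` range measured around the call.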

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
##########
@@ -64,9 +66,11 @@
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return an {@code Optional} of {@link ErrorInfo} containing the {@code Throwable} wrapped in
+     *     a {@link SerializedThrowable} and the time it was registered if an error occurred. If no

Review comment:
       nit: That the throwable is wrapped in a `SerializedThrowable` should be 
an implementation detail of the `ErrorInfo`. I think it is not necessary to 
mention it here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -641,6 +647,32 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(new ErrorInfo(failure, System.currentTimeMillis()));

Review comment:
       I would suggest taking the timestamp from `ExecutionGraph.getStatusTimestamp`. 
That way the timestamp will be consistent with the job status timestamps of the graph.
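A minimal sketch of the suggestion, assuming the archived entry should reuse the timestamp the graph recorded for its `FAILED` transition rather than reading the clock a second time. `SketchJobStatus`, `StatusTrackingGraph` and `FailureHistory` are hypothetical stand-ins for `JobStatus`, the `ExecutionGraph` and the scheduler's failure history; returning `0` for a status that was never reached is an assumption of this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical subset of job states. */
enum SketchJobStatus { RUNNING, FAILING, FAILED }

/** Hypothetical graph that records when each status was entered. */
final class StatusTrackingGraph {
    private final Map<SketchJobStatus, Long> statusTimestamps = new EnumMap<>(SketchJobStatus.class);

    void transitionTo(SketchJobStatus status, long timestamp) {
        statusTimestamps.put(status, timestamp);
    }

    /** Returns the time the status was entered, or 0 if it was never reached (assumption). */
    long getStatusTimestamp(SketchJobStatus status) {
        Long timestamp = statusTimestamps.get(status);
        return timestamp == null ? 0L : timestamp;
    }
}

/** Hypothetical failure history pairing each global failure with a timestamp. */
final class FailureHistory {
    final List<Map.Entry<Throwable, Long>> entries = new ArrayList<>();

    void archiveGlobalFailure(Throwable failure, StatusTrackingGraph graph) {
        // Reuse the FAILED transition time so the archive and the graph agree.
        long timestamp = graph.getStatusTimestamp(SketchJobStatus.FAILED);
        Map.Entry<Throwable, Long> entry = new AbstractMap.SimpleImmutableEntry<>(failure, timestamp);
        entries.add(entry);
    }
}
```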




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

