tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r579197850



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +131,24 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns an {@code Optional} with the {@link ExecutionVertexID} of the task causing this
+     * failure or an empty {@code Optional} if it's a global failure.
+     *
+     * @return The {@code ExecutionVertexID} of the causing task or an empty {@code Optional} if
+     *     it's a global failure.
+     */
+    public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
+        return Optional.ofNullable(failingExecutionVertexId);
+    }
+
     /**
      * Returns reason why the restarting cannot be conducted.
      *
      * @return reason why the restarting cannot be conducted
      */
     public Throwable getError() {

Review comment:
       `@Nullable` is missing on `getError()`. One way to find nullability problems is to turn on IntelliJ's inspections; there are a couple of them for null checks, and they help me quite a lot.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,41 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    protected void archiveGlobalFailure(Throwable failure, long timestamp) {
+        taskFailureHistory.add(new ErrorInfo(failure, timestamp));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {

Review comment:
       Shall we make these methods `final`, since we don't want them to be overridden?
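A minimal sketch of the `protected final` combination, using hypothetical, heavily simplified classes (`BaseScheduler`, `ExampleScheduler`) rather than the real `SchedulerBase`: subclasses can still call the archiving method, but the compiler rejects any attempt to override it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified base class standing in for SchedulerBase.
abstract class BaseScheduler {
    // Stand-in for taskFailureHistory; stores only "timestamp: message" strings here.
    private final List<String> failureHistory = new ArrayList<>();

    // 'protected' lets subclasses call this; 'final' prevents them from overriding it.
    protected final void archiveGlobalFailure(Throwable failure, long timestamp) {
        failureHistory.add(timestamp + ": " + failure.getMessage());
    }

    List<String> getFailureHistory() {
        return Collections.unmodifiableList(failureHistory);
    }
}

// Hypothetical subclass: it may call archiveGlobalFailure but cannot override it.
final class ExampleScheduler extends BaseScheduler {
    void onGlobalFailure(Throwable failure) {
        archiveGlobalFailure(failure, 42L);
    }

    // Uncommenting this method causes a compile error because the inherited method is final:
    // @Override
    // protected void archiveGlobalFailure(Throwable failure, long timestamp) {}
}
```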

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,41 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    protected void archiveGlobalFailure(Throwable failure, long timestamp) {
+        taskFailureHistory.add(new ErrorInfo(failure, timestamp));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        final Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        if (executionOptional.isPresent()) {
+            final Execution failedExecution = executionOptional.get();
+            failedExecution
+                    .getFailureInfo()
+                    .ifPresent(
+                            failureInfo -> {
+                                taskFailureHistory.add(failureInfo);
+                                log.debug(
+                                        "Archive local failure causing attempt {} to fail: {}",
+                                        failedExecution.getAttemptId(),
+                                        failureInfo.getExceptionAsString());
+                            });
+        } else {
+            // fallback in case of a global fail over - no failed state is set and, therefore, no
+            // timestamp was taken
+            archiveGlobalFailure(failureHandlingResult.getError(), System.currentTimeMillis());

Review comment:
       I think `getError` can return `null`. This needs to be handled properly.
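One possible way to handle it, sketched under assumptions: the hypothetical `FailureArchive` below rejects a `null` error up front with a descriptive message instead of silently storing an entry without a cause. `FailureArchive` and `ArchivedFailure` (a simplified stand-in for `ErrorInfo`) are illustrative names, and whether failing fast, skipping the entry, or substituting a placeholder exception is the right policy for `SchedulerBase` is a design decision for this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified stand-in for Flink's ErrorInfo.
final class ArchivedFailure {
    final Throwable failure;
    final long timestamp;

    ArchivedFailure(Throwable failure, long timestamp) {
        this.failure = failure;
        this.timestamp = timestamp;
    }
}

// Hypothetical archive that makes the null case explicit instead of ignoring it.
final class FailureArchive {
    private final List<ArchivedFailure> history = new ArrayList<>();

    // Policy chosen for this sketch: fail fast with a clear message when no error is available.
    void archiveGlobalFailure(Throwable failure, long timestamp) {
        Objects.requireNonNull(failure, "A global failure must carry a non-null error.");
        history.add(new ArchivedFailure(failure, timestamp));
    }

    List<ArchivedFailure> history() {
        return Collections.unmodifiableList(history);
    }
}
```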

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -635,6 +641,41 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    protected void archiveGlobalFailure(Throwable failure, long timestamp) {
+        taskFailureHistory.add(new ErrorInfo(failure, timestamp));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        final Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        if (executionOptional.isPresent()) {

Review comment:
       Maybe it would be a bit clearer to use `failureHandlingResult.isGlobalFailure()`, because at the moment the contract is that `executionVertexIdOfFailedTask == null` iff it's a global failure.
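A minimal sketch of branching on the global/local distinction directly. `SimpleFailureResult` and `ArchiveDispatcher` are hypothetical, simplified stand-ins (a `String` replaces `ExecutionVertexID`) that model only the contract stated above: the failing vertex ID is `null` iff the failure is global.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for FailureHandlingResult modeling only the global/local contract.
final class SimpleFailureResult {
    private final String failingExecutionVertexId; // null iff this is a global failure

    private SimpleFailureResult(String failingExecutionVertexId) {
        this.failingExecutionVertexId = failingExecutionVertexId;
    }

    static SimpleFailureResult global() {
        return new SimpleFailureResult(null);
    }

    static SimpleFailureResult local(String vertexId) {
        return new SimpleFailureResult(Objects.requireNonNull(vertexId));
    }

    boolean isGlobalFailure() {
        return failingExecutionVertexId == null;
    }

    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failingExecutionVertexId);
    }
}

// Hypothetical dispatcher: the branch states the intent instead of probing an Optional.
final class ArchiveDispatcher {
    static String archive(SimpleFailureResult result) {
        if (result.isGlobalFailure()) {
            return "global";
        }
        // By contract, a non-global failure always carries a vertex ID.
        String vertexId =
                result.getExecutionVertexIdOfFailedTask().orElseThrow(IllegalStateException::new);
        return "local:" + vertexId;
    }
}
```

The `orElseThrow` in the local branch documents the invariant: if the contract is ever broken, the failure surfaces immediately instead of being silently archived as a global one.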




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

