XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455652997


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;

Review Comment:
   I understand that it wasn't covered by FLIP-364. But from a conceptual point
of view: shouldn't we also cover it for the two other restart strategies? Or am
I missing something here? :thinking: I'm also fine with having this covered by
follow-up Jira issues.
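One possible reading of the question above, as a sketch only. The other strategies could return false from notifyFailure for failures that arrive while a restart is still pending. Those failures would then be recorded as concurrent exceptions instead of new attempts.

The class below is hypothetical, and so are its injected LongSupplier clock and the choice of merge window. It only mirrors the boolean notifyFailure(Throwable) signature from the diff. It is not Flink's FixedDelayRestartBackoffTimeStrategy.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch, NOT Flink's FixedDelayRestartBackoffTimeStrategy: a failure only
// counts as a new restart attempt if it arrives after the previously scheduled restart;
// earlier failures are merged into the pending attempt (notifyFailure returns false).
class DedupingFixedDelayStrategy {
    private final long backoffTimeMs;
    private final LongSupplier clock; // injected (assumption) so the behavior is testable
    private int currentRestartAttempt = 0;
    private long nextRestartTimestamp = Long.MIN_VALUE; // no restart scheduled yet

    DedupingFixedDelayStrategy(long backoffTimeMs, LongSupplier clock) {
        this.backoffTimeMs = backoffTimeMs;
        this.clock = clock;
    }

    long getBackoffTime() {
        return backoffTimeMs;
    }

    int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    /** Returns true if the failure starts a new attempt, false if it joins the pending one. */
    boolean notifyFailure(Throwable cause) {
        long now = clock.getAsLong();
        if (now < nextRestartTimestamp) {
            // The restart for the current attempt hasn't happened yet: concurrent failure.
            return false;
        }
        currentRestartAttempt++;
        nextRestartTimestamp = now + backoffTimeMs;
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        DedupingFixedDelayStrategy strategy = new DedupingFixedDelayStrategy(100L, () -> now[0]);
        System.out.println(strategy.notifyFailure(new RuntimeException("first"))); // true
        now[0] = 50L;
        System.out.println(strategy.notifyFailure(new RuntimeException("second"))); // false
        now[0] = 150L;
        System.out.println(strategy.notifyFailure(new RuntimeException("third"))); // true
        System.out.println(strategy.getCurrentRestartAttempt()); // 2
    }
}
```

Using the backoff delay as the merge window is just one possible choice. The failure-rate strategy would need its own definition of when a failure starts a new attempt.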



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   As in my previous comment: we can drop the comment in favor of an assertion
message. :shrug:



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Fine with me. About reusing the code: you could move the code that's common
between tests into a private static method. But others might argue that this
isn't good for code readability either.
   
   Anyway, it's your choice in the end. Most of my comments are discussion 
points rather than comments where I claim that my proposal is the right one. :-)
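For the helper-method option, here is a rough sketch of what the extracted setup could look like. All names here are hypothetical: ConcurrentFailureFixtures, FailureEntry, ConcurrentFailure and concurrentFailure. The Flink types are replaced by simple stand-ins so the snippet compiles on its own. In the real test, the helper would presumably wrap extractExecutionVertex and getExceptionHistoryEntryPredicate.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, dependency-free illustration: FailureEntry stands in for ExceptionHistoryEntry,
// and an int vertex index stands in for the ExecutionVertex returned by extractExecutionVertex.
class ConcurrentFailureFixtures {

    static final class FailureEntry {
        final Throwable exception;
        final int vertexIndex;

        FailureEntry(Throwable exception, int vertexIndex) {
            this.exception = exception;
            this.vertexIndex = vertexIndex;
        }
    }

    static final class ConcurrentFailure {
        final Throwable exception;
        final int vertexIndex;
        final Predicate<FailureEntry> matcher;

        ConcurrentFailure(Throwable exception, int vertexIndex) {
            this.exception = exception;
            this.vertexIndex = vertexIndex;
            // Matches history entries carrying this exception, reported by this vertex.
            this.matcher =
                    entry -> entry.exception == exception && entry.vertexIndex == vertexIndex;
        }
    }

    // The shared setup, written once (it would be private static in the actual test class).
    static ConcurrentFailure concurrentFailure(int vertexIndex) {
        return new ConcurrentFailure(
                new IllegalStateException("Expected other failure" + vertexIndex), vertexIndex);
    }

    public static void main(String[] args) {
        ConcurrentFailure failure1 = concurrentFailure(1);
        ConcurrentFailure failure2 = concurrentFailure(2);
        List<FailureEntry> history =
                List.of(
                        new FailureEntry(failure1.exception, 1),
                        new FailureEntry(failure2.exception, 2));
        System.out.println(history.stream().anyMatch(failure1.matcher)); // true
        System.out.println(history.stream().anyMatch(failure2.matcher)); // true
    }
}
```

The readability trade-off still holds. The test body gets shorter, but readers have to jump to the helper to see what each failure looks like.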



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // Handle all subsequent exceptions as the concurrent exceptions when it's not a new
+        // attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(
+                    latestRootExceptionEntry != null,
+                    "A root exception entry should exist if failureHandlingResult wasn't "
+                            + "generated as part of a new error handling cycle.");
+            List<Execution> concurrentlyExecutions = new ArrayList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+            latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+            return;
+        }
+
         if (failureHandlingResult.getRootCauseExecution().isPresent()) {

Review Comment:
   We might still want to move the comment into the if block.
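This assumes the comment in question is the "Handle all subsequent exceptions..." line above if (!failureHandlingResult.isNewAttempt()). Below is a simplified, dependency-free model of the method with that comment moved inside the block.

The stand-ins are all hypothetical: RootEntry, Snapshot and the plain-string executions. The new-attempt branch is also simplified, because the rest of the method isn't part of the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of archiveFromFailureHandlingResult from the diff: executions are
// strings, RootEntry stands in for RootExceptionHistoryEntry and Snapshot for
// FailureHandlingResultSnapshot.
class ArchiveSketch {

    static final class RootEntry {
        final String rootCause;
        final List<String> concurrentExecutions = new ArrayList<>();

        RootEntry(String rootCause) {
            this.rootCause = rootCause;
        }
    }

    static final class Snapshot {
        final boolean isNewAttempt;
        final Optional<String> rootCauseExecution;
        final List<String> concurrentlyFailedExecutions;

        Snapshot(
                boolean isNewAttempt,
                Optional<String> rootCauseExecution,
                List<String> concurrentlyFailedExecutions) {
            this.isNewAttempt = isNewAttempt;
            this.rootCauseExecution = rootCauseExecution;
            this.concurrentlyFailedExecutions = concurrentlyFailedExecutions;
        }
    }

    final List<RootEntry> exceptionHistory = new ArrayList<>();
    RootEntry latestRootExceptionEntry;

    void archiveFromFailureHandlingResult(Snapshot snapshot) {
        if (!snapshot.isNewAttempt) {
            // Not a new attempt: the root cause and all concurrently failed executions
            // are attached to the latest root entry as concurrent exceptions.
            if (latestRootExceptionEntry == null) {
                throw new IllegalStateException(
                        "A root exception entry should exist for a non-new attempt.");
            }
            snapshot.rootCauseExecution.ifPresent(latestRootExceptionEntry.concurrentExecutions::add);
            latestRootExceptionEntry.concurrentExecutions.addAll(snapshot.concurrentlyFailedExecutions);
            return;
        }
        // New attempt: start a new root entry ("<global>" is a placeholder, assumption).
        latestRootExceptionEntry = new RootEntry(snapshot.rootCauseExecution.orElse("<global>"));
        latestRootExceptionEntry.concurrentExecutions.addAll(snapshot.concurrentlyFailedExecutions);
        exceptionHistory.add(latestRootExceptionEntry);
    }

    public static void main(String[] args) {
        ArchiveSketch scheduler = new ArchiveSketch();
        scheduler.archiveFromFailureHandlingResult(new Snapshot(true, Optional.of("A"), List.of("B")));
        scheduler.archiveFromFailureHandlingResult(new Snapshot(false, Optional.of("C"), List.of("D")));
        System.out.println(scheduler.exceptionHistory.size()); // 1
        System.out.println(scheduler.latestRootExceptionEntry.concurrentExecutions); // [B, C, D]
    }
}
```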



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -171,6 +179,9 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        // NonRecoverableFailure is new attempt even if RestartBackoffTimeStrategy consider it's not
+        // new attempt.
+        assertThat(result.isNewAttempt()).isTrue();

Review Comment:
   That also means removing the comment itself ;-)
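With AssertJ, which these tests already use, the comment's content can go into the assertion description. For example: assertThat(result.isNewAttempt()).as("a non-recoverable failure always starts a new attempt").isTrue(). The wording is only a suggestion.

The snippet below uses a hypothetical BooleanCheck stand-in, not AssertJ, so it runs without dependencies. It only illustrates why this helps: the description appears in the failure message, whereas a comment stays invisible when the test fails.

```java
// Hypothetical stand-in for AssertJ's assertThat(boolean).as(...).isTrue(); it only shows
// that the description becomes part of the failure message.
class BooleanCheck {
    private final boolean actual;
    private String description = "";

    private BooleanCheck(boolean actual) {
        this.actual = actual;
    }

    static BooleanCheck assertThat(boolean actual) {
        return new BooleanCheck(actual);
    }

    BooleanCheck as(String description) {
        this.description = description;
        return this;
    }

    void isTrue() {
        if (!actual) {
            throw new AssertionError("[" + description + "] expected true but was false");
        }
    }

    public static void main(String[] args) {
        boolean isNewAttempt = false; // pretend the handler returned the wrong value
        try {
            assertThat(isNewAttempt)
                    .as("A non-recoverable failure should always be a new attempt")
                    .isTrue();
        } catch (AssertionError e) {
            // prints "[A non-recoverable failure should always be a new attempt] expected true but was false"
            System.out.println(e.getMessage());
        }
    }
}
```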



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -85,7 +89,9 @@ private FailureHandlingResult(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
-            boolean globalFailure) {
+            boolean globalFailure,
+            boolean isNewAttempt) {
+        this.isNewAttempt = isNewAttempt;

Review Comment:
   This also applies to the constructor below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
