XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455652997

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;

Review Comment:
   I understand that it wasn't covered by FLIP-364. But from a conceptual point of view: shouldn't we also cover it for the two other restart strategies? Or am I missing something here? :thinking: I'm also fine with this being covered by follow-up Jira issues.


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   As in my previous comment: we can avoid comments in favor of the assert message :shrug:


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Fine with me. About reusing the code: you could move the code that's common between tests into a private static method. But others might argue that this is also not good for code readability. Anyway, it's your choice in the end. Most of my comments are discussion points rather than comments where I claim that my proposal is the right one. :-)


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // Handle all subsequent exceptions as the concurrent exceptions when it's not a new
+        // attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(
+                    latestRootExceptionEntry != null,
+                    "A root exception entry should exist if failureHandlingResult wasn't "
+                            + "generated as part of a new error handling cycle.");
+            List<Execution> concurrentlyExecutions = new ArrayList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+            latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+            return;
+        }
+
+        if (failureHandlingResult.getRootCauseExecution().isPresent()) {

Review Comment:
   We still might want to move the comment into the if block.


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -171,6 +179,9 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        // NonRecoverableFailure is new attempt even if RestartBackoffTimeStrategy consider it's not
+        // new attempt.
+        assertThat(result.isNewAttempt()).isTrue();

Review Comment:
   That also means removing the comment itself ;-)


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -85,7 +89,9 @@ private FailureHandlingResult(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
-            boolean globalFailure) {
+            boolean globalFailure,
+            boolean isNewAttempt) {
+        this.isNewAttempt = isNewAttempt;

Review Comment:
   This also applies to the constructor below.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.