This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d940af688be90c92ce4f8b9ca883f6753c94aa0f Author: Matthias Pohl <matth...@ververica.com> AuthorDate: Tue Apr 26 13:25:01 2022 +0200 [hotfix][runtime][test] Adds additional test for when the JobManagerRunner result completes exceptionally. Additionally, I moved the test's documentation into the production code because it makes more sense to have the reasoning over there. --- .../flink/runtime/dispatcher/Dispatcher.java | 2 + .../flink/runtime/dispatcher/DispatcherTest.java | 50 +++++++++++++--------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 7c02082ed53..a709b2617fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -652,6 +652,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) { if (jobManagerRunnerResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY) { + // fail fatally to make the Dispatcher fail-over and recover all jobs once more (which + // can only happen in HA mode) return CompletableFuture.completedFuture( jobManagerRunnerFailed( jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 7c4cc5b87d2..9db64cd1c68 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -120,6 +120,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -663,13 +664,35 @@ public class DispatcherTest extends AbstractDispatcherTest { return new URI(completedCheckpointStorageLocation.getExternalPointer()); } - /** - * Tests that the {@link Dispatcher} fails fatally if the recovered jobs cannot be started. See - * FLINK-9097. - */ @Test public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception { - final FlinkException testException = new FlinkException("Test exception"); + testJobManagerRunnerFailureResultingInFatalError( + (testingJobManagerRunner, actualError) -> + testingJobManagerRunner.completeResultFuture( + // Let the initialization of the JobManagerRunner fail + JobManagerRunnerResult.forInitializationFailure( + new ExecutionGraphInfo( + ArchivedExecutionGraph + .createSparseArchivedExecutionGraph( + jobId, + jobGraph.getName(), + JobStatus.FAILED, + actualError, + jobGraph.getCheckpointingSettings(), + 1L)), + actualError))); + } + + @Test + public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() throws Exception { + testJobManagerRunnerFailureResultingInFatalError( + TestingJobManagerRunner::completeResultFutureExceptionally); + } + + private void testJobManagerRunnerFailureResultingInFatalError( + BiConsumer<TestingJobManagerRunner, Exception> jobManagerRunnerWithErrorConsumer) + throws Exception { + final FlinkException testException = new FlinkException("Expected test exception"); jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); final TestingJobMasterServiceLeadershipRunnerFactory jobManagerRunnerFactory = @@ -685,21 +708,8 @@ public class DispatcherTest extends AbstractDispatcherTest { final TestingFatalErrorHandler fatalErrorHandler = testingFatalErrorHandlerResource.getFatalErrorHandler(); - final TestingJobManagerRunner 
testingJobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - - // Let the initialization of the JobManagerRunner fail - testingJobManagerRunner.completeResultFuture( - JobManagerRunnerResult.forInitializationFailure( - new ExecutionGraphInfo( - ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobId, - jobGraph.getName(), - JobStatus.FAILED, - testException, - jobGraph.getCheckpointingSettings(), - 1L)), - testException)); + jobManagerRunnerWithErrorConsumer.accept( + jobManagerRunnerFactory.takeCreatedJobManagerRunner(), testException); final Throwable error = fatalErrorHandler