This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d940af688be90c92ce4f8b9ca883f6753c94aa0f
Author: Matthias Pohl <matth...@ververica.com>
AuthorDate: Tue Apr 26 13:25:01 2022 +0200

    [hotfix][runtime][test] Adds additional test for when the JobManagerRunner result completes exceptionally
    
    Additionally, I moved the test's documentation into the
    production code because it makes more sense to have the
    reasoning over there.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  2 +
 .../flink/runtime/dispatcher/DispatcherTest.java   | 50 +++++++++++++---------
 2 files changed, 32 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 7c02082ed53..a709b2617fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -652,6 +652,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
             JobManagerRunnerResult jobManagerRunnerResult, ExecutionType 
executionType) {
         if (jobManagerRunnerResult.isInitializationFailure()
                 && executionType == ExecutionType.RECOVERY) {
+            // fail fatally to make the Dispatcher fail-over and recover all 
jobs once more (which
+            // can only happen in HA mode)
             return CompletableFuture.completedFuture(
                     jobManagerRunnerFailed(
                             
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 7c4cc5b87d2..9db64cd1c68 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -120,6 +120,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -663,13 +664,35 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         return new 
URI(completedCheckpointStorageLocation.getExternalPointer());
     }
 
-    /**
-     * Tests that the {@link Dispatcher} fails fatally if the recovered jobs 
cannot be started. See
-     * FLINK-9097.
-     */
     @Test
     public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws 
Exception {
-        final FlinkException testException = new FlinkException("Test 
exception");
+        testJobManagerRunnerFailureResultingInFatalError(
+                (testingJobManagerRunner, actualError) ->
+                        testingJobManagerRunner.completeResultFuture(
+                                // Let the initialization of the 
JobManagerRunner fail
+                                
JobManagerRunnerResult.forInitializationFailure(
+                                        new ExecutionGraphInfo(
+                                                ArchivedExecutionGraph
+                                                        
.createSparseArchivedExecutionGraph(
+                                                                jobId,
+                                                                
jobGraph.getName(),
+                                                                
JobStatus.FAILED,
+                                                                actualError,
+                                                                
jobGraph.getCheckpointingSettings(),
+                                                                1L)),
+                                        actualError)));
+    }
+
+    @Test
+    public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() 
throws Exception {
+        testJobManagerRunnerFailureResultingInFatalError(
+                TestingJobManagerRunner::completeResultFutureExceptionally);
+    }
+
+    private void testJobManagerRunnerFailureResultingInFatalError(
+            BiConsumer<TestingJobManagerRunner, Exception> 
jobManagerRunnerWithErrorConsumer)
+            throws Exception {
+        final FlinkException testException = new FlinkException("Expected test 
exception");
         jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
 
         final TestingJobMasterServiceLeadershipRunnerFactory 
jobManagerRunnerFactory =
@@ -685,21 +708,8 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
         final TestingFatalErrorHandler fatalErrorHandler =
                 testingFatalErrorHandlerResource.getFatalErrorHandler();
 
-        final TestingJobManagerRunner testingJobManagerRunner =
-                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
-        // Let the initialization of the JobManagerRunner fail
-        testingJobManagerRunner.completeResultFuture(
-                JobManagerRunnerResult.forInitializationFailure(
-                        new ExecutionGraphInfo(
-                                
ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                                        jobId,
-                                        jobGraph.getName(),
-                                        JobStatus.FAILED,
-                                        testException,
-                                        jobGraph.getCheckpointingSettings(),
-                                        1L)),
-                        testException));
+        jobManagerRunnerWithErrorConsumer.accept(
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner(), 
testException);
 
         final Throwable error =
                 fatalErrorHandler

Reply via email to