This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ff910ff1d1721d88d39ddf3523217b558255543
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Oct 19 12:42:16 2020 +0200

    [FLINK-19154] Pass DispatcherGateway to DispatcherBootstrap
    
    In order to make sure that all calls from the
    ApplicationDispatcherBootstrap are executed from the main
    thread, we are passing the DispatcherGateway
    to the Bootstrap and not the Dispatcher itself.
---
 .../ApplicationDispatcherBootstrap.java            | 59 ++++++-------
 ...ApplicationDispatcherGatewayServiceFactory.java | 15 +++-
 .../ApplicationDispatcherBootstrapTest.java        | 97 ++++++++++++++++------
 .../dispatcher/AbstractDispatcherBootstrap.java    | 46 ----------
 .../flink/runtime/dispatcher/Dispatcher.java       | 19 ++++-
 .../runtime/dispatcher/DispatcherBootstrap.java    |  4 +-
 .../runtime/dispatcher/DispatcherFactory.java      |  3 +
 .../runtime/dispatcher/JobDispatcherFactory.java   |  8 ++
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  3 +
 ...Bootstrap.java => NoOpDispatcherBootstrap.java} | 17 +---
 .../dispatcher/SessionDispatcherFactory.java       |  4 +
 .../runtime/dispatcher/StandaloneDispatcher.java   |  3 +
 .../DefaultDispatcherGatewayServiceFactory.java    |  9 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  3 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 14 +++-
 .../runtime/dispatcher/MiniDispatcherTest.java     |  4 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  4 +
 .../runner/DefaultDispatcherRunnerITCase.java      |  2 +
 18 files changed, 176 insertions(+), 138 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index e581e29..a817048 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -33,8 +33,6 @@ import 
org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.AbstractDispatcherBootstrap;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -43,6 +41,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +70,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * if it should submit a job for execution (in case of a new job) or the job 
was already recovered and is running.
  */
 @Internal
-public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap {
+public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
 
@@ -79,7 +78,7 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
 
        private final PackagedProgram application;
 
-       private final Collection<JobGraph> recoveredJobs;
+       private final Collection<JobID> recoveredJobIds;
 
        private final Configuration configuration;
 
@@ -91,22 +90,21 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
 
        public ApplicationDispatcherBootstrap(
                        final PackagedProgram application,
-                       final Collection<JobGraph> recoveredJobs,
+                       final Collection<JobID> recoveredJobIds,
                        final Configuration configuration,
                        final FatalErrorHandler errorHandler) {
                this.configuration = checkNotNull(configuration);
-               this.recoveredJobs = checkNotNull(recoveredJobs);
+               this.recoveredJobIds = checkNotNull(recoveredJobIds);
                this.application = checkNotNull(application);
                this.errorHandler = checkNotNull(errorHandler);
        }
 
        @Override
-       public void initialize(final Dispatcher dispatcher, ScheduledExecutor 
scheduledExecutor) {
-               checkNotNull(dispatcher);
-               launchRecoveredJobGraphs(dispatcher, recoveredJobs);
+       public void initialize(final DispatcherGateway dispatcherGateway, 
ScheduledExecutor scheduledExecutor) {
+               checkNotNull(dispatcherGateway);
 
                runApplicationAndShutdownClusterAsync(
-                               dispatcher,
+                               dispatcherGateway,
                                scheduledExecutor);
        }
 
@@ -127,16 +125,15 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
        }
 
        /**
-        * Runs the user program entrypoint using {@link 
#runApplicationAsync(DispatcherGateway,
-        * ScheduledExecutor, boolean)} and shuts down the given dispatcher 
when the application
-        * completes (either successfully or in case of failure).
+        * Runs the user program entrypoint and shuts down the given 
dispatcherGateway when
+        * the application completes (either successfully or in case of 
failure).
         */
        @VisibleForTesting
        CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
-                       final DispatcherGateway dispatcher,
+                       final DispatcherGateway dispatcherGateway,
                        final ScheduledExecutor scheduledExecutor) {
 
-               applicationCompletionFuture = 
fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
+               applicationCompletionFuture = 
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
 
                return applicationCompletionFuture
                                .handle((r, t) -> {
@@ -150,38 +147,38 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
 
                                                        if (applicationStatus 
== ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) 
{
                                                                
LOG.info("Application {}: ", applicationStatus, t);
-                                                               return 
dispatcher.shutDownCluster(applicationStatus);
+                                                               return 
dispatcherGateway.shutDownCluster(applicationStatus);
                                                        }
                                                }
 
                                                LOG.warn("Exiting with 
Application Status UNKNOWN: ", t);
-                                               
this.errorHandler.onFatalError(t);
+                                               
this.errorHandler.onFatalError(new FlinkException("Application failed 
unexpectedly.", t));
 
                                                return 
FutureUtils.<Acknowledge>completedExceptionally(t);
                                        }
 
                                        LOG.info("Application completed 
SUCCESSFULLY");
-                                       return 
dispatcher.shutDownCluster(ApplicationStatus.SUCCEEDED);
+                                       return 
dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
                                })
                                .thenCompose(Function.identity());
        }
 
        @VisibleForTesting
        CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
-                       final DispatcherGateway dispatcher,
+                       final DispatcherGateway dispatcherGateway,
                        final ScheduledExecutor scheduledExecutor) {
 
                final Optional<String> configuredJobId =
                                
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
 
                if 
(!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration) && 
!configuredJobId.isPresent()) {
-                       return runApplicationAsync(dispatcher, 
scheduledExecutor, false);
+                       return runApplicationAsync(dispatcherGateway, 
scheduledExecutor, false);
                }
 
                if (!configuredJobId.isPresent()) {
                        
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
ZERO_JOB_ID.toHexString());
                }
-               return runApplicationAsync(dispatcher, scheduledExecutor, true);
+               return runApplicationAsync(dispatcherGateway, 
scheduledExecutor, true);
        }
 
        /**
@@ -190,7 +187,7 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
         * succeeded. if any of them fails, or if job submission fails.
         */
        private CompletableFuture<Void> runApplicationAsync(
-                       final DispatcherGateway dispatcher,
+                       final DispatcherGateway dispatcherGateway,
                        final ScheduledExecutor scheduledExecutor,
                        final boolean enforceSingleJobExecution) {
                final CompletableFuture<List<JobID>> applicationExecutionFuture 
= new CompletableFuture<>();
@@ -200,14 +197,14 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
                applicationExecutionTask = scheduledExecutor.schedule(
                                () -> runApplicationEntryPoint(
                                                applicationExecutionFuture,
-                                               dispatcher,
+                                               dispatcherGateway,
                                                scheduledExecutor,
                                                enforceSingleJobExecution),
                                0L,
                                TimeUnit.MILLISECONDS);
 
                return applicationExecutionFuture.thenCompose(
-                               jobIds -> getApplicationResult(dispatcher, 
jobIds, scheduledExecutor));
+                               jobIds -> 
getApplicationResult(dispatcherGateway, jobIds, scheduledExecutor));
        }
 
        /**
@@ -218,16 +215,15 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
         */
        private void runApplicationEntryPoint(
                        final CompletableFuture<List<JobID>> jobIdsFuture,
-                       final DispatcherGateway dispatcher,
+                       final DispatcherGateway dispatcherGateway,
                        final ScheduledExecutor scheduledExecutor,
                        final boolean enforceSingleJobExecution) {
                try {
-                       final List<JobID> applicationJobIds =
-                                       new 
ArrayList<>(getRecoveredJobIds(recoveredJobs));
+                       final List<JobID> applicationJobIds = new 
ArrayList<>(recoveredJobIds);
 
                        final PipelineExecutorServiceLoader 
executorServiceLoader =
                                        new EmbeddedExecutorServiceLoader(
-                                                       applicationJobIds, 
dispatcher, scheduledExecutor);
+                                                       applicationJobIds, 
dispatcherGateway, scheduledExecutor);
 
                        ClientUtils.executeProgram(
                                        executorServiceLoader,
@@ -288,11 +284,4 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
                                                        result, 
application.getUserCodeClassLoader()));
                });
        }
-
-       private List<JobID> getRecoveredJobIds(final Collection<JobGraph> 
recoveredJobs) {
-               return recoveredJobs
-                               .stream()
-                               .map(JobGraph::getJobID)
-                               .collect(Collectors.toList());
-       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index 1389b34..a749fad 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +35,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -77,12 +80,15 @@ public class ApplicationDispatcherGatewayServiceFactory 
implements AbstractDispa
                        Collection<JobGraph> recoveredJobs,
                        JobGraphWriter jobGraphWriter) {
 
+               final List<JobID> recoveredJobIds = 
getRecoveredJobIds(recoveredJobs);
+
                final Dispatcher dispatcher;
                try {
                        dispatcher = dispatcherFactory.createDispatcher(
                                        rpcService,
                                        fencingToken,
-                                       errorHandler -> new 
ApplicationDispatcherBootstrap(application, recoveredJobs, configuration, 
errorHandler),
+                                       recoveredJobs,
+                                       errorHandler -> new 
ApplicationDispatcherBootstrap(application, recoveredJobIds, configuration, 
errorHandler),
                                        
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, 
jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the 
Dispatcher rpc endpoint.", e);
@@ -92,4 +98,11 @@ public class ApplicationDispatcherGatewayServiceFactory 
implements AbstractDispa
 
                return DefaultDispatcherGatewayService.from(dispatcher);
        }
+
+       private List<JobID> getRecoveredJobIds(final Collection<JobGraph> 
recoveredJobs) {
+               return recoveredJobs
+                               .stream()
+                               .map(JobGraph::getJobID)
+                               .collect(Collectors.toList());
+       }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
index 0d0fc03..df1c786 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -51,7 +50,6 @@ import org.junit.Test;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.net.MalformedURLException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -235,18 +233,8 @@ public class ApplicationDispatcherBootstrapTest {
                                });
 
                final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 2);
-
-               try {
-                       applicationFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-                       fail("Expected exception but the execution went 
smoothly.");
-               } catch (Throwable e) {
-                       Optional<ApplicationFailureException> expectedException 
=
-                                       ExceptionUtils.findThrowable(e, 
ApplicationFailureException.class);
-                       if (!expectedException.isPresent()) {
-                               throw e;
-                       }
-                       assertEquals(expectedException.get().getStatus(), 
ApplicationStatus.FAILED);
-               }
+               final ApplicationFailureException exception = 
assertException(applicationFuture, ApplicationFailureException.class);
+               assertEquals(exception.getStatus(), ApplicationStatus.FAILED);
        }
 
        @Test
@@ -366,6 +354,10 @@ public class ApplicationDispatcherBootstrapTest {
                final TestingDispatcherGateway.Builder dispatcherBuilder = new 
TestingDispatcherGateway.Builder()
                                .setSubmitFunction(jobGraph -> {
                                        throw new 
FlinkRuntimeException("Nope!");
+                               })
+                               .setClusterShutdownFunction(status -> {
+                                       fail("We should not call 
shutdownCluster()");
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
                // we're "listening" on this to be completed to verify that the 
error handler is called.
@@ -374,8 +366,9 @@ public class ApplicationDispatcherBootstrapTest {
                final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
                                3, errorHandlerFuture::completeExceptionally);
 
+               final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
                final CompletableFuture<Acknowledge> shutdownFuture =
-                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, 
scheduledExecutor);
 
                // we call the error handler
                assertException(errorHandlerFuture, 
ApplicationExecutionException.class);
@@ -438,6 +431,56 @@ public class ApplicationDispatcherBootstrapTest {
                assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
        }
 
+       @Test
+       public void testClusterShutdownWhenApplicationGetsCancelled() throws 
Exception {
+               // we're "listening" on this to be completed to verify that the 
cluster
+               // is being shut down from the ApplicationDispatcherBootstrap
+               final CompletableFuture<ApplicationStatus> 
externalShutdownFuture = new CompletableFuture<>();
+
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new 
TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                               .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.CANCELED))
+                               .setRequestJobResultFunction(jobId -> 
CompletableFuture.completedFuture(createCancelledJobResult(jobId)))
+                               .setClusterShutdownFunction((status) -> {
+                                       externalShutdownFuture.complete(status);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               });
+
+               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+
+               final CompletableFuture<Acknowledge> shutdownFuture =
+                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
+
+               // wait until the bootstrap "thinks" it's done
+               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+               // verify that the dispatcher is actually being shut down
+               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS), is(ApplicationStatus.CANCELED));
+       }
+
+       @Test
+       public void testClusterDoesNOTShutdownWhenApplicationStatusUknown() 
throws Exception {
+               // we're "listening" on this to be completed to verify that the 
cluster
+               // is being shut down from the ApplicationDispatcherBootstrap
+               final TestingDispatcherGateway.Builder dispatcherBuilder = new 
TestingDispatcherGateway.Builder()
+                               .setSubmitFunction(jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
+                               .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.FAILED))
+                               .setRequestJobResultFunction(jobId -> 
CompletableFuture.completedFuture(createUnknownJobResult(jobId)))
+                               .setClusterShutdownFunction(status -> {
+                                       fail("We should not call 
shutdownCluster()");
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               });
+
+               final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+
+               final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
+               final CompletableFuture<Acknowledge> applicationFuture =
+                               
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, 
scheduledExecutor);
+
+               final ApplicationFailureException exception = 
assertException(applicationFuture, ApplicationFailureException.class);
+               assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
+       }
+
        private CompletableFuture<Void> runApplication(
                        TestingDispatcherGateway.Builder dispatcherBuilder,
                        int noOfJobs) throws FlinkException {
@@ -480,15 +523,8 @@ public class ApplicationDispatcherBootstrapTest {
        private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
                        int noOfJobs,
                        FatalErrorHandler errorHandler) throws FlinkException {
-               return createApplicationDispatcherBootstrap(noOfJobs, 
Collections.emptyList(), errorHandler);
-       }
-
-       private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
-                       int noOfJobs,
-                       Collection<JobGraph> recoveredJobGraphs,
-                       FatalErrorHandler errorHandler) throws FlinkException {
                final PackagedProgram program = getProgram(noOfJobs);
-               return new ApplicationDispatcherBootstrap(program, 
recoveredJobGraphs, getConfiguration(), errorHandler);
+               return new ApplicationDispatcherBootstrap(program, 
Collections.emptyList(), getConfiguration(), errorHandler);
        }
 
        private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
@@ -503,6 +539,15 @@ public class ApplicationDispatcherBootstrapTest {
                }
        }
 
+       private static JobResult createUnknownJobResult(final JobID jobId) {
+               return new JobResult.Builder()
+                               .jobId(jobId)
+                               .netRuntime(2L)
+                               .applicationStatus(ApplicationStatus.UNKNOWN)
+                               .serializedThrowable(new 
SerializedThrowable(new JobExecutionException(jobId, "unknown bla bla bla")))
+                               .build();
+       }
+
        private static JobResult createFailedJobResult(final JobID jobId) {
                return new JobResult.Builder()
                                .jobId(jobId)
@@ -531,7 +576,7 @@ public class ApplicationDispatcherBootstrapTest {
                                .build();
        }
 
-       private static <T, E extends Throwable> void assertException(
+       private static <T, E extends Throwable> E assertException(
                        CompletableFuture<T> future,
                        Class<E> exceptionClass) throws Exception {
 
@@ -543,9 +588,9 @@ public class ApplicationDispatcherBootstrapTest {
                        if (!expectionException.isPresent()) {
                                throw e;
                        }
-                       return;
+                       return expectionException.get();
                }
-               fail("Future should have completed exceptionally with " + 
exceptionClass.getCanonicalName() + ".");
+               throw new Exception("Future should have completed exceptionally 
with " + exceptionClass.getCanonicalName() + ".");
        }
 
        private Configuration getConfiguration() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherBootstrap.java
deleted file mode 100644
index 9dde71e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherBootstrap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import java.util.Collection;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A base class for a {@link DispatcherBootstrap}.
- *
- * <p>NOTE TO IMPLEMENTERS: This is meant as a bridge between the 
package-private
- * {@link Dispatcher#runRecoveredJob(JobGraph)} method, and dispatcher 
bootstrap
- * implementations in other packages.
- */
-@Internal
-public abstract class AbstractDispatcherBootstrap implements 
DispatcherBootstrap {
-
-       protected void launchRecoveredJobGraphs(final Dispatcher dispatcher, 
final Collection<JobGraph> recoveredJobGraphs) {
-               checkNotNull(dispatcher);
-               checkNotNull(recoveredJobGraphs);
-
-               for (JobGraph recoveredJob : recoveredJobGraphs) {
-                       dispatcher.runRecoveredJob(recoveredJob);
-               }
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 013d22c..4aeba30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -120,6 +120,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
        private final Map<JobID, CompletableFuture<JobManagerRunner>> 
jobManagerRunnerFutures;
 
+       private final Collection<JobGraph> recoveredJobs;
+
        private final Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory;
 
        private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
@@ -142,6 +144,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
        public Dispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
@@ -177,6 +180,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                this.shutDownFuture = new CompletableFuture<>();
 
                this.dispatcherBootstrapFactory = 
checkNotNull(dispatcherBootstrapFactory);
+
+               this.recoveredJobs = new HashSet<>(recoveredJobs);
        }
 
        //------------------------------------------------------
@@ -201,8 +206,9 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        throw exception;
                }
 
-               this.dispatcherBootstrap = 
this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);
-               this.dispatcherBootstrap.initialize(this, 
this.getRpcService().getScheduledExecutor());
+               this.dispatcherBootstrap = 
this.dispatcherBootstrapFactory.apply(this::onFatalError);
+               startRecoveredJobs();
+               
this.dispatcherBootstrap.initialize(getSelfGateway(DispatcherGateway.class), 
this.getRpcService().getScheduledExecutor());
        }
 
        private void startDispatcherServices() throws Exception {
@@ -213,7 +219,14 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                }
        }
 
-       void runRecoveredJob(final JobGraph recoveredJob) {
+       private void startRecoveredJobs() {
+               for (JobGraph recoveredJob : recoveredJobs) {
+                       runRecoveredJob(recoveredJob);
+               }
+               recoveredJobs.clear();
+       }
+
+       private void runRecoveredJob(final JobGraph recoveredJob) {
                checkNotNull(recoveredJob);
                FutureUtils.assertNoException(runJob(recoveredJob)
                        
.handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
index e0a327b..5cf9336 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java
@@ -35,11 +35,11 @@ public interface DispatcherBootstrap {
         *
         * @param dispatcher the dispatcher to be initialized.
         */
-       void initialize(final Dispatcher dispatcher, ScheduledExecutor 
scheduledExecutor) throws Exception;
+       void initialize(final DispatcherGateway dispatcher, ScheduledExecutor 
scheduledExecutor) throws Exception;
 
        /**
         * Stops and frees any resources (e.g. threads) acquired
-        * by the {@link #initialize(Dispatcher, ScheduledExecutor)}.
+        * by the {@link #initialize(DispatcherGateway, ScheduledExecutor)}.
         */
        void stop() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
index e1f3827..54a8338 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.Collection;
 import java.util.function.Function;
 
 /**
@@ -34,6 +36,7 @@ public interface DispatcherFactory {
        Dispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index a68da40..7b08e10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -20,9 +20,13 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.util.Collection;
 import java.util.function.Function;
 
 import static 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
@@ -37,8 +41,11 @@ public enum JobDispatcherFactory implements 
DispatcherFactory {
        public MiniDispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
+               final JobGraph jobGraph = 
Iterables.getOnlyElement(recoveredJobs);
+
                final Configuration configuration = 
partialDispatcherServicesWithJobGraphStore.getConfiguration();
                final String executionModeValue = 
configuration.getString(EXECUTION_MODE);
                final ClusterEntrypoint.ExecutionMode executionMode = 
ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
@@ -47,6 +54,7 @@ public enum JobDispatcherFactory implements DispatcherFactory 
{
                        rpcService,
                        fencingToken,
                        
DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, 
DefaultJobManagerRunnerFactory.INSTANCE),
+                       jobGraph,
                        dispatcherBootstrapFactory,
                        executionMode);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 79872ff..568e456 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
@@ -57,11 +58,13 @@ public class MiniDispatcher extends Dispatcher {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        DispatcherServices dispatcherServices,
+                       JobGraph jobGraph,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        JobClusterEntrypoint.ExecutionMode executionMode) 
throws Exception {
                super(
                        rpcService,
                        fencingToken,
+                       Collections.singleton(jobGraph),
                        dispatcherBootstrapFactory,
                        dispatcherServices);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultDispatcherBootstrap.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
similarity index 68%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultDispatcherBootstrap.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
index 871b5a4..9d3934d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultDispatcherBootstrap.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java
@@ -22,28 +22,19 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
-import java.util.Collection;
-import java.util.HashSet;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A {@link DispatcherBootstrap} which submits the provided {@link JobGraph 
job graphs}
  * for execution upon dispatcher initialization.
  */
 @Internal
-public class DefaultDispatcherBootstrap extends AbstractDispatcherBootstrap {
+public class NoOpDispatcherBootstrap implements DispatcherBootstrap {
 
-       private final Collection<JobGraph> recoveredJobs;
-
-       public DefaultDispatcherBootstrap(final Collection<JobGraph> 
recoveredJobsGraphs) {
-               this.recoveredJobs = new 
HashSet<>(checkNotNull(recoveredJobsGraphs));
+       public NoOpDispatcherBootstrap() {
        }
 
        @Override
-       public void initialize(final Dispatcher dispatcher, ScheduledExecutor 
scheduledExecutor) {
-               launchRecoveredJobGraphs(dispatcher, recoveredJobs);
-               recoveredJobs.clear();
+       public void initialize(final DispatcherGateway dispatcher, 
ScheduledExecutor scheduledExecutor) {
+
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 35f8d32..05fcc14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.Collection;
 import java.util.function.Function;
 
 /**
@@ -33,12 +35,14 @@ public enum SessionDispatcherFactory implements 
DispatcherFactory {
        public StandaloneDispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
                // create the default dispatcher
                return new StandaloneDispatcher(
                        rpcService,
                        fencingToken,
+                       recoveredJobs,
                        dispatcherBootstrapFactory,
                        
DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, 
DefaultJobManagerRunnerFactory.INSTANCE));
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 65bd4a3..6b7b942 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.Collection;
 import java.util.function.Function;
 
 /**
@@ -34,11 +35,13 @@ public class StandaloneDispatcher extends Dispatcher {
        public StandaloneDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(
                        rpcService,
                        fencingToken,
+                       recoveredJobs,
                        dispatcherBootstrapFactory,
                        dispatcherServices);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
index e5312d7..adc1440 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.dispatcher.runner;
 
-import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
 import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
 import 
org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -58,15 +57,13 @@ class DefaultDispatcherGatewayServiceFactory implements 
AbstractDispatcherLeader
                        Collection<JobGraph> recoveredJobs,
                        JobGraphWriter jobGraphWriter) {
 
-               final DispatcherBootstrap bootstrap =
-                               new DefaultDispatcherBootstrap(recoveredJobs);
-
                final Dispatcher dispatcher;
                try {
                        dispatcher = dispatcherFactory.createDispatcher(
                                rpcService,
                                fencingToken,
-                               errorHandler -> bootstrap,
+                               recoveredJobs,
+                               errorHandler -> new NoOpDispatcherBootstrap(),
                                
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, 
jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the 
Dispatcher rpc endpoint.", e);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index bc0cad5..06c9cc8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -193,7 +193,8 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                dispatcher = new TestingDispatcher(
                        rpcService,
                        DispatcherId.generate(),
-                       errorHandler -> new 
DefaultDispatcherBootstrap(Collections.emptyList()),
+                       Collections.emptyList(),
+                       errorHandler -> new NoOpDispatcherBootstrap(),
                        new DispatcherServices(
                                configuration,
                                highAvailabilityServices,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8215bc4..3ccc6b8 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -204,8 +204,10 @@ public class DispatcherTest extends TestLogger {
 
        private class TestingDispatcherBuilder {
 
+               private Collection<JobGraph> initialJobGraphs = 
Collections.emptyList();
+
                private Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory =
-                               errorHandler -> new 
DefaultDispatcherBootstrap(Collections.emptyList());
+                               errorHandler -> new NoOpDispatcherBootstrap();
 
                private HeartbeatServices heartbeatServices = 
DispatcherTest.this.heartbeatServices;
 
@@ -225,6 +227,11 @@ public class DispatcherTest extends TestLogger {
                        return this;
                }
 
+               TestingDispatcherBuilder 
setInitialJobGraphs(Collection<JobGraph> initialJobGraphs) {
+                       this.initialJobGraphs = initialJobGraphs;
+                       return this;
+               }
+
                TestingDispatcherBuilder setDispatcherBootstrapFactory(
                                Function<FatalErrorHandler, 
DispatcherBootstrap> dispatcherBootstrapFactory) {
                        this.dispatcherBootstrapFactory = 
dispatcherBootstrapFactory;
@@ -249,6 +256,7 @@ public class DispatcherTest extends TestLogger {
                        return new TestingDispatcher(
                                rpcService,
                                DispatcherId.generate(),
+                               initialJobGraphs,
                                dispatcherBootstrapFactory,
                                new DispatcherServices(
                                        configuration,
@@ -449,7 +457,7 @@ public class DispatcherTest extends TestLogger {
                final JobGraph failingJobGraph = 
createFailingJobGraph(testException);
 
                dispatcher = new TestingDispatcherBuilder()
-                       .setDispatcherBootstrapFactory(errorHandler -> new 
DefaultDispatcherBootstrap(Collections.singleton(failingJobGraph)))
+                       
.setInitialJobGraphs(Collections.singleton(failingJobGraph))
                        .build();
 
                dispatcher.start();
@@ -601,7 +609,7 @@ public class DispatcherTest extends TestLogger {
                        .build();
 
                dispatcher = new TestingDispatcherBuilder()
-                       .setDispatcherBootstrapFactory(errorHandler -> new 
DefaultDispatcherBootstrap(Collections.singleton(jobGraph)))
+                       .setInitialJobGraphs(Collections.singleton(jobGraph))
                        .setJobGraphWriter(testingJobGraphStore)
                        .build();
                dispatcher.start();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 02dedab..4794797 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -52,7 +52,6 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -251,7 +250,8 @@ public class MiniDispatcherTest extends TestLogger {
                                
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                                highAvailabilityServices.getJobGraphStore(),
                                testingJobManagerRunnerFactory),
-                       errorHandler -> new 
DefaultDispatcherBootstrap(Collections.singletonList(jobGraph)),
+                       jobGraph,
+                       errorHandler -> new NoOpDispatcherBootstrap(),
                        executionMode);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index fe6472a..7b2bbcb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nonnull;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
@@ -39,11 +41,13 @@ class TestingDispatcher extends Dispatcher {
        TestingDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(
                        rpcService,
                        fencingToken,
+                       recoveredJobs,
                        dispatcherBootstrapFactory,
                        dispatcherServices);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index d31a7f8..51325bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -208,11 +208,13 @@ public class DefaultDispatcherRunnerITCase extends 
TestLogger {
                public Dispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
+                       Collection<JobGraph> recoveredJobs,
                        Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception {
                        return new StandaloneDispatcher(
                                rpcService,
                                fencingToken,
+                               recoveredJobs,
                                dispatcherBootstrapFactory,
                                
DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, 
jobManagerRunnerFactory));
                }

Reply via email to