This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dd57624b2df7a856352a64a89af8606ae0c61a4
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Oct 19 09:49:14 2020 +0200

    [FLINK-19154] ApplicationDispatcherBootstrap cleans up HA data only on FAILED, CANCELED, SUCCEEDED
    
    Depending on the status with which a job got terminated, we may
    want to shut down the cluster and clean up all HA data, or not. To
    be able to differentiate between the different termination reasons,
    we add the ApplicationFailureException.
    
    In addition, to be able to shut down the cluster without cleaning up
    the HA data, we need to be able to terminate the dispatcher's shutdown
    future with an exception. This is what the new error handler passed to
    the ApplicationDispatcherBootstrap does. We chose to pass the
    FatalErrorHandler as a constructor argument because this allows for
    more robust code.
---
 .../ApplicationDispatcherBootstrap.java            | 68 +++++++---------
 ...ApplicationDispatcherGatewayServiceFactory.java |  6 +-
 .../application/ApplicationFailureException.java   | 82 +++++++++++++++++++
 .../ApplicationDispatcherBootstrapTest.java        | 93 +++++++++++++---------
 .../flink/runtime/dispatcher/Dispatcher.java       | 11 ++-
 .../runtime/dispatcher/DispatcherFactory.java      | 11 ++-
 .../runtime/dispatcher/JobDispatcherFactory.java   |  7 +-
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  6 +-
 .../dispatcher/SessionDispatcherFactory.java       |  7 +-
 .../runtime/dispatcher/StandaloneDispatcher.java   |  7 +-
 .../DefaultDispatcherGatewayServiceFactory.java    |  2 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  2 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 15 ++--
 .../runtime/dispatcher/MiniDispatcherTest.java     |  2 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  5 +-
 .../runner/DefaultDispatcherRunnerITCase.java      |  6 +-
 16 files changed, 218 insertions(+), 112 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 2b4ddf3..e581e29 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -42,8 +41,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.SerializedThrowable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +51,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ScheduledFuture;
@@ -85,6 +83,8 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
 
        private final Configuration configuration;
 
+       private final FatalErrorHandler errorHandler;
+
        private CompletableFuture<Void> applicationCompletionFuture;
 
        private ScheduledFuture<?> applicationExecutionTask;
@@ -92,10 +92,12 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
        public ApplicationDispatcherBootstrap(
                        final PackagedProgram application,
                        final Collection<JobGraph> recoveredJobs,
-                       final Configuration configuration) {
+                       final Configuration configuration,
+                       final FatalErrorHandler errorHandler) {
                this.configuration = checkNotNull(configuration);
                this.recoveredJobs = checkNotNull(recoveredJobs);
                this.application = checkNotNull(application);
+               this.errorHandler = checkNotNull(errorHandler);
        }
 
        @Override
@@ -138,28 +140,28 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
 
                return applicationCompletionFuture
                                .handle((r, t) -> {
-                                       final ApplicationStatus 
applicationStatus;
                                        if (t != null) {
 
-                                               final 
Optional<JobCancellationException> cancellationException =
-                                                               
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-                                               if 
(cancellationException.isPresent()) {
-                                                       // this means the Flink 
Job was cancelled
-                                                       applicationStatus = 
ApplicationStatus.CANCELED;
-                                               } else if (t instanceof 
CancellationException) {
-                                                       // this means that the 
future was cancelled
-                                                       applicationStatus = 
ApplicationStatus.UNKNOWN;
-                                               } else {
-                                                       applicationStatus = 
ApplicationStatus.FAILED;
+                                               final 
Optional<ApplicationFailureException> exception =
+                                                               
ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
+
+                                               if (exception.isPresent()) {
+                                                       final ApplicationStatus 
applicationStatus = exception.get().getStatus();
+
+                                                       if (applicationStatus 
== ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) 
{
+                                                               
LOG.info("Application {}: ", applicationStatus, t);
+                                                               return 
dispatcher.shutDownCluster(applicationStatus);
+                                                       }
                                                }
 
-                                               LOG.warn("Application {}: ", 
applicationStatus, t);
-                                       } else {
-                                               applicationStatus = 
ApplicationStatus.SUCCEEDED;
-                                               LOG.info("Application completed 
SUCCESSFULLY");
+                                               LOG.warn("Exiting with 
Application Status UNKNOWN: ", t);
+                                               
this.errorHandler.onFatalError(t);
+
+                                               return 
FutureUtils.<Acknowledge>completedExceptionally(t);
                                        }
-                                       return 
dispatcher.shutDownCluster(applicationStatus);
+
+                                       LOG.info("Application completed 
SUCCESSFULLY");
+                                       return 
dispatcher.shutDownCluster(ApplicationStatus.SUCCEEDED);
                                })
                                .thenCompose(Function.identity());
        }
@@ -275,31 +277,15 @@ public class ApplicationDispatcherBootstrap extends 
AbstractDispatcherBootstrap
         * Otherwise, this returns a future that is finished exceptionally 
(potentially with an
         * exception from the {@link JobResult}.
         */
-       private CompletableFuture<JobResult> unwrapJobResultException(
-                       CompletableFuture<JobResult> jobResult) {
+       private CompletableFuture<JobResult> unwrapJobResultException(final 
CompletableFuture<JobResult> jobResult) {
                return jobResult.thenApply(result -> {
                        if (result.isSuccess()) {
                                return result;
                        }
 
-                       Optional<SerializedThrowable> serializedThrowable = 
result.getSerializedThrowable();
-
-                       if (result.getApplicationStatus() == 
ApplicationStatus.CANCELED) {
-                               throw new CompletionException(
-                                               new JobCancellationException(
-                                                               
result.getJobId(),
-                                                               "Job was 
cancelled.",
-                                                               
serializedThrowable.orElse(null)));
-                       }
-
-                       if (serializedThrowable.isPresent()) {
-                               Throwable throwable =
-                                               serializedThrowable
-                                                               .get()
-                                                               
.deserializeError(application.getUserCodeClassLoader());
-                               throw new CompletionException(throwable);
-                       }
-                       throw new RuntimeException("Job execution failed for 
unknown reason.");
+                       throw new CompletionException(
+                                       
ApplicationFailureException.fromJobResult(
+                                                       result, 
application.getUserCodeClassLoader()));
                });
        }
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index fdac2ec..1389b34 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
 import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
@@ -78,15 +77,12 @@ public class ApplicationDispatcherGatewayServiceFactory 
implements AbstractDispa
                        Collection<JobGraph> recoveredJobs,
                        JobGraphWriter jobGraphWriter) {
 
-               final DispatcherBootstrap bootstrap =
-                               new ApplicationDispatcherBootstrap(application, 
recoveredJobs, configuration);
-
                final Dispatcher dispatcher;
                try {
                        dispatcher = dispatcherFactory.createDispatcher(
                                        rpcService,
                                        fencingToken,
-                                       bootstrap,
+                                       errorHandler -> new 
ApplicationDispatcherBootstrap(application, recoveredJobs, configuration, 
errorHandler),
                                        
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, 
jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the 
Dispatcher rpc endpoint.", e);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java
new file mode 100644
index 0000000..7bb8719
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.FlinkException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Exception that signals the failure of an
+ * application with a given {@link ApplicationStatus}.
+ */
+@Internal
+public class ApplicationFailureException extends FlinkException {
+
+       private final JobID jobID;
+
+       private final ApplicationStatus status;
+
+       public ApplicationFailureException(
+                       final JobID jobID,
+                       final ApplicationStatus status,
+                       final String message,
+                       final Throwable cause) {
+               super(message, cause);
+               this.jobID = jobID;
+               this.status = checkNotNull(status);
+       }
+
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       public ApplicationStatus getStatus() {
+               return status;
+       }
+
+       public static ApplicationFailureException fromJobResult(
+                       final JobResult result,
+                       final ClassLoader userClassLoader) {
+
+               checkState(result != null && !result.isSuccess());
+               checkNotNull(userClassLoader);
+
+               final JobID jobID = result.getJobId();
+               final ApplicationStatus status = result.getApplicationStatus();
+
+               final Throwable throwable = result
+                               .getSerializedThrowable()
+                               .map(ser -> 
ser.deserializeError(userClassLoader))
+                               .orElse(new FlinkException("Unknown reason."));
+
+               if (status == ApplicationStatus.CANCELED || status == 
ApplicationStatus.FAILED) {
+                       return new ApplicationFailureException(
+                                       jobID, status, "Application Status: " + 
status.name(), throwable);
+               }
+
+               return new ApplicationFailureException(
+                               jobID, ApplicationStatus.UNKNOWN, "Job failed 
for unknown reason.", throwable);
+       }
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
index 3b7aa2e..0d0fc03 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -53,6 +54,7 @@ import java.net.MalformedURLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executors;
@@ -61,6 +63,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -233,7 +236,17 @@ public class ApplicationDispatcherBootstrapTest {
 
                final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 2);
 
-               assertException(applicationFuture, JobExecutionException.class);
+               try {
+                       applicationFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+                       fail("Expected exception but the execution went 
smoothly.");
+               } catch (Throwable e) {
+                       Optional<ApplicationFailureException> expectedException 
=
+                                       ExceptionUtils.findThrowable(e, 
ApplicationFailureException.class);
+                       if (!expectedException.isPresent()) {
+                               throw e;
+                       }
+                       assertEquals(expectedException.get().getStatus(), 
ApplicationStatus.FAILED);
+               }
        }
 
        @Test
@@ -301,7 +314,11 @@ public class ApplicationDispatcherBootstrapTest {
                                .setSubmitFunction(jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
                                .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               // we're "listening" on this to be completed to verify that the 
error handler is called.
+               // In production, this will shut down the cluster with an 
exception.
+               final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, errorHandlerFuture::completeExceptionally);
 
                final CompletableFuture<Acknowledge> shutdownFuture =
                                
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
@@ -310,66 +327,61 @@ public class ApplicationDispatcherBootstrapTest {
 
                bootstrap.stop();
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               // we call the error handler
+               assertException(errorHandlerFuture, 
CancellationException.class);
+
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
 
                // verify that the application task is being cancelled
                assertThat(applicationExecutionFuture.isCancelled(), is(true));
        }
 
        @Test
-       public void testClusterShutdownWhenStoppingBootstrap() throws Exception 
{
-               // we're "listening" on this to be completed to verify that the 
cluster
-               // is being shut down from the ApplicationDispatcherBootstrap
-               final CompletableFuture<ApplicationStatus> 
externalShutdownFuture = new CompletableFuture<>();
-
+       public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws 
Exception {
                final TestingDispatcherGateway.Builder dispatcherBuilder = new 
TestingDispatcherGateway.Builder()
                                .setSubmitFunction(jobGraph -> 
CompletableFuture.completedFuture(Acknowledge.get()))
-                               .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.RUNNING))
-                               .setClusterShutdownFunction((status) -> {
-                                       externalShutdownFuture.complete(status);
-                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
-                               });
+                               .setRequestJobStatusFunction(jobId -> 
CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(2);
+               // we're "listening" on this to be completed to verify that the 
error handler is called.
+               // In production, this will shut down the cluster with an 
exception.
+               final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               2, errorHandlerFuture::completeExceptionally);
 
                final CompletableFuture<Acknowledge> shutdownFuture =
                                
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
 
                bootstrap.stop();
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               // we call the error handler
+               assertException(errorHandlerFuture, 
CancellationException.class);
 
-               // verify that the dispatcher is actually being shut down
-               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS), is(ApplicationStatus.UNKNOWN));
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, CancellationException.class);
        }
 
        @Test
-       public void testClusterShutdownWhenSubmissionFails() throws Exception {
-               // we're "listening" on this to be completed to verify that the 
cluster
-               // is being shut down from the ApplicationDispatcherBootstrap
-               final CompletableFuture<ApplicationStatus> 
externalShutdownFuture = new CompletableFuture<>();
-
+       public void testErrorHandlerIsCalledWhenSubmissionFails() throws 
Exception {
                final TestingDispatcherGateway.Builder dispatcherBuilder = new 
TestingDispatcherGateway.Builder()
                                .setSubmitFunction(jobGraph -> {
                                        throw new 
FlinkRuntimeException("Nope!");
-                               })
-                               .setClusterShutdownFunction((status) -> {
-                                       externalShutdownFuture.complete(status);
-                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                                });
 
-               ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(3);
+               // we're "listening" on this to be completed to verify that the 
error handler is called.
+               // In production, this will shut down the cluster with an 
exception.
+               final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
+               final ApplicationDispatcherBootstrap bootstrap = 
createApplicationDispatcherBootstrap(
+                               3, errorHandlerFuture::completeExceptionally);
 
                final CompletableFuture<Acknowledge> shutdownFuture =
                                
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), 
scheduledExecutor);
 
-               // wait until the bootstrap "thinks" it's done
-               shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+               // we call the error handler
+               assertException(errorHandlerFuture, 
ApplicationExecutionException.class);
 
-               // verify that the dispatcher is actually being shut down
-               assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
+               // we return a future that is completed exceptionally
+               assertException(shutdownFuture, 
ApplicationExecutionException.class);
        }
 
        @Test
@@ -454,7 +466,7 @@ public class ApplicationDispatcherBootstrapTest {
 
                final ApplicationDispatcherBootstrap bootstrap =
                                new ApplicationDispatcherBootstrap(
-                                               program, 
Collections.emptyList(), configuration);
+                                               program, 
Collections.emptyList(), configuration, exception -> {});
 
                return bootstrap.fixJobIdAndRunApplicationAsync(
                                dispatcherBuilder.build(),
@@ -462,14 +474,21 @@ public class ApplicationDispatcherBootstrapTest {
        }
 
        private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException {
-               return createApplicationDispatcherBootstrap(noOfJobs, 
Collections.emptyList());
+               return createApplicationDispatcherBootstrap(noOfJobs, exception 
-> {});
+       }
+
+       private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
+                       int noOfJobs,
+                       FatalErrorHandler errorHandler) throws FlinkException {
+               return createApplicationDispatcherBootstrap(noOfJobs, 
Collections.emptyList(), errorHandler);
        }
 
        private ApplicationDispatcherBootstrap 
createApplicationDispatcherBootstrap(
                        int noOfJobs,
-                       Collection<JobGraph> recoveredJobGraphs) throws 
FlinkException {
+                       Collection<JobGraph> recoveredJobGraphs,
+                       FatalErrorHandler errorHandler) throws FlinkException {
                final PackagedProgram program = getProgram(noOfJobs);
-               return new ApplicationDispatcherBootstrap(program, 
recoveredJobGraphs, getConfiguration());
+               return new ApplicationDispatcherBootstrap(program, 
recoveredJobGraphs, getConfiguration(), errorHandler);
        }
 
        private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c4f1670..013d22c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -120,7 +120,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
        private final Map<JobID, CompletableFuture<JobManagerRunner>> 
jobManagerRunnerFutures;
 
-       private final DispatcherBootstrap dispatcherBootstrap;
+       private final Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory;
 
        private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
@@ -137,10 +137,12 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
        protected final CompletableFuture<ApplicationStatus> shutDownFuture;
 
+       private DispatcherBootstrap dispatcherBootstrap;
+
        public Dispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception 
{
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
                checkNotNull(dispatcherServices);
@@ -174,7 +176,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
                this.shutDownFuture = new CompletableFuture<>();
 
-               this.dispatcherBootstrap = checkNotNull(dispatcherBootstrap);
+               this.dispatcherBootstrapFactory = 
checkNotNull(dispatcherBootstrapFactory);
        }
 
        //------------------------------------------------------
@@ -199,7 +201,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        throw exception;
                }
 
-               dispatcherBootstrap.initialize(this, 
this.getRpcService().getScheduledExecutor());
+               this.dispatcherBootstrap = 
this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);
+               this.dispatcherBootstrap.initialize(this, 
this.getRpcService().getScheduledExecutor());
        }
 
        private void startDispatcherServices() throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
index 71376c2..e1f3827 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.function.Function;
+
 /**
  * {@link Dispatcher} factory interface.
  */
@@ -29,8 +32,8 @@ public interface DispatcherFactory {
         * Create a {@link Dispatcher}.
         */
        Dispatcher createDispatcher(
-               RpcService rpcService,
-               DispatcherId fencingToken,
-               DispatcherBootstrap dispatcherBootstrap,
-               PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception;
+                       RpcService rpcService,
+                       DispatcherId fencingToken,
+                       Function<FatalErrorHandler, DispatcherBootstrap> 
dispatcherBootstrapFactory,
+                       PartialDispatcherServicesWithJobGraphStore 
partialDispatcherServicesWithJobGraphStore) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index cc17177..a68da40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.function.Function;
+
 import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
 
 /**
@@ -34,7 +37,7 @@ public enum JobDispatcherFactory implements DispatcherFactory {
        public MiniDispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
                final Configuration configuration = partialDispatcherServicesWithJobGraphStore.getConfiguration();
                final String executionModeValue = configuration.getString(EXECUTION_MODE);
@@ -44,7 +47,7 @@ public enum JobDispatcherFactory implements DispatcherFactory {
                        rpcService,
                        fencingToken,
                        DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE),
-                       dispatcherBootstrap,
+                       dispatcherBootstrapFactory,
                        executionMode);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 1464b3c..79872ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
 
@@ -34,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,12 +57,12 @@ public class MiniDispatcher extends Dispatcher {
                        RpcService rpcService,
                        DispatcherId fencingToken,
                        DispatcherServices dispatcherServices,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
                super(
                        rpcService,
                        fencingToken,
-                       dispatcherBootstrap,
+                       dispatcherBootstrapFactory,
                        dispatcherServices);
 
                this.executionMode = checkNotNull(executionMode);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index cc867ca..35f8d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.function.Function;
+
 /**
  * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
  */
@@ -30,13 +33,13 @@ public enum SessionDispatcherFactory implements DispatcherFactory {
        public StandaloneDispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
                // create the default dispatcher
                return new StandaloneDispatcher(
                        rpcService,
                        fencingToken,
-                       dispatcherBootstrap,
+                       dispatcherBootstrapFactory,
                        DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE));
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 4ba50c5..65bd4a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.function.Function;
+
 /**
  * Dispatcher implementation which spawns a {@link JobMaster} for each
  * submitted {@link JobGraph} within in the same process. This dispatcher
@@ -31,12 +34,12 @@ public class StandaloneDispatcher extends Dispatcher {
        public StandaloneDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception {
                super(
                        rpcService,
                        fencingToken,
-                       dispatcherBootstrap,
+                       dispatcherBootstrapFactory,
                        dispatcherServices);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
index 072d048..e5312d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
@@ -66,7 +66,7 @@ class DefaultDispatcherGatewayServiceFactory implements AbstractDispatcherLeader
                        dispatcher = dispatcherFactory.createDispatcher(
                                rpcService,
                                fencingToken,
-                               bootstrap,
+                               errorHandler -> bootstrap,
                                PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d2ad995..bc0cad5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -193,7 +193,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                dispatcher = new TestingDispatcher(
                        rpcService,
                        DispatcherId.generate(),
-                       new DefaultDispatcherBootstrap(Collections.emptyList()),
+                       errorHandler -> new DefaultDispatcherBootstrap(Collections.emptyList()),
                        new DispatcherServices(
                                configuration,
                                highAvailabilityServices,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c04e765..8215bc4 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -99,6 +99,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -203,7 +204,8 @@ public class DispatcherTest extends TestLogger {
 
        private class TestingDispatcherBuilder {
 
-               private DispatcherBootstrap dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList());
+               private Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory =
+                               errorHandler -> new DefaultDispatcherBootstrap(Collections.emptyList());
 
                private HeartbeatServices heartbeatServices = DispatcherTest.this.heartbeatServices;
 
@@ -223,8 +225,9 @@ public class DispatcherTest extends TestLogger {
                        return this;
                }
 
-               TestingDispatcherBuilder setDispatcherBootstrap(DispatcherBootstrap dispatcherBootstrap) {
-                       this.dispatcherBootstrap = dispatcherBootstrap;
+               TestingDispatcherBuilder setDispatcherBootstrapFactory(
+                               Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory) {
+                       this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
                        return this;
                }
 
@@ -246,7 +249,7 @@ public class DispatcherTest extends TestLogger {
                        return new TestingDispatcher(
                                rpcService,
                                DispatcherId.generate(),
-                               dispatcherBootstrap,
+                               dispatcherBootstrapFactory,
                                new DispatcherServices(
                                        configuration,
                                        haServices,
@@ -446,7 +449,7 @@ public class DispatcherTest extends TestLogger {
                final JobGraph failingJobGraph = createFailingJobGraph(testException);
 
                dispatcher = new TestingDispatcherBuilder()
-                       .setDispatcherBootstrap(new DefaultDispatcherBootstrap(Collections.singleton(failingJobGraph)))
+                       .setDispatcherBootstrapFactory(errorHandler -> new DefaultDispatcherBootstrap(Collections.singleton(failingJobGraph)))
                        .build();
 
                dispatcher.start();
@@ -598,7 +601,7 @@ public class DispatcherTest extends TestLogger {
                        .build();
 
                dispatcher = new TestingDispatcherBuilder()
-                       .setDispatcherBootstrap(new DefaultDispatcherBootstrap(Collections.singleton(jobGraph)))
+                       .setDispatcherBootstrapFactory(errorHandler -> new DefaultDispatcherBootstrap(Collections.singleton(jobGraph)))
                        .setJobGraphWriter(testingJobGraphStore)
                        .build();
                dispatcher.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 2362d65..02dedab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -251,7 +251,7 @@ public class MiniDispatcherTest extends TestLogger {
                                UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                                highAvailabilityServices.getJobGraphStore(),
                                testingJobManagerRunnerFactory),
-                       new DefaultDispatcherBootstrap(Collections.singletonList(jobGraph)),
+                       errorHandler -> new DefaultDispatcherBootstrap(Collections.singletonList(jobGraph)),
                        executionMode);
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 22e5d74..fe6472a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nonnull;
@@ -38,12 +39,12 @@ class TestingDispatcher extends Dispatcher {
        TestingDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        DispatcherServices dispatcherServices) throws Exception {
                super(
                        rpcService,
                        fencingToken,
-                       dispatcherBootstrap,
+                       dispatcherBootstrapFactory,
                        dispatcherServices);
 
                this.startFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index bcd1eb5..d31a7f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -65,6 +66,7 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
@@ -206,12 +208,12 @@ public class DefaultDispatcherRunnerITCase extends TestLogger {
                public Dispatcher createDispatcher(
                        RpcService rpcService,
                        DispatcherId fencingToken,
-                       DispatcherBootstrap dispatcherBootstrap,
+                       Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory,
                        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
                        return new StandaloneDispatcher(
                                rpcService,
                                fencingToken,
-                               dispatcherBootstrap,
+                               dispatcherBootstrapFactory,
                                DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, jobManagerRunnerFactory));
                }
        }

Reply via email to