This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69db02bddbb535bfc918e97a796037e122a87a71 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Oct 21 20:55:44 2020 +0200 [hotfix] Merge ApplicationDispatcherBootstrap#initialize() with constructor. --- .../ApplicationDispatcherBootstrap.java | 38 +++++----- ...ApplicationDispatcherGatewayServiceFactory.java | 3 +- .../ApplicationDispatcherBootstrapTest.java | 84 ++++++++++++---------- .../flink/runtime/dispatcher/Dispatcher.java | 10 +-- .../runtime/dispatcher/DispatcherBootstrap.java | 13 +--- ...tstrap.java => DispatcherBootstrapFactory.java} | 23 ++---- .../runtime/dispatcher/DispatcherFactory.java | 4 +- .../runtime/dispatcher/JobDispatcherFactory.java | 4 +- .../flink/runtime/dispatcher/MiniDispatcher.java | 4 +- .../dispatcher/NoOpDispatcherBootstrap.java | 6 -- .../dispatcher/SessionDispatcherFactory.java | 4 +- .../runtime/dispatcher/StandaloneDispatcher.java | 4 +- .../DefaultDispatcherGatewayServiceFactory.java | 2 +- .../dispatcher/DispatcherResourceCleanupTest.java | 2 +- .../flink/runtime/dispatcher/DispatcherTest.java | 7 +- .../runtime/dispatcher/MiniDispatcherTest.java | 2 +- .../runtime/dispatcher/TestingDispatcher.java | 3 +- .../runner/DefaultDispatcherRunnerITCase.java | 6 +- 18 files changed, 97 insertions(+), 122 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index a817048..1443804 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -84,7 +84,9 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { private final FatalErrorHandler errorHandler; - private CompletableFuture<Void> applicationCompletionFuture; + private final CompletableFuture<Void> applicationCompletionFuture; + + private final CompletableFuture<Acknowledge> clusterShutdownFuture; private ScheduledFuture<?> applicationExecutionTask; @@ -92,20 +94,19 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { final PackagedProgram application, final Collection<JobID> recoveredJobIds, final Configuration configuration, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, final FatalErrorHandler errorHandler) { this.configuration = checkNotNull(configuration); this.recoveredJobIds = checkNotNull(recoveredJobIds); this.application = checkNotNull(application); this.errorHandler = checkNotNull(errorHandler); - } - @Override - public void initialize(final DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) { - checkNotNull(dispatcherGateway); + this.applicationCompletionFuture = + fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor); - runApplicationAndShutdownClusterAsync( - dispatcherGateway, - scheduledExecutor); + this.clusterShutdownFuture = + runApplicationAndShutdownClusterAsync(dispatcherGateway); } @Override @@ -124,17 +125,21 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { return applicationExecutionTask; } + @VisibleForTesting + CompletableFuture<Void> getApplicationCompletionFuture() { + return applicationCompletionFuture; + } + + @VisibleForTesting + CompletableFuture<Acknowledge> getClusterShutdownFuture() { + return clusterShutdownFuture; + } + /** * Runs the user program entrypoint and shuts down the given dispatcherGateway when * the application completes (either successfully or in case of failure). */ - @VisibleForTesting - CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync( - final DispatcherGateway dispatcherGateway, - final ScheduledExecutor scheduledExecutor) { - - applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor); - + private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(final DispatcherGateway dispatcherGateway) { return applicationCompletionFuture .handle((r, t) -> { if (t != null) { @@ -163,8 +168,7 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { .thenCompose(Function.identity()); } - @VisibleForTesting - CompletableFuture<Void> fixJobIdAndRunApplicationAsync( + private CompletableFuture<Void> fixJobIdAndRunApplicationAsync( final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java index a749fad..a492cf8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java @@ -88,7 +88,8 @@ public class ApplicationDispatcherGatewayServiceFactory implements AbstractDispa rpcService, fencingToken, recoveredJobs, - errorHandler -> new ApplicationDispatcherBootstrap(application, recoveredJobIds, configuration, errorHandler), + (dispatcherGateway, scheduledExecutor, errorHandler) -> new ApplicationDispatcherBootstrap( + application, recoveredJobIds, configuration, dispatcherGateway, scheduledExecutor, errorHandler), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java index df1c786..09c8dbe 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; @@ -263,10 +264,11 @@ public class ApplicationDispatcherBootstrapTest { }) .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createCancelledJobResult(jobId))); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, dispatcherBuilder.build(), scheduledExecutor); final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + bootstrap.getClusterShutdownFuture(); // wait until the bootstrap "thinks" it's done, also makes sure that we don't // fail the future exceptionally with a JobCancelledException @@ -282,10 +284,10 @@ public class ApplicationDispatcherBootstrapTest { .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED)) .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId))); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, dispatcherBuilder.build(), scheduledExecutor); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture(); @@ -306,10 +308,9 @@ public class ApplicationDispatcherBootstrapTest { // In production, this will shut down the cluster with an exception. final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( - 3, errorHandlerFuture::completeExceptionally); + 3, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture(); @@ -335,10 +336,9 @@ public class ApplicationDispatcherBootstrapTest { // In production, this will shut down the cluster with an exception. final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( - 2, errorHandlerFuture::completeExceptionally); + 2, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); bootstrap.stop(); @@ -363,12 +363,11 @@ public class ApplicationDispatcherBootstrapTest { // we're "listening" on this to be completed to verify that the error handler is called. // In production, this will shut down the cluster with an exception. final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( - 3, errorHandlerFuture::completeExceptionally); + 3, dispatcherGateway, scheduledExecutor, errorHandlerFuture::completeExceptionally); - final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); // we call the error handler assertException(errorHandlerFuture, ApplicationExecutionException.class); @@ -392,10 +391,10 @@ public class ApplicationDispatcherBootstrapTest { return CompletableFuture.completedFuture(Acknowledge.get()); }); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, dispatcherBuilder.build(), scheduledExecutor); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); // wait until the bootstrap "thinks" it's done shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -419,10 +418,10 @@ public class ApplicationDispatcherBootstrapTest { return CompletableFuture.completedFuture(Acknowledge.get()); }); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, dispatcherBuilder.build(), scheduledExecutor); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); // wait until the bootstrap "thinks" it's done shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -446,10 +445,10 @@ public class ApplicationDispatcherBootstrapTest { return CompletableFuture.completedFuture(Acknowledge.get()); }); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + ApplicationDispatcherBootstrap bootstrap = + createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor); - final CompletableFuture<Acknowledge> shutdownFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); + final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture(); // wait until the bootstrap "thinks" it's done shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -471,11 +470,11 @@ public class ApplicationDispatcherBootstrapTest { return CompletableFuture.completedFuture(Acknowledge.get()); }); - final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); - final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); - final CompletableFuture<Acknowledge> applicationFuture = - bootstrap.runApplicationAndShutdownClusterAsync(dispatcherGateway, scheduledExecutor); + final ApplicationDispatcherBootstrap bootstrap = + createApplicationDispatcherBootstrap(3, dispatcherGateway, scheduledExecutor); + + final CompletableFuture<Acknowledge> applicationFuture = bootstrap.getClusterShutdownFuture(); final ApplicationFailureException exception = assertException(applicationFuture, ApplicationFailureException.class); assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN); @@ -509,22 +508,31 @@ public class ApplicationDispatcherBootstrapTest { final ApplicationDispatcherBootstrap bootstrap = new ApplicationDispatcherBootstrap( - program, Collections.emptyList(), configuration, exception -> {}); - - return bootstrap.fixJobIdAndRunApplicationAsync( - dispatcherBuilder.build(), - scheduledExecutor); + program, + Collections.emptyList(), + configuration, + dispatcherBuilder.build(), + scheduledExecutor, + exception -> {}); + + return bootstrap.getApplicationCompletionFuture(); } - private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException { - return createApplicationDispatcherBootstrap(noOfJobs, exception -> {}); + private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap( + final int noOfJobs, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor) throws FlinkException { + return createApplicationDispatcherBootstrap(noOfJobs, dispatcherGateway, scheduledExecutor, exception -> {}); } private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap( - int noOfJobs, - FatalErrorHandler errorHandler) throws FlinkException { + final int noOfJobs, + final DispatcherGateway dispatcherGateway, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler) throws FlinkException { final PackagedProgram program = getProgram(noOfJobs); - return new ApplicationDispatcherBootstrap(program, Collections.emptyList(), getConfiguration(), errorHandler); + return new ApplicationDispatcherBootstrap( + program, Collections.emptyList(), getConfiguration(), dispatcherGateway, scheduledExecutor, errorHandler); } private PackagedProgram getProgram(int noOfJobs) throws FlinkException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 4aeba30..55e2c3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -122,7 +122,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final Collection<JobGraph> recoveredJobs; - private final Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory; + private final DispatcherBootstrapFactory dispatcherBootstrapFactory; private final ArchivedExecutionGraphStore archivedExecutionGraphStore; @@ -145,7 +145,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken); checkNotNull(dispatcherServices); @@ -206,9 +206,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher throw exception; } - this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(this::onFatalError); startRecoveredJobs(); - this.dispatcherBootstrap.initialize(getSelfGateway(DispatcherGateway.class), this.getRpcService().getScheduledExecutor()); + this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create( + getSelfGateway(DispatcherGateway.class), + this.getRpcService().getScheduledExecutor() , + this::onFatalError); } private void startDispatcherServices() throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java index 5cf9336..d8b7b80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrap.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; /** * An interface containing the logic of bootstrapping the {@link Dispatcher} of a cluster. @@ -28,18 +27,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; public interface DispatcherBootstrap { /** - * Initializes the {@link Dispatcher} provided as an argument. - * - * <p>IMPORTANT: In HA settings, this method will run during - * the initialization of the **leader** dispatcher. - * - * @param dispatcher the dispatcher to be initialized. - */ - void initialize(final DispatcherGateway dispatcher, ScheduledExecutor scheduledExecutor) throws Exception; - - /** * Stops and frees any resources (e.g. threads) acquired - * by the {@link #initialize(DispatcherGateway, ScheduledExecutor)}. + * during the execution of the bootstrap. */ void stop() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java similarity index 67% copy from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java index 9d3934d..2cbba75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherBootstrapFactory.java @@ -20,25 +20,16 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rpc.FatalErrorHandler; /** - * A {@link DispatcherBootstrap} which submits the provided {@link JobGraph job graphs} - * for execution upon dispatcher initialization. + * A factory to create a {@link DispatcherBootstrap}. */ @Internal -public class NoOpDispatcherBootstrap implements DispatcherBootstrap { +public interface DispatcherBootstrapFactory { - public NoOpDispatcherBootstrap() { - } - - @Override - public void initialize(final DispatcherGateway dispatcher, ScheduledExecutor scheduledExecutor) { - - } - - @Override - public void stop() throws Exception { - // do nothing - } + DispatcherBootstrap create( + final DispatcherGateway dispatcher, + final ScheduledExecutor scheduledExecutor, + final FatalErrorHandler errorHandler) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java index 54a8338..2ad968d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java @@ -19,11 +19,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; -import java.util.function.Function; /** * {@link Dispatcher} factory interface. @@ -37,6 +35,6 @@ public interface DispatcherFactory { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java index 7b08e10..c779c19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java @@ -21,13 +21,11 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import java.util.Collection; -import java.util.function.Function; import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE; @@ -42,7 +40,7 @@ public enum JobDispatcherFactory implements DispatcherFactory { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 568e456..b06453d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; @@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -59,7 +57,7 @@ public class MiniDispatcher extends Dispatcher { DispatcherId fencingToken, DispatcherServices dispatcherServices, JobGraph jobGraph, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, JobClusterEntrypoint.ExecutionMode executionMode) throws Exception { super( rpcService, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java index 9d3934d..b4a3b36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/NoOpDispatcherBootstrap.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.jobgraph.JobGraph; /** @@ -33,11 +32,6 @@ public class NoOpDispatcherBootstrap implements DispatcherBootstrap { } @Override - public void initialize(final DispatcherGateway dispatcher, ScheduledExecutor scheduledExecutor) { - - } - - @Override public void stop() throws Exception { // do nothing } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java index 05fcc14..3aa7b4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java @@ -19,11 +19,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; -import java.util.function.Function; /** * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}. @@ -36,7 +34,7 @@ public enum SessionDispatcherFactory implements DispatcherFactory { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { // create the default dispatcher return new StandaloneDispatcher( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 6b7b942..8d88d6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -20,11 +20,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobMaster; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; -import java.util.function.Function; /** * Dispatcher implementation which spawns a {@link JobMaster} for each @@ -36,7 +34,7 @@ public class StandaloneDispatcher extends Dispatcher { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super( rpcService, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java index adc1440..67a3231 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java @@ -63,7 +63,7 @@ class DefaultDispatcherGatewayServiceFactory implements AbstractDispatcherLeader rpcService, fencingToken, recoveredJobs, - errorHandler -> new NoOpDispatcherBootstrap(), + (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 06c9cc8b..fdbf165 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -194,7 +194,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { rpcService, DispatcherId.generate(), Collections.emptyList(), - errorHandler -> new NoOpDispatcherBootstrap(), + (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), new DispatcherServices( configuration, highAvailabilityServices, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 3ccc6b8..1ffaf86 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -99,7 +99,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -206,8 +205,8 @@ public class DispatcherTest extends TestLogger { private Collection<JobGraph> initialJobGraphs = Collections.emptyList(); - private Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory = - errorHandler -> new NoOpDispatcherBootstrap(); + private DispatcherBootstrapFactory dispatcherBootstrapFactory = + (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(); private HeartbeatServices heartbeatServices = DispatcherTest.this.heartbeatServices; @@ -233,7 +232,7 @@ public class DispatcherTest extends TestLogger { } TestingDispatcherBuilder setDispatcherBootstrapFactory( - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory) { + DispatcherBootstrapFactory dispatcherBootstrapFactory) { this.dispatcherBootstrapFactory = dispatcherBootstrapFactory; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 4794797..bcded74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -251,7 +251,7 @@ public class MiniDispatcherTest extends TestLogger { highAvailabilityServices.getJobGraphStore(), testingJobManagerRunnerFactory), jobGraph, - errorHandler -> new NoOpDispatcherBootstrap(), + (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), executionMode); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 7b2bbcb..646fcda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import javax.annotation.Nonnull; @@ -42,7 +41,7 @@ class TestingDispatcher extends Dispatcher { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super( rpcService, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java index 51325bf..2d5613e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.dispatcher.Dispatcher; -import org.apache.flink.runtime.dispatcher.DispatcherBootstrap; +import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory; import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; @@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -66,7 +65,6 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -209,7 +207,7 @@ public class DefaultDispatcherRunnerITCase extends TestLogger { RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, - Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { return new StandaloneDispatcher( rpcService,
