This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f5fd073bd128006571c9e788c87938125fef52d
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Sep 27 14:17:21 2018 +0200

    [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory
    
    This commit introduces the DispatcherResourceManagerComponentFactory which 
is used
    to create a DispatcherResourceManagerComponent. That way, it is possible to 
eagerly
    initialize all fields of the DispatcherResourceManagerComponent making it 
possible
    to make all fields final and remove the lock.
    
    This closes #6743.
---
 .../entrypoint/ClassPathJobGraphRetriever.java     |   2 +-
 .../entrypoint/StandaloneJobClusterEntryPoint.java |   8 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  10 +-
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  17 +-
 .../runtime/dispatcher/JobDispatcherFactory.java   |   2 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  10 +-
 .../SessionDispatcherResourceManagerComponent.java |  34 ---
 .../StandaloneSessionClusterEntrypoint.java        |   6 +-
 ...DispatcherResourceManagerComponentFactory.java} | 261 +++++++--------------
 .../DispatcherResourceManagerComponent.java        | 180 ++++++++++++++
 .../DispatcherResourceManagerComponentFactory.java |  45 ++++
 .../{ => component}/FileJobGraphRetriever.java     |   2 +-
 .../JobDispatcherResourceManagerComponent.java     |  28 ++-
 ...bDispatcherResourceManagerComponentFactory.java |  58 +++++
 .../{ => component}/JobGraphRetriever.java         |   2 +-
 .../SessionDispatcherResourceManagerComponent.java |  40 ++++
 ...nDispatcherResourceManagerComponentFactory.java |  58 +++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  10 +-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   8 +-
 19 files changed, 522 insertions(+), 259 deletions(-)

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 5ded04b..3e0645d 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index b81d992..6c42bf2 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -19,11 +19,11 @@
 package org.apache.flink.container.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -61,8 +61,8 @@ public final class StandaloneJobClusterEntryPoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new JobClusterComponent(
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
                        StandaloneResourceManagerFactory.INSTANCE,
                        new ClassPathJobGraphRetriever(jobClassName, 
savepointRestoreSettings, programArguments));
        }
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 922d5cd..377b5b5 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -28,11 +28,11 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -109,8 +109,8 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new JobClusterComponent(
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
                        new MesosResourceManagerFactory(
                                mesosServices,
                                schedulerConfiguration,
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index f691940..9879628 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -28,10 +28,10 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -108,12 +108,13 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new SessionClusterComponent(new 
MesosResourceManagerFactory(
-                       mesosServices,
-                       mesosConfig,
-                       taskManagerParameters,
-                       taskManagerContainerSpec));
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new SessionDispatcherResourceManagerComponentFactory(
+                       new MesosResourceManagerFactory(
+                               mesosServices,
+                               mesosConfig,
+                               taskManagerParameters,
+                               taskManagerContainerSpec));
        }
 
        public static void main(String[] args) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index 488f2fc..e6b1a26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 54ccaec..c9a1722 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -109,7 +111,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
        @GuardedBy("lock")
-       private ClusterComponent<?> clusterComponent;
+       private DispatcherResourceManagerComponent<?> clusterComponent;
 
        @GuardedBy("lock")
        private MetricRegistryImpl metricRegistry;
@@ -204,9 +206,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                        configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
                        configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
 
-                       clusterComponent = 
createClusterComponent(configuration);
+                       final DispatcherResourceManagerComponentFactory<?> 
dispatcherResourceManagerComponentFactory = 
createDispatcherResourceManagerComponentFactory(configuration);
 
-                       clusterComponent.startComponent(
+                       clusterComponent = 
dispatcherResourceManagerComponentFactory.create(
                                configuration,
                                commonRpcService,
                                haServices,
@@ -460,7 +462,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        // Abstract methods
        // --------------------------------------------------
 
-       protected abstract ClusterComponent<?> 
createClusterComponent(Configuration configuration);
+       protected abstract DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration);
 
        protected abstract ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
                Configuration configuration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
deleted file mode 100644
index 8ab0701..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.entrypoint;
-
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
-import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
-
-/**
- * {@link ClusterComponent} used by session clusters.
- */
-public class SessionClusterComponent extends ClusterComponent<Dispatcher> {
-
-       public SessionClusterComponent(ResourceManagerFactory<?> 
resourceManagerFactory) {
-               super(SessionDispatcherFactory.INSTANCE, 
resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index 1675235..127fc8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -35,8 +37,8 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new 
SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new 
SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
        }
 
        public static void main(String[] args) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
similarity index 51%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index af02729..0a37411 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -46,102 +46,54 @@ import org.apache.flink.runtime.rest.RestEndpointFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
-import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FlinkException;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
- * in the same process.
+ * Abstract class which implements the creation of the {@link 
DispatcherResourceManagerComponent} components.
+ *
+ * @param <T> type of the {@link Dispatcher}
+ * @param <U> type of the {@link RestfulGateway} given to the {@link 
WebMonitorEndpoint}
  */
-public class ClusterComponent<T extends Dispatcher> implements 
AutoCloseableAsync {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ClusterComponent.class);
+public abstract class AbstractDispatcherResourceManagerComponentFactory<T 
extends Dispatcher, U extends RestfulGateway> implements 
DispatcherResourceManagerComponentFactory<T> {
 
-       private final Object lock = new Object();
+       private final Logger log = LoggerFactory.getLogger(getClass());
 
+       @Nonnull
        private final DispatcherFactory<T> dispatcherFactory;
 
+       @Nonnull
        private final ResourceManagerFactory<?> resourceManagerFactory;
 
-       private final RestEndpointFactory<?> restEndpointFactory;
-
-       private final CompletableFuture<Void> terminationFuture;
-
-       private final CompletableFuture<ApplicationStatus> shutDownFuture;
-
-       @GuardedBy("lock")
-       private State state;
-
-       @GuardedBy("lock")
-       private ResourceManager<?> resourceManager;
+       @Nonnull
+       private final RestEndpointFactory<U> restEndpointFactory;
 
-       @GuardedBy("lock")
-       private T dispatcher;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-       @GuardedBy("lock")
-       private LeaderRetrievalService resourceManagerRetrievalService;
-
-       @GuardedBy("lock")
-       private WebMonitorEndpoint<?> webMonitorEndpoint;
-
-       @GuardedBy("lock")
-       private JobManagerMetricGroup jobManagerMetricGroup;
-
-       public ClusterComponent(
-                       DispatcherFactory<T> dispatcherFactory,
-                       ResourceManagerFactory<?> resourceManagerFactory,
-                       RestEndpointFactory<?> restEndpointFactory) {
+       public AbstractDispatcherResourceManagerComponentFactory(
+                       @Nonnull DispatcherFactory<T> dispatcherFactory,
+                       @Nonnull ResourceManagerFactory<?> 
resourceManagerFactory,
+                       @Nonnull RestEndpointFactory<U> restEndpointFactory) {
                this.dispatcherFactory = dispatcherFactory;
                this.resourceManagerFactory = resourceManagerFactory;
                this.restEndpointFactory = restEndpointFactory;
-               this.terminationFuture = new CompletableFuture<>();
-               this.shutDownFuture = new CompletableFuture<>();
-               this.state = State.CREATED;
-
-               terminationFuture.whenComplete(
-                       (aVoid, throwable) -> {
-                               if (throwable != null) {
-                                       
shutDownFuture.completeExceptionally(throwable);
-                               } else {
-                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
-                               }
-                       });
        }
 
-       public T getDispatcher() {
-               synchronized (lock) {
-                       return dispatcher;
-               }
-       }
-
-       public CompletableFuture<Void> getTerminationFuture() {
-               return terminationFuture;
-       }
-
-       public CompletableFuture<ApplicationStatus> getShutDownFuture() {
-               return shutDownFuture;
-       }
-
-       public void startComponent(
+       @Override
+       public DispatcherResourceManagerComponent<T> create(
                        Configuration configuration,
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
@@ -150,22 +102,27 @@ public class ClusterComponent<T extends Dispatcher> 
implements AutoCloseableAsyn
                        MetricRegistry metricRegistry,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
-               synchronized (lock) {
-                       Preconditions.checkState(state == State.CREATED);
-                       state = State.RUNNING;
 
+               LeaderRetrievalService dispatcherLeaderRetrievalService = null;
+               LeaderRetrievalService resourceManagerRetrievalService = null;
+               WebMonitorEndpoint<U> webMonitorEndpoint = null;
+               ResourceManager<?> resourceManager = null;
+               JobManagerMetricGroup jobManagerMetricGroup = null;
+               T dispatcher = null;
+
+               try {
                        dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
 
                        resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
-                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+                       final LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                                rpcService,
                                DispatcherGateway.class,
                                DispatcherId::fromUuid,
                                10,
                                Time.milliseconds(50L));
 
-                       LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+                       final LeaderGatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                                rpcService,
                                ResourceManagerGateway.class,
                                ResourceManagerId::fromUuid,
@@ -186,7 +143,7 @@ public class ClusterComponent<T extends Dispatcher> 
implements AutoCloseableAsyn
                                
highAvailabilityServices.getWebMonitorLeaderElectionService(),
                                fatalErrorHandler);
 
-                       LOG.debug("Starting Dispatcher REST endpoint.");
+                       log.debug("Starting Dispatcher REST endpoint.");
                        webMonitorEndpoint.start();
 
                        resourceManager = 
resourceManagerFactory.createResourceManager(
@@ -221,127 +178,77 @@ public class ClusterComponent<T extends Dispatcher> 
implements AutoCloseableAsyn
                                webMonitorEndpoint.getRestBaseUrl(),
                                historyServerArchivist);
 
-                       registerShutDownFuture(dispatcher, shutDownFuture);
-
-                       LOG.debug("Starting ResourceManager.");
+                       log.debug("Starting ResourceManager.");
                        resourceManager.start();
                        
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
 
-                       LOG.debug("Starting Dispatcher.");
+                       log.debug("Starting Dispatcher.");
                        dispatcher.start();
                        
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
-               }
-       }
-
-       protected void registerShutDownFuture(T dispatcher, 
CompletableFuture<ApplicationStatus> shutDownFuture) {
-                       dispatcher
-                               .getTerminationFuture()
-                               .whenComplete(
-                                       (aVoid, throwable) -> {
-                                               if (throwable != null) {
-                                                       
shutDownFuture.completeExceptionally(throwable);
-                                               } else {
-                                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
-                                               }
-                                       });
-       }
-
-       @Override
-       public CompletableFuture<Void> closeAsync() {
-               synchronized (lock) {
-                       if (state == State.RUNNING) {
-                               Exception exception = null;
-
-                               final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
-
-                               if (dispatcherLeaderRetrievalService != null) {
-                                       try {
-                                               
dispatcherLeaderRetrievalService.stop();
-                                       } catch (Exception e) {
-                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                                       }
-                               }
 
-                               if (resourceManagerRetrievalService != null) {
-                                       try {
-                                               
resourceManagerRetrievalService.stop();
-                                       } catch (Exception e) {
-                                               exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
-                                       }
-                               }
-
-                               if (webMonitorEndpoint != null) {
-                                       
terminationFutures.add(webMonitorEndpoint.closeAsync());
+                       return createDispatcherResourceManagerComponent(
+                               dispatcher,
+                               resourceManager,
+                               dispatcherLeaderRetrievalService,
+                               resourceManagerRetrievalService,
+                               webMonitorEndpoint,
+                               jobManagerMetricGroup);
+
+               } catch (Exception exception) {
+                       // clean up all started components
+                       if (dispatcherLeaderRetrievalService != null) {
+                               try {
+                                       dispatcherLeaderRetrievalService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
                                }
+                       }
 
-                               if (dispatcher != null) {
-                                       dispatcher.shutDown();
-                                       
terminationFutures.add(dispatcher.getTerminationFuture());
+                       if (resourceManagerRetrievalService != null) {
+                               try {
+                                       resourceManagerRetrievalService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
                                }
+                       }
 
-                               if (resourceManager != null) {
-                                       resourceManager.shutDown();
-                                       
terminationFutures.add(resourceManager.getTerminationFuture());
-                               }
+                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(3);
 
-                               if (exception != null) {
-                                       
terminationFutures.add(FutureUtils.completedExceptionally(exception));
-                               }
+                       if (webMonitorEndpoint != null) {
+                               
terminationFutures.add(webMonitorEndpoint.closeAsync());
+                       }
 
-                               final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+                       if (resourceManager != null) {
+                               resourceManager.shutDown();
+                               
terminationFutures.add(resourceManager.getTerminationFuture());
+                       }
 
-                               final CompletableFuture<Void> 
metricGroupTerminationFuture;
+                       if (dispatcher != null) {
+                               dispatcher.shutDown();
+                               
terminationFutures.add(dispatcher.getTerminationFuture());
+                       }
 
-                               if (jobManagerMetricGroup != null) {
-                                       metricGroupTerminationFuture = 
FutureUtils.runAfterwards(
-                                               componentTerminationFuture,
-                                               () -> {
-                                                       synchronized (lock) {
-                                                               
jobManagerMetricGroup.close();
-                                                       }
-                                               });
-                               } else {
-                                       metricGroupTerminationFuture = 
componentTerminationFuture;
-                               }
+                       final FutureUtils.ConjunctFuture<Void> 
terminationFuture = FutureUtils.completeAll(terminationFutures);
 
-                               
metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
-                                       if (throwable != null) {
-                                               
terminationFuture.completeExceptionally(throwable);
-                                       } else {
-                                               
terminationFuture.complete(aVoid);
-                                       }
-                               });
-                       } else if (state == State.CREATED) {
-                               terminationFuture.complete(null);
+                       try {
+                               terminationFuture.get();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       state = State.TERMINATED;
-                       return terminationFuture;
-               }
-       }
-
-       /**
-        * Deregister the Flink application from the resource management system 
by signalling
-        * the {@link ResourceManager}.
-        *
-        * @param applicationStatus to terminate the application with
-        * @param diagnostics additional information about the shut down, can 
be {@code null}
-        * @return Future which is completed once the shut down
-        */
-       public CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
-               synchronized (lock) {
-                       if (resourceManager != null) {
-                               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-                               return 
selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack 
-> null);
-                       } else {
-                               return CompletableFuture.completedFuture(null);
+                       if (jobManagerMetricGroup != null) {
+                               jobManagerMetricGroup.close();
                        }
+
+                       throw new FlinkException("Could not create the 
DispatcherResourceManagerComponent.", exception);
                }
        }
 
-       enum State {
-               CREATED,
-               RUNNING,
-               TERMINATED
-       }
+       protected abstract DispatcherResourceManagerComponent<T> 
createDispatcherResourceManagerComponent(
+               T dispatcher,
+               ResourceManager<?> resourceManager,
+               LeaderRetrievalService dispatcherLeaderRetrievalService,
+               LeaderRetrievalService resourceManagerRetrievalService,
+               WebMonitorEndpoint<?> webMonitorEndpoint,
+               JobManagerMetricGroup jobManagerMetricGroup);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
new file mode 100644
index 0000000..94925b2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class DispatcherResourceManagerComponent<T extends Dispatcher> 
implements AutoCloseableAsync {
+
+       @Nonnull
+       private final T dispatcher;
+
+       @Nonnull
+       private final ResourceManager<?> resourceManager;
+
+       @Nonnull
+       private final LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+       @Nonnull
+       private final LeaderRetrievalService resourceManagerRetrievalService;
+
+       @Nonnull
+       private final WebMonitorEndpoint<?> webMonitorEndpoint;
+
+       @Nonnull
+       private final JobManagerMetricGroup jobManagerMetricGroup;
+
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+       private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+       DispatcherResourceManagerComponent(
+                       @Nonnull T dispatcher,
+                       @Nonnull ResourceManager<?> resourceManager,
+                       @Nonnull LeaderRetrievalService 
dispatcherLeaderRetrievalService,
+                       @Nonnull LeaderRetrievalService 
resourceManagerRetrievalService,
+                       @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+                       @Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+               this.resourceManager = resourceManager;
+               this.dispatcher = dispatcher;
+               this.dispatcherLeaderRetrievalService = 
dispatcherLeaderRetrievalService;
+               this.resourceManagerRetrievalService = 
resourceManagerRetrievalService;
+               this.webMonitorEndpoint = webMonitorEndpoint;
+               this.jobManagerMetricGroup = jobManagerMetricGroup;
+               this.terminationFuture = new CompletableFuture<>();
+               this.shutDownFuture = new CompletableFuture<>();
+
+               registerShutDownFuture();
+       }
+
+       private void registerShutDownFuture() {
+               terminationFuture.whenComplete(
+                       (aVoid, throwable) -> {
+                               if (throwable != null) {
+                                       
shutDownFuture.completeExceptionally(throwable);
+                               } else {
+                                       
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                               }
+                       });
+
+               dispatcher
+                       .getTerminationFuture()
+                       .whenComplete(
+                               (aVoid, throwable) -> {
+                                       if (throwable != null) {
+                                               
shutDownFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+                                       }
+                               });
+       }
+
+       public CompletableFuture<Void> getTerminationFuture() {
+               return terminationFuture;
+       }
+
+       public final CompletableFuture<ApplicationStatus> getShutDownFuture() {
+               return shutDownFuture;
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               if (isRunning.compareAndSet(true, false)) {
+                       Exception exception = null;
+
+                       final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(4);
+
+                       try {
+                               dispatcherLeaderRetrievalService.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       try {
+                               resourceManagerRetrievalService.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       terminationFutures.add(webMonitorEndpoint.closeAsync());
+
+                       dispatcher.shutDown();
+                       
terminationFutures.add(dispatcher.getTerminationFuture());
+
+                       resourceManager.shutDown();
+                       
terminationFutures.add(resourceManager.getTerminationFuture());
+
+                       if (exception != null) {
+                               
terminationFutures.add(FutureUtils.completedExceptionally(exception));
+                       }
+
+                       final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+                       final CompletableFuture<Void> 
metricGroupTerminationFuture = FutureUtils.runAfterwards(
+                               componentTerminationFuture,
+                               jobManagerMetricGroup::close);
+
+                       metricGroupTerminationFuture.whenComplete((aVoid, 
throwable) -> {
+                               if (throwable != null) {
+                                       
terminationFuture.completeExceptionally(throwable);
+                               } else {
+                                       terminationFuture.complete(aVoid);
+                               }
+                       });
+               }
+
+               return terminationFuture;
+       }
+
+       /**
+        * Deregister the Flink application from the resource management system 
by signalling
+        * the {@link ResourceManager}.
+        *
+        * @param applicationStatus to terminate the application with
+        * @param diagnostics additional information about the shut down, can 
be {@code null}
+        * @return Future which is completed once the shut down
+        */
+       public CompletableFuture<Void> deregisterApplication(ApplicationStatus 
applicationStatus, @Nullable String diagnostics) {
+               final ResourceManagerGateway selfGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
+               return selfGateway.deregisterApplication(applicationStatus, 
diagnostics).thenApply(ack -> null);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..df22a59
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for the {@link DispatcherResourceManagerComponent}.
+ */
+public interface DispatcherResourceManagerComponentFactory<T extends 
Dispatcher> {
+
+       DispatcherResourceManagerComponent<T> create(
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               BlobServer blobServer,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               ArchivedExecutionGraphStore archivedExecutionGraphStore,
+               FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
similarity index 97%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
index 7a194f6..1848408 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
similarity index 54%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
index 17583ac..c1df47f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
@@ -16,29 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
-import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link ClusterComponent} for a job cluster. The dispatcher component starts
+ * {@link DispatcherResourceManagerComponent} for a job cluster. The 
dispatcher component starts
  * a {@link MiniDispatcher}.
  */
-public class JobClusterComponent extends ClusterComponent<MiniDispatcher> {
+class JobDispatcherResourceManagerComponent extends 
DispatcherResourceManagerComponent<MiniDispatcher> {
 
-       public JobClusterComponent(ResourceManagerFactory<?> 
resourceManagerFactory, JobGraphRetriever jobActions) {
-               super(new JobDispatcherFactory(jobActions), 
resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
-       }
+       JobDispatcherResourceManagerComponent(
+                       MiniDispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               super(dispatcher, resourceManager, 
dispatcherLeaderRetrievalService, resourceManagerRetrievalService, 
webMonitorEndpoint, jobManagerMetricGroup);
 
-       @Override
-       protected void registerShutDownFuture(MiniDispatcher dispatcher, 
CompletableFuture<ApplicationStatus> shutDownFuture) {
-               super.registerShutDownFuture(dispatcher, shutDownFuture);
+               final CompletableFuture<ApplicationStatus> shutDownFuture = 
getShutDownFuture();
 
                
dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, 
throwable) -> {
                        if (throwable != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..c7ce14c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link 
JobDispatcherResourceManagerComponent}.
+ */
+public class JobDispatcherResourceManagerComponentFactory extends 
AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, 
RestfulGateway> {
+
+       public JobDispatcherResourceManagerComponentFactory(@Nonnull 
ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever 
jobGraphRetriever) {
+               super(new JobDispatcherFactory(jobGraphRetriever), 
resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+       }
+
+       @Override
+       protected DispatcherResourceManagerComponent<MiniDispatcher> 
createDispatcherResourceManagerComponent(
+                       MiniDispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               return new JobDispatcherResourceManagerComponent(
+                       dispatcher,
+                       resourceManager,
+                       dispatcherLeaderRetrievalService,
+                       resourceManagerRetrievalService,
+                       webMonitorEndpoint,
+                       jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
similarity index 96%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
index e2ace15..b1586ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
new file mode 100644
index 0000000..8be7b74
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+/**
+ * {@link DispatcherResourceManagerComponent} used by session clusters.
+ */
+class SessionDispatcherResourceManagerComponent extends 
DispatcherResourceManagerComponent<Dispatcher> {
+       SessionDispatcherResourceManagerComponent(
+                       Dispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               super(dispatcher, resourceManager, 
dispatcherLeaderRetrievalService, resourceManagerRetrievalService, 
webMonitorEndpoint, jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..c44833d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link 
SessionDispatcherResourceManagerComponent}.
+ */
+public class SessionDispatcherResourceManagerComponentFactory extends 
AbstractDispatcherResourceManagerComponentFactory<Dispatcher, 
DispatcherGateway> {
+
+       public SessionDispatcherResourceManagerComponentFactory(@Nonnull 
ResourceManagerFactory<?> resourceManagerFactory) {
+               super(SessionDispatcherFactory.INSTANCE, 
resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+       }
+
+       @Override
+       protected DispatcherResourceManagerComponent<Dispatcher> 
createDispatcherResourceManagerComponent(
+                       Dispatcher dispatcher,
+                       ResourceManager<?> resourceManager,
+                       LeaderRetrievalService dispatcherLeaderRetrievalService,
+                       LeaderRetrievalService resourceManagerRetrievalService,
+                       WebMonitorEndpoint<?> webMonitorEndpoint,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+               return new SessionDispatcherResourceManagerComponent(
+                       dispatcher,
+                       resourceManager,
+                       dispatcherLeaderRetrievalService,
+                       resourceManagerRetrievalService,
+                       webMonitorEndpoint,
+                       jobManagerMetricGroup);
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 1733f49..42e666e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,11 +19,11 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -62,8 +62,8 @@ public class YarnJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new JobClusterComponent(
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new JobDispatcherResourceManagerComponentFactory(
                        YarnResourceManagerFactory.INSTANCE,
                        FileJobGraphRetriever.createFrom(configuration));
        }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index e0bebfd..0f4656e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,10 +19,10 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -60,8 +60,8 @@ public class YarnSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected ClusterComponent<?> createClusterComponent(Configuration 
configuration) {
-               return new 
SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
+       protected DispatcherResourceManagerComponentFactory<?> 
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+               return new 
SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
        }
 
        public static void main(String[] args) {

Reply via email to