[ 
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363722#comment-16363722
 ] 

ASF GitHub Bot commented on FLINK-8608:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r168126480
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
    @@ -163,29 +210,130 @@ protected void runCluster(Configuration 
configuration) throws Exception {
                                blobServer,
                                heartbeatServices,
                                metricRegistry);
    +
    +                   // TODO: Make shutDownAndTerminate non blocking to not 
use the global executor
    +                   dispatcher.getTerminationFuture().whenCompleteAsync(
    +                           (Boolean success, Throwable throwable) -> {
    +                                   if (throwable != null) {
    +                                           LOG.info("Could not properly 
terminate the Dispatcher.", throwable);
    +                                   }
    +
    +                                   shutDownAndTerminate(
    +                                           SUCCESS_RETURN_CODE,
    +                                           ApplicationStatus.SUCCEEDED,
    +                                           true);
    +                           });
                }
        }
     
        protected void initializeServices(Configuration configuration) throws 
Exception {
    -           assert(Thread.holdsLock(lock));
     
                LOG.info("Initializing cluster services.");
     
    -           final String bindAddress = 
configuration.getString(JobManagerOptions.ADDRESS);
    -           // TODO: Add support for port ranges
    -           final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    -
    -           commonRpcService = createRpcService(configuration, bindAddress, 
portRange);
    -           haServices = createHaServices(configuration, 
commonRpcService.getExecutor());
    -           blobServer = new BlobServer(configuration, 
haServices.createBlobStore());
    -           blobServer.start();
    -           heartbeatServices = createHeartbeatServices(configuration);
    -           metricRegistry = createMetricRegistry(configuration);
    -
    -           // TODO: This is a temporary hack until we have ported the 
MetricQueryService to the new RpcEndpoint
    -           // start the MetricQueryService
    -           final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
    -           metricRegistry.startQueryService(actorSystem, null);
    +           synchronized (lock) {
    +                   final String bindAddress = 
configuration.getString(JobManagerOptions.ADDRESS);
    +                   // TODO: Add support for port ranges
    +                   final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    +
    +                   commonRpcService = createRpcService(configuration, 
bindAddress, portRange);
    +                   haServices = createHaServices(configuration, 
commonRpcService.getExecutor());
    +                   blobServer = new BlobServer(configuration, 
haServices.createBlobStore());
    +                   blobServer.start();
    +                   heartbeatServices = 
createHeartbeatServices(configuration);
    +                   metricRegistry = createMetricRegistry(configuration);
    +
    +                   // TODO: This is a temporary hack until we have ported 
the MetricQueryService to the new RpcEndpoint
    +                   // start the MetricQueryService
    +                   final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
    +                   metricRegistry.startQueryService(actorSystem, null);
    +           }
    +   }
    +
    +   protected void startClusterComponents(
    +           Configuration configuration,
    --- End diff --
    
    nit: indentation


> Add MiniDispatcher for job mode
> -------------------------------
>
>                 Key: FLINK-8608
>                 URL: https://issues.apache.org/jira/browse/FLINK-8608
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which 
> is started with a pre initialized {{JobGraph}} and launches a single 
> {{JobManagerRunner}} with this job. Once the job is completed and if the 
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
> terminate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to