[ 
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361018#comment-16361018
 ] 

ASF GitHub Bot commented on FLINK-8608:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r167593183
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
    @@ -671,13 +674,68 @@ public void jobFinishedByOther() {
                        log.info("Job {} was finished by other JobManager.", 
jobId);
     
                        runAsync(
    -                           () -> {
    -                                   try {
    -                                           removeJob(jobId, false);
    -                                   } catch (Exception e) {
    -                                           log.warn("Could not properly 
remove job {} from the dispatcher.", jobId, e);
    -                                   }
    -                           });
    +                           () -> 
Dispatcher.this.jobFinishedByOther(jobId));
                }
        }
    +
    +   //------------------------------------------------------
    +   // Factories
    +   //------------------------------------------------------
    +
    +   /**
    +    * Factory for a {@link JobManagerRunner}.
    +    */
    +   @FunctionalInterface
    +   public interface JobManagerRunnerFactory {
    +           JobManagerRunner createJobManagerRunner(
    +                   ResourceID resourceId,
    +                   JobGraph jobGraph,
    +                   Configuration configuration,
    +                   RpcService rpcService,
    +                   HighAvailabilityServices highAvailabilityServices,
    +                   HeartbeatServices heartbeatServices,
    +                   BlobServer blobServer,
    +                   JobManagerServices jobManagerServices,
    +                   MetricRegistry metricRegistry,
    +                   OnCompletionActions onCompleteActions,
    +                   FatalErrorHandler fatalErrorHandler,
    +                   @Nullable String restAddress) throws Exception;
    +   }
    +
    +   /**
    +    * Singleton default factory for {@link JobManagerRunner}.
    +    */
    +   public enum DefaultJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
    +           INSTANCE {
    --- End diff --
    
    Since there is only one instance, it is ok to write:
    ```
    public enum DefaultJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
                INSTANCE;
    
                @Override
                public JobManagerRunner createJobManagerRunner(
                        ResourceID resourceId,
                        JobGraph jobGraph,
                        Configuration configuration,
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        BlobServer blobServer,
                        JobManagerServices jobManagerServices,
                        MetricRegistry metricRegistry,
                        OnCompletionActions onCompleteActions,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress) throws Exception {
                        return new JobManagerRunner(
                                resourceId,
                                jobGraph,
                                configuration,
                                rpcService,
                                highAvailabilityServices,
                                heartbeatServices,
                                blobServer,
                                jobManagerServices,
                                metricRegistry,
                                onCompleteActions,
                                fatalErrorHandler,
                                restAddress);
                }
        }
    ```
    Saves one level of indentation. 


> Add MiniDispatcher for job mode
> -------------------------------
>
>                 Key: FLINK-8608
>                 URL: https://issues.apache.org/jira/browse/FLINK-8608
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which 
> is started with a pre initialized {{JobGraph}} and launches a single 
> {{JobManagerRunner}} with this job. Once the job is completed and if the 
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
> terminate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to