[ 
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361030#comment-16361030
 ] 

ASF GitHub Bot commented on FLINK-8608:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r167610671
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
    @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) 
throws FlinkException {
                                        exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
                                }
                        }
    -
    -                   terminationFuture.complete(true);
                }
     
                if (exception != null) {
                        throw new FlinkException("Could not properly shut down 
the cluster services.", exception);
                }
        }
     
    +   protected void stopClusterComponents() throws Exception {
    +           synchronized (lock) {
    +                   Throwable exception = null;
    +
    +                   if (webMonitorEndpoint != null) {
    +                           webMonitorEndpoint.shutdown(Time.seconds(10L));
    +                   }
    +
    +                   if (dispatcherLeaderRetrievalService != null) {
    +                           try {
    +                                   dispatcherLeaderRetrievalService.stop();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (dispatcher != null) {
    +                           try {
    +                                   dispatcher.shutDown();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (resourceManagerRetrievalService != null) {
    +                           try {
    +                                   resourceManagerRetrievalService.stop();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (resourceManager != null) {
    +                           try {
    +                                   resourceManager.shutDown();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (archivedExecutionGraphStore != null) {
    +                           try {
    +                                   archivedExecutionGraphStore.close();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (transientBlobCache != null) {
    +                           try {
    +                                   transientBlobCache.close();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (exception != null) {
    +                           throw new FlinkException("Could not properly 
shut down the session cluster entry point.", exception);
    +                   }
    +           }
    +   }
    +
        @Override
        public void onFatalError(Throwable exception) {
                LOG.error("Fatal error occurred in the cluster entrypoint.", 
exception);
     
                System.exit(RUNTIME_FAILURE_RETURN_CODE);
        }
     
    -   protected abstract void startClusterComponents(
    +   // --------------------------------------------------
    +   // Internal methods
    +   // --------------------------------------------------
    +
    +   private void shutDownAndTerminate(
    +           int returnCode,
    +           ApplicationStatus applicationStatus,
    +           @Nullable String diagnostics,
    --- End diff --
    
    `diagnostics` is unused. You already log errors so I don't think this 
argument is needed.


> Add MiniDispatcher for job mode
> -------------------------------
>
>                 Key: FLINK-8608
>                 URL: https://issues.apache.org/jira/browse/FLINK-8608
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which 
> is started with a pre initialized {{JobGraph}} and launches a single 
> {{JobManagerRunner}} with this job. Once the job is completed and if the 
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should 
> terminate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to