[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945093#comment-14945093
 ] 

ASF GitHub Bot commented on FLINK-2805:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41269987
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -70,18 +82,65 @@
        private final int maxConnections;
     
        /**
    +    * Shutdown hook thread to ensure deletion of the storage directory (or 
<code>null</code> if
    +    * {@link RecoveryMode#STANDALONE})
    +    */
    +   private final Thread shutdownHook;
    +
    +   /**
         * Instantiates a new BLOB server and binds it to a free network port.
         * 
         * @throws IOException
         *         thrown if the BLOB server cannot bind to a free network port
         */
        public BlobServer(Configuration config) throws IOException {
    +           checkNotNull(config, "Configuration");
    +
    +           this.recoveryMode = RecoveryMode.fromConfig(config);
     
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
     
    +           if (recoveryMode == RecoveryMode.STANDALONE) {
    +                   recoveryBasePath = null;
    +           }
    +           else {
    +                   // Initialize file state backend for recovery
    +                   String stateBackend = 
config.getString(ConfigConstants.STATE_BACKEND,
    +                                   ConfigConstants.DEFAULT_STATE_BACKEND);
    +
    +                   if (!stateBackend.toLowerCase().equals("filesystem")) {
    +                           throw new 
IllegalConfigurationException(String.format("Illegal state backend " +
    +                                                           "configuration 
'%s'. Please configure 'FILESYSTEM' as state " +
    +                                                           "backend and 
specify the recovery path via '%s' key.",
    +                                           stateBackend, 
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
    +                   }
    --- End diff --
    
    State backend enum is part of the streaming module, which is not a 
dependency of the runtime atm. I will coordinate with @aljoscha and check what 
a good place for it is with the streaming rewrite going on.


> Make user jars available for all job managers to recover
> --------------------------------------------------------
>
>                 Key: FLINK-2805
>                 URL: https://issues.apache.org/jira/browse/FLINK-2805
>             Project: Flink
>          Issue Type: Bug
>          Components: BlobManager, JobManager
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> This is a bug in https://github.com/apache/flink/pull/1153.
> In case of multiple job managers, the user jars need to be accessible by all 
> job managers (including those who arrive later).
> Since #1153 requires the file state backend to be configured, the simplest 
> solution is to make the blob server aware of the configured recovery mode and 
> put/get/delete the user jars from the file state backend as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to