[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072333#comment-16072333
 ] 

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125249421
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws
        }
     
        /**
    -    * Returns the BLOB service's directory for incoming files. The directory is created if it did
    -    * not exist so far.
    +    * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
    +    * created if it does not exist yet.
    +    *
    +    * @param storageDir
    +    *              storage directory used by the BLOB service
         *
    -    * @return the BLOB server's directory for incoming files
    +    * @return the BLOB service's directory for incoming files
         */
        static File getIncomingDirectory(File storageDir) {
                final File incomingDir = new File(storageDir, "incoming");
     
    -           if (!incomingDir.mkdirs() && !incomingDir.exists()) {
    -                   throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
    -           }
    +           mkdirTolerateExisting(incomingDir, "incoming");
     
                return incomingDir;
        }
     
        /**
    -    * Returns the BLOB service's directory for cached files. The directory is created if it did
    -    * not exist so far.
    +    * Makes sure a given directory exists by creating it if necessary.
         *
    -    * @return the BLOB server's directory for cached files
    +    * @param dir
    +    *              directory to create
    +    * @param dirType
    +    *              the type of the directory (included in error message if something fails)
         */
    -   private static File getCacheDirectory(File storageDir) {
    -           final File cacheDirectory = new File(storageDir, "cache");
    -
    -           if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
    -                   throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
    +   private static void mkdirTolerateExisting(final File dir, final String dirType) {
    +           // note: thread-safe create should try to mkdir first and then ignore the case that the
    +           //       directory already existed
    +           if (!dir.mkdirs() && !dir.exists()) {
    +                   throw new RuntimeException(
    +                           "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
                }
    -
    -           return cacheDirectory;
        }
     
        /**
         * Returns the (designated) physical storage location of the BLOB with the given key.
         *
    +    * @param storageDir
    +    *              storage directory used by the BLOB service
         * @param key
    -    *        the key identifying the BLOB
    +    *              the key identifying the BLOB
    +    * @param jobId
    +    *              ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
    +    *
         * @return the (designated) physical storage location of the BLOB
         */
    -   static File getStorageLocation(File storageDir, BlobKey key) {
    -           return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
    -   }
    +   static File getStorageLocation(
    +                   @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
    +           File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
     
    -   /**
    -    * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
    -    *
    -    * @param jobID
    -    *        the ID of the job the BLOB belongs to
    -    * @param key
    -    *        the key of the BLOB
    -    * @return the (designated) physical storage location of the BLOB with the given job ID and key
    -    */
    -   static File getStorageLocation(File storageDir, JobID jobID, String key) {
    -           return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
    +           mkdirTolerateExisting(file.getParentFile(), "cache");
    --- End diff --
    
    Why are we creating a `cache` directory here?
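
For context on the question, here is a minimal, self-contained sketch of what the quoted line does. It is not the PR's code: `getStorageLocationPath` is not shown in the diff, so the directory layout (`<storageDir>/<jobDirName>/blob_<key>`), the `storageLocation` helper and the class name are assumptions made purely for illustration. Only `mkdirTolerateExisting` mirrors the diff. The sketch shows that the directory actually created is the parent of the storage location, which under this assumed layout is a job directory, while the error message would still label it a "cache" directory.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class MkdirSketch {

    // Same pattern as mkdirTolerateExisting in the diff: attempt the creation
    // first and only fail if the path still does not exist afterwards, so two
    // callers creating the same directory concurrently do not trip each other.
    static void mkdirTolerateExisting(File dir, String dirType) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
        }
    }

    // Hypothetical layout (an assumption, not taken from the PR):
    //   <storageDir>/<jobDirName>/blob_<key>
    static File storageLocation(File storageDir, String jobDirName, String key) {
        File file = new File(new File(storageDir, jobDirName), "blob_" + key);
        // The parent being created here is the job directory, yet the label
        // passed for the error message is "cache".
        mkdirTolerateExisting(file.getParentFile(), "cache");
        return file;
    }

    public static void main(String[] args) throws IOException {
        File storageDir = Files.createTempDirectory("blob-sketch").toFile();
        File location = storageLocation(storageDir, "job_1234", "abcd");

        // The parent directory has been created; the BLOB file itself has not.
        System.out.println("parent is directory: " + location.getParentFile().isDirectory());
        System.out.println("blob file exists:    " + location.exists());

        // A second call tolerates the already existing directory.
        storageLocation(storageDir, "job_1234", "abcd");
    }
}
```

Under these assumptions, looking up a storage location has the side effect of creating a directory, which is what the question above is about.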


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.
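
As a rough illustration of the per-job ref-counting described in the issue, the following sketch counts references per job ID rather than per BLOB key. All names are hypothetical and a plain `String` stands in for Flink's `JobID`. This is not the `BlobCache` implementation, only one way the idea could look.

```java
import java.util.HashMap;
import java.util.Map;

class JobRefCounterSketch {

    // One counter per job instead of one per BLOB key.
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Adds one reference for the given job.
    synchronized void registerJob(String jobId) {
        refCounts.merge(jobId, 1, Integer::sum);
    }

    // Drops one reference. Returns true if this was the last reference, i.e.
    // all BLOBs of the job could now be cleaned up together.
    synchronized boolean releaseJob(String jobId) {
        Integer count = refCounts.get(jobId);
        if (count == null) {
            return false; // unknown job, nothing to release
        }
        if (count == 1) {
            refCounts.remove(jobId);
            return true;
        }
        refCounts.put(jobId, count - 1);
        return false;
    }

    // Current number of references held for the given job.
    synchronized int references(String jobId) {
        return refCounts.getOrDefault(jobId, 0);
    }

    public static void main(String[] args) {
        JobRefCounterSketch counter = new JobRefCounterSketch();
        counter.registerJob("job-a");
        counter.registerJob("job-a");
        System.out.println("cleanup after first release:  " + counter.releaseJob("job-a"));
        System.out.println("cleanup after second release: " + counter.releaseJob("job-a"));
    }
}
```

With a counter like this, cleanup is triggered once per job when its last reference is released, rather than once per BLOB.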



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
