Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125430028
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws
        }
     
        /**
    -    * Returns the BLOB service's directory for incoming files. The directory is created if it did
    -    * not exist so far.
    +    * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
    +    * created if it does not exist yet.
    +    *
    +    * @param storageDir
    +    *              storage directory used be the BLOB service
         *
    -    * @return the BLOB server's directory for incoming files
    +    * @return the BLOB service's directory for incoming files
         */
        static File getIncomingDirectory(File storageDir) {
                final File incomingDir = new File(storageDir, "incoming");
     
    -           if (!incomingDir.mkdirs() && !incomingDir.exists()) {
    -                   throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
    -           }
    +           mkdirTolerateExisting(incomingDir, "incoming");
     
                return incomingDir;
        }
     
        /**
    -    * Returns the BLOB service's directory for cached files. The directory is created if it did
    -    * not exist so far.
    +    * Makes sure a given directory exists by creating it if necessary.
         *
    -    * @return the BLOB server's directory for cached files
    +    * @param dir
    +    *              directory to create
    +    * @param dirType
    +    *              the type of the directory (included in error message if something fails)
         */
    -   private static File getCacheDirectory(File storageDir) {
    -           final File cacheDirectory = new File(storageDir, "cache");
    -
    -           if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
    -                   throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
    +   private static void mkdirTolerateExisting(final File dir, final String dirType) {
    +           // note: thread-safe create should try to mkdir first and then ignore the case that the
    +           //       directory already existed
    +           if (!dir.mkdirs() && !dir.exists()) {
    +                   throw new RuntimeException(
    +                           "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
                }
    -
    -           return cacheDirectory;
        }
     
        /**
         * Returns the (designated) physical storage location of the BLOB with the given key.
         *
    +    * @param storageDir
    +    *              storage directory used be the BLOB service
         * @param key
    -    *        the key identifying the BLOB
    +    *              the key identifying the BLOB
    +    * @param jobId
    +    *              ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
    +    *
         * @return the (designated) physical storage location of the BLOB
         */
    -   static File getStorageLocation(File storageDir, BlobKey key) {
    -           return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
    -   }
    +   static File getStorageLocation(
    +                   @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
    +           File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
     
    -   /**
    -    * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
    -    *
    -    * @param jobID
    -    *        the ID of the job the BLOB belongs to
    -    * @param key
    -    *        the key of the BLOB
    -    * @return the (designated) physical storage location of the BLOB with the given job ID and key
    -    */
    -   static File getStorageLocation(File storageDir, JobID jobID, String key) {
    -           return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
    +           mkdirTolerateExisting(file.getParentFile(), "cache");
    --- End diff --
    
    "cache" is not actually the directory name; it is only part of the error message (if there is an error). Thinking a bit more about this, though, I can remove that parameter.
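
For illustration only, here is a minimal sketch of how the helper might look once the `dirType` parameter is dropped. The class name `BlobDirs` and the generic error message are assumptions made for this example; this is not necessarily the code that ended up in the pull request.

```java
import java.io.File;

// Hypothetical holder class, used only to keep this sketch self-contained.
final class BlobDirs {

    private BlobDirs() {}

    /**
     * Makes sure the given directory exists by creating it if necessary.
     *
     * @param dir directory to create
     */
    static void mkdirTolerateExisting(final File dir) {
        // Try mkdirs() first and only then check exists(): if another thread or
        // process creates the directory concurrently, mkdirs() returns false even
        // though the directory is there, and that case must not be an error.
        if (!dir.mkdirs() && !dir.exists()) {
            // Assumed generic message; the path alone identifies the directory.
            throw new RuntimeException(
                "Cannot create directory '" + dir.getAbsolutePath() + "'.");
        }
    }

    /** Returns the directory for incoming files, creating it if it does not exist yet. */
    static File getIncomingDirectory(final File storageDir) {
        final File incomingDir = new File(storageDir, "incoming");
        mkdirTolerateExisting(incomingDir);
        return incomingDir;
    }
}
```

Calling `mkdirTolerateExisting` twice on the same path is harmless: the second call finds the directory already present and returns without throwing. As in the original check, a path that already exists as a regular file is not detected by `exists()` alone.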

