[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072333#comment-16072333 ]
ASF GitHub Bot commented on FLINK-7057: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125249421 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws } /** - * Returns the BLOB service's directory for incoming files. The directory is created if it did - * not exist so far. + * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is + * created if it does not exist yet. + * + * @param storageDir + * storage directory used be the BLOB service * - * @return the BLOB server's directory for incoming files + * @return the BLOB service's directory for incoming files */ static File getIncomingDirectory(File storageDir) { final File incomingDir = new File(storageDir, "incoming"); - if (!incomingDir.mkdirs() && !incomingDir.exists()) { - throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath()); - } + mkdirTolerateExisting(incomingDir, "incoming"); return incomingDir; } /** - * Returns the BLOB service's directory for cached files. The directory is created if it did - * not exist so far. + * Makes sure a given directory exists by creating it if necessary. * - * @return the BLOB server's directory for cached files + * @param dir + * directory to create + * @param dirType + * the type of the directory (included in error message if something fails) */ - private static File getCacheDirectory(File storageDir) { - final File cacheDirectory = new File(storageDir, "cache"); - - if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) { - throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'."); + private static void mkdirTolerateExisting(final File dir, final String dirType) { + // note: thread-safe create should try to mkdir first and then ignore the case that the + // directory already existed + if (!dir.mkdirs() && !dir.exists()) { + throw new RuntimeException( + "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'."); } - - return cacheDirectory; } /** * Returns the (designated) physical storage location of the BLOB with the given key. * + * @param storageDir + * storage directory used be the BLOB service * @param key - * the key identifying the BLOB + * the key identifying the BLOB + * @param jobId + * ID of the job for the incoming files (or <tt>null</tt> if job-unrelated) + * * @return the (designated) physical storage location of the BLOB */ - static File getStorageLocation(File storageDir, BlobKey key) { - return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString()); - } + static File getStorageLocation( + @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) { + File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key)); - /** - * Returns the (designated) physical storage location of the BLOB with the given job ID and key. - * - * @param jobID - * the ID of the job the BLOB belongs to - * @param key - * the key of the BLOB - * @return the (designated) physical storage location of the BLOB with the given job ID and key - */ - static File getStorageLocation(File storageDir, JobID jobID, String key) { - return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key)); + mkdirTolerateExisting(file.getParentFile(), "cache"); --- End diff -- Why are we creating a `cache` directory here? > move BLOB ref-counting from LibraryCacheManager to BlobCache > ------------------------------------------------------------ > > Key: FLINK-7057 > URL: https://issues.apache.org/jira/browse/FLINK-7057 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR > files managed by it. Instead, we want the {{BlobCache}} to do that itself for > all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} > level but rather per job. Therefore, the cleanup process should be adapted, > too. -- This message was sent by Atlassian JIRA (v6.4.14#64029)