[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072338#comment-16072338 ]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125217930

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -107,146 +133,268 @@ public BlobCache(
                 this.numFetchRetries = 0;
             }

    +        // Initializing the clean up task
    +        this.cleanupTimer = new Timer(true);
    +
    +        cleanupInterval = blobClientConfig.getLong(
    +            ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +            ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +        this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
             // Add shutdown hook to delete storage directory
             shutdownHook = BlobUtils.addShutdownHook(this, LOG);
         }

    +    @Override
    +    public void registerJob(JobID jobId) {
    +        synchronized (lockObject) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +            if (ref == null) {
    +                ref = new RefCount();
    +                jobRefCounters.put(jobId, ref);
    +            }
    +            ++ref.references;
    +        }
    +    }
    +
    +    @Override
    +    public void releaseJob(JobID jobId) {
    +        synchronized (lockObject) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +
    +            if (ref == null) {
    +                LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    +                return;
    +            }
    +
    +            --ref.references;
    +            if (ref.references == 0) {
    +                ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
    +     * <p>
    +     * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +     * the cache, the method will try to download it from this cache's BLOB server.
    +     *
    +     * @param key
    +     *         The key of the desired BLOB.
    +     *
    +     * @return file referring to the local storage location of the BLOB.
    +     *
    +     * @throws IOException
    +     *         Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +     */
    +    @Override
    +    public File getFile(BlobKey key) throws IOException {
    +        return getFileInternal(null, key);
    +    }
    +
         /**
    -     * Returns the URL for the BLOB with the given key. The method will first attempt to serve
    -     * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
    -     * from this cache's BLOB server.
    +     * Returns local copy of the file for the BLOB with the given key.
    +     * <p>
    +     * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +     * the cache, the method will try to download it from this cache's BLOB server.
          *
    -     * @param requiredBlob The key of the desired BLOB.
    -     * @return URL referring to the local storage location of the BLOB.
    -     * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     * @param key
    +     *         The key of the desired BLOB.
    +     *
    +     * @return file referring to the local storage location of the BLOB.
    +     *
    +     * @throws IOException
    +     *         Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
          */
    -    public URL getURL(final BlobKey requiredBlob) throws IOException {
    +    @Override
    +    public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
    --- End diff --

    I think so far the convention is that fields without an annotation are considered `@Nonnull` and only fields which are annotated with `@Nullable` can be `null`. Otherwise `key` should also be marked as `@Nonnull`.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
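
For readers following the review, the per-job ref-counting in the quoted diff (registerJob() increments a counter, releaseJob() decrements it and sets a keepUntil deadline once the counter reaches zero) can be sketched as a stand-alone class. This is a minimal sketch under stated assumptions, not Flink's BlobCache: the class name JobRefCounter, the String job IDs, the injected clock, the boolean return value of releaseJob(), and the cleanup() method are all hypothetical. In particular, cleanup() only guesses at what the scheduled timer task might do, since the quoted diff does not show that part.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of per-job reference counting; NOT Flink's BlobCache. */
class JobRefCounter {

    /** Counter plus expiry deadline, modeled on the RefCount holder in the diff. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1; // only meaningful once references has dropped to 0
    }

    private final Object lockObject = new Object();
    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupInterval; // milliseconds
    private final LongSupplier clock;   // assumption: injected so time can be controlled

    JobRefCounter(long cleanupIntervalMillis, LongSupplier clock) {
        this.cleanupInterval = cleanupIntervalMillis;
        this.clock = clock;
    }

    /** Adds one reference for the job, creating its counter on first use. */
    void registerJob(String jobId) {
        synchronized (lockObject) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null) {
                ref = new RefCount();
                jobRefCounters.put(jobId, ref);
            }
            ++ref.references;
        }
    }

    /**
     * Drops one reference. Returns false for an unmatched release; the diff logs
     * a warning in the unknown-job case. The extra "already zero" guard is an
     * assumption of this sketch and is not part of the quoted diff.
     */
    boolean releaseJob(String jobId) {
        synchronized (lockObject) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return false;
            }
            --ref.references;
            if (ref.references == 0) {
                // Keep the job's entry around for one more cleanup interval.
                ref.keepUntil = clock.getAsLong() + cleanupInterval;
            }
            return true;
        }
    }

    /** Removes unreferenced jobs whose deadline has passed; returns how many were removed. */
    int cleanup() {
        synchronized (lockObject) {
            long now = clock.getAsLong();
            int removed = 0;
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references == 0 && now >= ref.keepUntil) {
                    it.remove();
                    ++removed;
                }
            }
            return removed;
        }
    }
}
```

Because cleanup() checks the counter as well as the deadline, a job that is registered again before its deadline expires is kept even though its old keepUntil value is still stored.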