[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072338#comment-16072338
 ] 

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125217930
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -107,146 +133,268 @@ public BlobCache(
                        this.numFetchRetries = 0;
                }
     
    +           // Initializing the clean up task
    +           this.cleanupTimer = new Timer(true);
    +
    +           cleanupInterval = blobClientConfig.getLong(
    +                   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +                   ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +           this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
                // Add shutdown hook to delete storage directory
                shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
     
    +   @Override
    +   public void registerJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +                   if (ref == null) {
    +                           ref = new RefCount();
    +                           jobRefCounters.put(jobId, ref);
    +                   }
    +                   ++ref.references;
    +           }
    +   }
    +
    +   @Override
    +   public void releaseJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +
    +                   if (ref == null) {
    +                           LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    +                           return;
    +                   }
    +
    +                   --ref.references;
    +                   if (ref.references == 0) {
    +                           ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB server.
    +    *
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +    */
    +   @Override
    +   public File getFile(BlobKey key) throws IOException {
    +           return getFileInternal(null, key);
    +   }
    +
        /**
    -    * Returns the URL for the BLOB with the given key. The method will first attempt to serve
    -    * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
    -    * from this cache's BLOB server.
    +    * Returns local copy of the file for the BLOB with the given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB server.
         *
    -    * @param requiredBlob The key of the desired BLOB.
    -    * @return URL referring to the local storage location of the BLOB.
    -    * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +    * @param jobId
    +    *              ID of the job this blob belongs to
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
         */
    -   public URL getURL(final BlobKey requiredBlob) throws IOException {
    +   @Override
    +   public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
    --- End diff --
    
    I think the convention so far is that parameters and fields without an 
annotation are considered `@Nonnull`, and only those annotated with 
`@Nullable` can be `null`. Under that convention the `@Nonnull` on `jobId` is 
redundant; otherwise `key` should also be marked as `@Nonnull`.
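
To illustrate the convention described above, here is a minimal sketch. It is not code from the pull request: the class `NullabilityConventionSketch`, the method `describe`, and the locally declared `@Nullable` annotation (a stand-in for `javax.annotation.Nullable`, so the example compiles without extra dependencies) are all assumptions made for illustration. The `jobId` parameter is annotated because `null` is a legal value for it, while `key` carries no annotation and is therefore treated as non-null.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

final class NullabilityConventionSketch {

    /** Hypothetical local stand-in for javax.annotation.Nullable. */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
    @interface Nullable {}

    /**
     * Only jobId is annotated, because null is a legal value for it
     * (meaning "job-unrelated"). key has no annotation, so by the
     * convention it is implicitly non-null.
     */
    public static String describe(@Nullable String jobId, String key) {
        // Fail fast if a caller violates the implicit non-null contract.
        Objects.requireNonNull(key, "key");
        String owner = (jobId == null) ? "job-unrelated" : "job " + jobId;
        return owner + " / " + key;
    }

    public static void main(String[] args) {
        System.out.println(describe(null, "blob-1"));
        System.out.println(describe("job-42", "blob-1"));
    }
}
```

Under this reading, an explicit `@Nonnull` adds no information, which is why annotating only one of two non-null parameters looks inconsistent.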


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.
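
The per-job ref-counting described above could be sketched roughly as follows. This is a simplified illustration modelled on the `registerJob`/`releaseJob` methods in the diff, not the actual `BlobCache` code: the class name `JobRefCountSketch`, the use of `String` job IDs, the explicit `nowMillis` parameters (used instead of `System.currentTimeMillis()` so the behaviour is deterministic), and the `cleanup` and `isTracked` methods are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

final class JobRefCountSketch {

    /** Per-job counter plus the time until which an unreferenced job is kept. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1;
    }

    private final Object lock = new Object();
    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    public JobRefCountSketch(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    /** Adds one reference for the job, creating its counter on first use. */
    public void registerJob(String jobId) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
            ++ref.references;
            ref.keepUntil = -1; // referenced again, so no pending expiry
        }
    }

    /** Drops one reference; the last release starts the retention period. */
    public void releaseJob(String jobId, long nowMillis) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // release without a matching register; ignored here
            }
            if (--ref.references == 0) {
                ref.keepUntil = nowMillis + cleanupIntervalMillis;
            }
        }
    }

    /** Removes unreferenced jobs whose retention period has passed; returns how many. */
    public int cleanup(long nowMillis) {
        synchronized (lock) {
            int before = jobRefCounters.size();
            jobRefCounters.values()
                    .removeIf(ref -> ref.references == 0 && ref.keepUntil <= nowMillis);
            return before - jobRefCounters.size();
        }
    }

    public boolean isTracked(String jobId) {
        synchronized (lock) {
            return jobRefCounters.containsKey(jobId);
        }
    }
}
```

Because the counter is keyed by job rather than by `BlobKey`, a single cleanup pass can discard everything belonging to a finished job once its retention period has elapsed.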



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
