[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072337#comment-16072337
 ] 

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125218256
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -107,146 +133,268 @@ public BlobCache(
                        this.numFetchRetries = 0;
                }
     
    +           // Initializing the clean up task
    +           this.cleanupTimer = new Timer(true);
    +
    +           cleanupInterval = blobClientConfig.getLong(
    +                   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +                   ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +           this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
                // Add shutdown hook to delete storage directory
                shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
     
    +   @Override
    +   public void registerJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +                   if (ref == null) {
    +                           ref = new RefCount();
    +                           jobRefCounters.put(jobId, ref);
    +                   }
    +                   ++ref.references;
    +           }
    +   }
    +
    +   @Override
    +   public void releaseJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +
    +                   if (ref == null) {
    +                           LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    +                           return;
    +                   }
    +
    +                   --ref.references;
    +                   if (ref.references == 0) {
    +                           ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB 
server.
    +    *
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
    +    */
    +   @Override
    +   public File getFile(BlobKey key) throws IOException {
    +           return getFileInternal(null, key);
    +   }
    +
        /**
    -    * Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
    -    * the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
    -    * from this cache's BLOB server.
    +    * Returns local copy of the file for the BLOB with the given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB 
server.
         *
    -    * @param requiredBlob The key of the desired BLOB.
    -    * @return URL referring to the local storage location of the BLOB.
    -    * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
    +    * @param jobId
    +    *              ID of the job this blob belongs to
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
         */
    -   public URL getURL(final BlobKey requiredBlob) throws IOException {
    +   @Override
    +   public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
    +           checkNotNull(jobId);
    +           return getFileInternal(jobId, key);
    +   }
    +
    +   /**
    +    * Returns local copy of the file for the BLOB with the given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB 
server.
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    +    * @param requiredBlob
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
    +    */
    +   private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
                checkArgument(requiredBlob != null, "BLOB key cannot be null.");
     
    -           final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
    +           final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
     
                if (localJarFile.exists()) {
    -                   return localJarFile.toURI().toURL();
    +                   return localJarFile;
                }
     
                // first try the distributed blob store (if available)
                try {
    -                   blobView.get(requiredBlob, localJarFile);
    +                   blobView.get(jobId, requiredBlob, localJarFile);
                } catch (Exception e) {
                        LOG.info("Failed to copy from blob store. Downloading 
from BLOB server instead.", e);
                }
     
                if (localJarFile.exists()) {
    -                   return localJarFile.toURI().toURL();
    +                   return localJarFile;
                }
     
                // fallback: download from the BlobServer
                final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +           LOG.info("Downloading {}/{} from {}", jobId, requiredBlob, 
serverAddress);
     
                // loop over retries
                int attempt = 0;
                while (true) {
    -
    -                   if (attempt == 0) {
    -                           LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
    -                   } else {
    -                           LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
    -                   }
    -
    -                   try {
    -                           BlobClient bc = null;
    -                           InputStream is = null;
    -                           OutputStream os = null;
    -
    -                           try {
    -                                   bc = new BlobClient(serverAddress, 
blobClientConfig);
    -                                   is = bc.get(requiredBlob);
    -                                   os = new FileOutputStream(localJarFile);
    -
    -                                   while (true) {
    -                                           final int read = is.read(buf);
    -                                           if (read < 0) {
    -                                                   break;
    -                                           }
    -                                           os.write(buf, 0, read);
    -                                   }
    -
    -                                   // we do explicitly not use a finally 
block, because we want the closing
    -                                   // in the regular case to throw 
exceptions and cause the writing to fail.
    -                                   // But, the closing on exception should 
not throw further exceptions and
    -                                   // let us keep the root exception
    -                                   os.close();
    -                                   os = null;
    -                                   is.close();
    -                                   is = null;
    -                                   bc.close();
    -                                   bc = null;
    -
    -                                   // success, we finished
    -                                   return localJarFile.toURI().toURL();
    -                           }
    -                           catch (Throwable t) {
    -                                   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
    -                                   // it would be replaced by any 
exception thrown in the finally block
    -                                   IOUtils.closeQuietly(os);
    -                                   IOUtils.closeQuietly(is);
    -                                   IOUtils.closeQuietly(bc);
    -
    -                                   if (t instanceof IOException) {
    -                                           throw (IOException) t;
    -                                   } else {
    -                                           throw new 
IOException(t.getMessage(), t);
    +                   try (
    +                           final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
    +                           final InputStream is = bc.getInternal(jobId, requiredBlob);
    +                           final OutputStream os = new FileOutputStream(localJarFile)
    +                   ) {
    +                           while (true) {
    +                                   final int read = is.read(buf);
    +                                   if (read < 0) {
    +                                           break;
                                        }
    +                                   os.write(buf, 0, read);
                                }
    +
    +                           // success, we finished
    +                           return localJarFile;
                        }
    -                   catch (IOException e) {
    -                           String message = "Failed to fetch BLOB " + 
requiredBlob + " from " + serverAddress +
    +                   catch (Throwable t) {
    +                           String message = "Failed to fetch BLOB " + 
jobId + "/" + requiredBlob + " from " + serverAddress +
                                        " and store it under " + 
localJarFile.getAbsolutePath();
                                if (attempt < numFetchRetries) {
    -                                   attempt++;
                                        if (LOG.isDebugEnabled()) {
    -                                           LOG.debug(message + " 
Retrying...", e);
    +                                           LOG.debug(message + " 
Retrying...", t);
                                        } else {
                                                LOG.error(message + " 
Retrying...");
                                        }
                                }
                                else {
    -                                   LOG.error(message + " No retries 
left.", e);
    -                                   throw new IOException(message, e);
    +                                   LOG.error(message + " No retries 
left.", t);
    +                                   throw new IOException(message, t);
                                }
    +
    +                           // retry
    +                           ++attempt;
    +                           LOG.info("Downloading {}/{} from {} (retry 
{})", jobId, requiredBlob, serverAddress, attempt);
                        }
                } // end loop over retries
        }
     
        /**
    -    * Deletes the file associated with the given key from the BLOB cache.
    -    * @param key referring to the file to be deleted
    +    * Deletes the (job-unrelated) file associated with the blob key in 
this BLOB cache.
    +    *
    +    * @param key
    +    *              blob key associated with the file to be deleted
    +    *
    +    * @throws IOException
    +    */
    +   @Override
    +   public void delete(BlobKey key) throws IOException {
    +           deleteInternal(null, key);
    +   }
    +
    +   /**
    +    * Deletes the file associated with the blob key in this BLOB cache.
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to
    +    * @param key
    +    *              blob key associated with the file to be deleted
    +    *
    +    * @throws IOException
         */
    -   public void delete(BlobKey key) throws IOException{
    -           final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
    +   @Override
    +   public void delete(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
    +           checkNotNull(jobId);
    +           deleteInternal(jobId, key);
    +   }
     
    -           if (localFile.exists() && !localFile.delete()) {
    -                   LOG.warn("Failed to delete locally cached BLOB " + key 
+ " at " + localFile.getAbsolutePath());
    +   /**
    +    * Deletes the file associated with the blob key in this BLOB cache.
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    +    * @param key
    +    *              blob key associated with the file to be deleted
    +    *
    +    * @throws IOException
    +    */
    +   private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws 
IOException{
    +           final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
    +           if (!localFile.delete() && localFile.exists()) {
    +                   LOG.warn("Failed to delete locally cached BLOB {} at 
{}", key, localFile.getAbsolutePath());
                }
        }
     
        /**
    -    * Deletes the file associated with the given key from the BLOB cache 
and
    +    * Deletes the (job-unrelated) file associated with the given key from 
the BLOB cache and
         * BLOB server.
         *
    -    * @param key referring to the file to be deleted
    +    * @param key
    +    *              referring to the file to be deleted
    +    *
         * @throws IOException
    -    *         thrown if an I/O error occurs while transferring the request 
to
    -    *         the BLOB server or if the BLOB server cannot delete the file
    +    *              thrown if an I/O error occurs while transferring the 
request to the BLOB server or if the
    +    *              BLOB server cannot delete the file
         */
        public void deleteGlobal(BlobKey key) throws IOException {
    +           deleteGlobalInternal(null, key);
    +   }
    +
    +   /**
    +    * Deletes the file associated with the given key from the BLOB cache 
and BLOB server.
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to
    +    * @param key
    +    *              referring to the file to be deleted
    +    *
    +    * @throws IOException
    +    *              thrown if an I/O error occurs while transferring the 
request to the BLOB server or if the
    +    *              BLOB server cannot delete the file
    +    */
    +   public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
    --- End diff --
    
    Do we still want the cache to be able to delete files from the server? I 
thought the server should make these kinds of decisions.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.
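
For readers following the issue description, here is a minimal, self-contained sketch of per-job reference counting with a deferred cleanup, loosely modelled on the registerJob()/releaseJob() methods in the diff above. It is not Flink's implementation: the class name JobRefTracker, the String job IDs, the explicit nowMillis parameter, the cleanup() method, and the guard against unmatched releases are all assumptions made for illustration. The cleanup task itself is not part of the quoted diff.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for per-job ref-counting in a BLOB cache; not Flink code. */
class JobRefTracker {

    /** Reference counter plus the time until which an unused job's files are kept. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1; // -1 means "in use" (no expiry scheduled)
    }

    private final Object lock = new Object();
    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefTracker(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    /** Registers one more user of the job's BLOBs and cancels any pending expiry. */
    void registerJob(String jobId) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
            ++ref.references;
            ref.keepUntil = -1;
        }
    }

    /**
     * Releases one user of the job's BLOBs. Returns false for an unmatched
     * release (assumption: the sketch refuses to let the counter go negative).
     */
    boolean releaseJob(String jobId, long nowMillis) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return false;
            }
            if (--ref.references == 0) {
                // keep the files for one more interval in case the job returns
                ref.keepUntil = nowMillis + cleanupIntervalMillis;
            }
            return true;
        }
    }

    /** Drops all unreferenced jobs whose grace period has passed; returns the count. */
    int cleanup(long nowMillis) {
        synchronized (lock) {
            int removed = 0;
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references == 0 && ref.keepUntil >= 0 && nowMillis >= ref.keepUntil) {
                    it.remove(); // a real cache would also delete the job's local files here
                    removed++;
                }
            }
            return removed;
        }
    }

    boolean isTracked(String jobId) {
        synchronized (lock) {
            return jobRefCounters.containsKey(jobId);
        }
    }
}
```

In this sketch the time is passed in explicitly so the behaviour can be checked without waiting on a timer; a periodic task would presumably call cleanup(System.currentTimeMillis()) instead.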



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
