[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072337#comment-16072337 ]
ASF GitHub Bot commented on FLINK-7057: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125218256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -107,146 +133,268 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } + @Override + public void registerJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + @Override + public void releaseJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); + return; + } + + --ref.references; + if (ref.references == 0) { + ref.keepUntil = System.currentTimeMillis() + cleanupInterval; + } + } + } + + /** + * Returns local copy of the (job-unrelated) file for the BLOB with the given key. + * <p> + * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in + * the cache, the method will try to download it from this cache's BLOB server. + * + * @param key + * The key of the desired BLOB. + * + * @return file referring to the local storage location of the BLOB. + * + * @throws IOException + * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+ */ + @Override + public File getFile(BlobKey key) throws IOException { + return getFileInternal(null, key); + } + /** - * Returns the URL for the BLOB with the given key. The method will first attempt to serve - * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it - * from this cache's BLOB server. + * Returns local copy of the file for the BLOB with the given key. + * <p> + * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in + * the cache, the method will try to download it from this cache's BLOB server. * - * @param requiredBlob The key of the desired BLOB. - * @return URL referring to the local storage location of the BLOB. - * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. + * @param jobId + * ID of the job this blob belongs to + * @param key + * The key of the desired BLOB. + * + * @return file referring to the local storage location of the BLOB. + * + * @throws IOException + * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ - public URL getURL(final BlobKey requiredBlob) throws IOException { + @Override + public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException { + checkNotNull(jobId); + return getFileInternal(jobId, key); + } + + /** + * Returns local copy of the file for the BLOB with the given key. + * <p> + * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in + * the cache, the method will try to download it from this cache's BLOB server. + * + * @param jobId + * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) + * @param requiredBlob + * The key of the desired BLOB. + * + * @return file referring to the local storage location of the BLOB. + * + * @throws IOException + * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+ */ + private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { checkArgument(requiredBlob != null, "BLOB key cannot be null."); - final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); + final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); if (localJarFile.exists()) { - return localJarFile.toURI().toURL(); + return localJarFile; } // first try the distributed blob store (if available) try { - blobView.get(requiredBlob, localJarFile); + blobView.get(jobId, requiredBlob, localJarFile); } catch (Exception e) { LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e); } if (localJarFile.exists()) { - return localJarFile.toURI().toURL(); + return localJarFile; } // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {}/{} from {}", jobId, requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { - - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { - LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); - } - - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; - - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. 
- // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); - IOUtils.closeQuietly(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.getInternal(jobId, requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; } + os.write(buf, 0, read); } + + // success, we finished + return localJarFile; } - catch (IOException e) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + catch (Throwable t) { + String message = "Failed to fetch BLOB " + jobId + "/" + requiredBlob + " from " + serverAddress + " and store it under " + localJarFile.getAbsolutePath(); if (attempt < numFetchRetries) { - attempt++; if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); + LOG.debug(message + " Retrying...", t); } else { LOG.error(message + " Retrying..."); } } else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); + LOG.error(message + " No retries left.", t); + throw new IOException(message, t); } + + // retry + ++attempt; + LOG.info("Downloading {}/{} from {} (retry {})", jobId, requiredBlob, serverAddress, attempt); } } // end loop over retries } /** - * Deletes the file associated with the given key from the BLOB cache. 
- * @param key referring to the file to be deleted + * Deletes the (job-unrelated) file associated with the blob key in this BLOB cache. + * + * @param key + * blob key associated with the file to be deleted + * + * @throws IOException + */ + @Override + public void delete(BlobKey key) throws IOException { + deleteInternal(null, key); + } + + /** + * Deletes the file associated with the blob key in this BLOB cache. + * + * @param jobId + * ID of the job this blob belongs to + * @param key + * blob key associated with the file to be deleted + * + * @throws IOException */ - public void delete(BlobKey key) throws IOException{ - final File localFile = BlobUtils.getStorageLocation(storageDir, key); + @Override + public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException { + checkNotNull(jobId); + deleteInternal(jobId, key); + } - if (localFile.exists() && !localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); + /** + * Deletes the file associated with the blob key in this BLOB cache. + * + * @param jobId + * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) + * @param key + * blob key associated with the file to be deleted + * + * @throws IOException + */ + private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException{ + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + if (!localFile.delete() && localFile.exists()) { + LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath()); } } /** - * Deletes the file associated with the given key from the BLOB cache and + * Deletes the (job-unrelated) file associated with the given key from the BLOB cache and * BLOB server. 
* - * @param key referring to the file to be deleted + * @param key + * referring to the file to be deleted + * * @throws IOException - * thrown if an I/O error occurs while transferring the request to - * the BLOB server or if the BLOB server cannot delete the file + * thrown if an I/O error occurs while transferring the request to the BLOB server or if the + * BLOB server cannot delete the file */ public void deleteGlobal(BlobKey key) throws IOException { + deleteGlobalInternal(null, key); + } + + /** + * Deletes the file associated with the given key from the BLOB cache and BLOB server. + * + * @param jobId + * ID of the job this blob belongs to + * @param key + * referring to the file to be deleted + * + * @throws IOException + * thrown if an I/O error occurs while transferring the request to the BLOB server or if the + * BLOB server cannot delete the file + */ + public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws IOException { --- End diff -- Do we still want the cache to be able to delete files from the server? I thought the server should make these kinds of decisions. > move BLOB ref-counting from LibraryCacheManager to BlobCache > ------------------------------------------------------------ > > Key: FLINK-7057 > URL: https://issues.apache.org/jira/browse/FLINK-7057 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR > files managed by it. Instead, we want the {{BlobCache}} to do that itself for > all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} > level but rather per job. Therefore, the cleanup process should be adapted, > too. -- This message was sent by Atlassian JIRA (v6.4.14#64029)