Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674262 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** + * Returns the URL for the BLOB with the given parameters. The method will first attempt to + * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try + * to download it from this cache's BLOB server. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + * @return URL referring to the local storage location of the BLOB. + * @throws java.io.FileNotFoundException if the path does not exist; + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+ */ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); - IOUtils.closeQuietly(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); - } - } + final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // first try the distributed blob store (if available) + try { + blobStore.get(jobId, key, localJarFile); + } catch (Exception e) { + LOG.info("Failed to copy from blob store. 
Downloading from BLOB server instead.", e); + } + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // fallback: download from the BlobServer + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {}/{} from {}", jobId, key, serverAddress); + + // loop over retries + int attempt = 0; + while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(jobId, key); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); } - catch (IOException e) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); - if (attempt < numFetchRetries) { - attempt++; - if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); - } else { - LOG.error(message + " Retrying..."); - } - } - else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); - } + catch (Throwable t) { + getURLOnException(String.format("%s/%s", jobId, key), localJarFile, attempt, t); + + // retry + ++attempt; + LOG.info("Downloading {}/{} from {} (retry {})", jobId, key, serverAddress, attempt); } } // end loop over retries } + private static void getURLTransferFile( + final byte[] buf, final InputStream is, final OutputStream os) throws IOException { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; + } + os.write(buf, 0, read); + } + } + + private final void getURLOnException( + final String requiredBlob, final File localJarFile, final int attempt, + final Throwable t) throws IOException { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug(message + " 
Retrying...", t); + } else { + LOG.error(message + " Retrying..."); + } + } + else { + LOG.error(message + " No retries left.", t); + throw new IOException(message, t); + } + } + /** * Deletes the file associated with the given key from the BLOB cache. + * * @param key referring to the file to be deleted */ - public void delete(BlobKey key) throws IOException{ + @Override + public void delete(BlobKey key) { final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists() && !localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); + LOG.warn("Failed to delete locally cached BLOB {} at {}" + key, localFile.getAbsolutePath()); --- End diff -- The `+` is wrong here, I think.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---