[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930209#comment-15930209 ]
ASF GitHub Bot commented on FLINK-6008: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674262 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** + * Returns the URL for the BLOB with the given parameters. The method will first attempt to + * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try + * to download it from this cache's BLOB server. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + * @return URL referring to the local storage location of the BLOB. + * @throws java.io.FileNotFoundException if the path does not exist; + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+ */ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); - IOUtils.closeQuietly(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); - } - } + final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // first try the distributed blob store (if available) + try { + blobStore.get(jobId, key, localJarFile); + } catch (Exception e) { + LOG.info("Failed to copy from blob store. 
Downloading from BLOB server instead.", e); + } + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // fallback: download from the BlobServer + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {}/{} from {}", jobId, key, serverAddress); + + // loop over retries + int attempt = 0; + while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(jobId, key); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); } - catch (IOException e) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); - if (attempt < numFetchRetries) { - attempt++; - if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); - } else { - LOG.error(message + " Retrying..."); - } - } - else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); - } + catch (Throwable t) { + getURLOnException(String.format("%s/%s", jobId, key), localJarFile, attempt, t); + + // retry + ++attempt; + LOG.info("Downloading {}/{} from {} (retry {})", jobId, key, serverAddress, attempt); } } // end loop over retries } + private static void getURLTransferFile( + final byte[] buf, final InputStream is, final OutputStream os) throws IOException { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; + } + os.write(buf, 0, read); + } + } + + private final void getURLOnException( + final String requiredBlob, final File localJarFile, final int attempt, + final Throwable t) throws IOException { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug(message + " 
Retrying...", t); + } else { + LOG.error(message + " Retrying..."); + } + } + else { + LOG.error(message + " No retries left.", t); + throw new IOException(message, t); + } + } + /** * Deletes the file associated with the given key from the BLOB cache. + * * @param key referring to the file to be deleted */ - public void delete(BlobKey key) throws IOException{ + @Override + public void delete(BlobKey key) { final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists() && !localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); + LOG.warn("Failed to delete locally cached BLOB {} at {}" + key, localFile.getAbsolutePath()); --- End diff -- The `+` is wrong here, I think. > collection of BlobServer improvements > ------------------------------------- > > Key: FLINK-6008 > URL: https://issues.apache.org/jira/browse/FLINK-6008 > Project: Flink > Issue Type: Improvement > Components: Network > Affects Versions: 1.3.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > The following things should be improved around the BlobServer/BlobCache: > * update config options with non-deprecated ones, e.g. > {{high-availability.cluster-id}} and {{high-availability.storageDir}} > * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} > * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs > (prepares FLINK-4399) > * remove {{NAME_ADDRESSABLE}} blobs after job/task termination > * do not fail the {{BlobServer}} when a delete operation fails > * code style, like using {{Preconditions.checkArgument}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)