Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106674262
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
     
                // fallback: download from the BlobServer
                final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +           LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
     
                // loop over retries
                int attempt = 0;
                while (true) {
    +                   try (
    +                           final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
    +                           final InputStream is = bc.get(requiredBlob);
    +                           final OutputStream os = new 
FileOutputStream(localJarFile)
    +                   ) {
    +                           getURLTransferFile(buf, is, os);
    +
    +                           // success, we finished
    +                           return localJarFile.toURI().toURL();
    +                   }
    +                   catch (Throwable t) {
    +                           getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
     
    -                   if (attempt == 0) {
    -                           LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
    -                   } else {
    +                           // retry
    +                           ++attempt;
                                LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
                        }
    +           } // end loop over retries
    +   }
     
    -                   try {
    -                           BlobClient bc = null;
    -                           InputStream is = null;
    -                           OutputStream os = null;
    +   /**
    +    * Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
    +    * serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
    +    * to download it from this cache's BLOB server.
    +    *
    +    * @param jobId     JobID of the file in the blob store
    +    * @param key       String key of the file in the blob store
    +    * @return URL referring to the local storage location of the BLOB.
    +    * @throws java.io.FileNotFoundException if the path does not exist;
    +    * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
    +    */
    +   public URL getURL(final JobID jobId, final String key) throws 
IOException {
    +           checkArgument(jobId != null, "Job id cannot be null.");
    +           checkArgument(key != null, "BLOB name cannot be null.");
     
    -                           try {
    -                                   bc = new BlobClient(serverAddress, 
blobClientConfig);
    -                                   is = bc.get(requiredBlob);
    -                                   os = new FileOutputStream(localJarFile);
    -
    -                                   while (true) {
    -                                           final int read = is.read(buf);
    -                                           if (read < 0) {
    -                                                   break;
    -                                           }
    -                                           os.write(buf, 0, read);
    -                                   }
    -
    -                                   // we do explicitly not use a finally 
block, because we want the closing
    -                                   // in the regular case to throw 
exceptions and cause the writing to fail.
    -                                   // But, the closing on exception should 
not throw further exceptions and
    -                                   // let us keep the root exception
    -                                   os.close();
    -                                   os = null;
    -                                   is.close();
    -                                   is = null;
    -                                   bc.close();
    -                                   bc = null;
    -
    -                                   // success, we finished
    -                                   return localJarFile.toURI().toURL();
    -                           }
    -                           catch (Throwable t) {
    -                                   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
    -                                   // it would be replaced by any 
exception thrown in the finally block
    -                                   IOUtils.closeQuietly(os);
    -                                   IOUtils.closeQuietly(is);
    -                                   IOUtils.closeQuietly(bc);
    -
    -                                   if (t instanceof IOException) {
    -                                           throw (IOException) t;
    -                                   } else {
    -                                           throw new 
IOException(t.getMessage(), t);
    -                                   }
    -                           }
    +           final File localJarFile = 
BlobUtils.getStorageLocation(storageDir, jobId, key);
    +
    +           if (localJarFile.exists()) {
    +                   return localJarFile.toURI().toURL();
    +           }
    +
    +           // first try the distributed blob store (if available)
    +           try {
    +                   blobStore.get(jobId, key, localJarFile);
    +           } catch (Exception e) {
    +                   LOG.info("Failed to copy from blob store. Downloading 
from BLOB server instead.", e);
    +           }
    +
    +           if (localJarFile.exists()) {
    +                   return localJarFile.toURI().toURL();
    +           }
    +
    +           // fallback: download from the BlobServer
    +           final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +           LOG.info("Downloading {}/{} from {}", jobId, key, 
serverAddress);
    +
    +           // loop over retries
    +           int attempt = 0;
    +           while (true) {
    +                   try (
    +                           final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
    +                           final InputStream is = bc.get(jobId, key);
    +                           final OutputStream os = new 
FileOutputStream(localJarFile)
    +                   ) {
    +                           getURLTransferFile(buf, is, os);
    +
    +                           // success, we finished
    +                           return localJarFile.toURI().toURL();
                        }
    -                   catch (IOException e) {
    -                           String message = "Failed to fetch BLOB " + 
requiredBlob + " from " + serverAddress +
    -                                   " and store it under " + 
localJarFile.getAbsolutePath();
    -                           if (attempt < numFetchRetries) {
    -                                   attempt++;
    -                                   if (LOG.isDebugEnabled()) {
    -                                           LOG.debug(message + " 
Retrying...", e);
    -                                   } else {
    -                                           LOG.error(message + " 
Retrying...");
    -                                   }
    -                           }
    -                           else {
    -                                   LOG.error(message + " No retries 
left.", e);
    -                                   throw new IOException(message, e);
    -                           }
    +                   catch (Throwable t) {
    +                           getURLOnException(String.format("%s/%s", jobId, 
key), localJarFile, attempt, t);
    +
    +                           // retry
    +                           ++attempt;
    +                           LOG.info("Downloading {}/{} from {} (retry 
{})", jobId, key, serverAddress, attempt);
                        }
                } // end loop over retries
        }
     
    +   private static void getURLTransferFile(
    +                   final byte[] buf, final InputStream is, final 
OutputStream os) throws IOException {
    +           while (true) {
    +                   final int read = is.read(buf);
    +                   if (read < 0) {
    +                           break;
    +                   }
    +                   os.write(buf, 0, read);
    +           }
    +   }
    +
    +   private final void getURLOnException(
    +                   final String requiredBlob, final File localJarFile, 
final int attempt,
    +                   final Throwable t) throws IOException {
    +           String message = "Failed to fetch BLOB " + requiredBlob + " 
from " + serverAddress +
    +                   " and store it under " + localJarFile.getAbsolutePath();
    +           if (attempt < numFetchRetries) {
    +                   if (LOG.isDebugEnabled()) {
    +                           LOG.debug(message + " Retrying...", t);
    +                   } else {
    +                           LOG.error(message + " Retrying...");
    +                   }
    +           }
    +           else {
    +                   LOG.error(message + " No retries left.", t);
    +                   throw new IOException(message, t);
    +           }
    +   }
    +
        /**
         * Deletes the file associated with the given key from the BLOB cache.
    +    *
         * @param key referring to the file to be deleted
         */
    -   public void delete(BlobKey key) throws IOException{
    +   @Override
    +   public void delete(BlobKey key) {
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
     
                if (localFile.exists() && !localFile.delete()) {
    -                   LOG.warn("Failed to delete locally cached BLOB " + key 
+ " at " + localFile.getAbsolutePath());
    +                   LOG.warn("Failed to delete locally cached BLOB {} at 
{}" + key, localFile.getAbsolutePath());
    --- End diff --
    
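    The `+` is wrong here, I think: it concatenates `key` onto the format string, so only one argument is left for the two `{}` placeholders. `key` should be passed as a separate argument (`, key,`) instead.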


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to