[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930209#comment-15930209
 ] 

ASF GitHub Bot commented on FLINK-6008:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106674262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
     
                // fallback: download from the BlobServer
                final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +           LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
     
                // loop over retries
                int attempt = 0;
                while (true) {
    +                   try (
    +                           final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
    +                           final InputStream is = bc.get(requiredBlob);
    +                           final OutputStream os = new 
FileOutputStream(localJarFile)
    +                   ) {
    +                           getURLTransferFile(buf, is, os);
    +
    +                           // success, we finished
    +                           return localJarFile.toURI().toURL();
    +                   }
    +                   catch (Throwable t) {
    +                           getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
     
    -                   if (attempt == 0) {
    -                           LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
    -                   } else {
    +                           // retry
    +                           ++attempt;
                                LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
                        }
    +           } // end loop over retries
    +   }
     
    -                   try {
    -                           BlobClient bc = null;
    -                           InputStream is = null;
    -                           OutputStream os = null;
    +   /**
    +    * Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
    +    * serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
    +    * to download it from this cache's BLOB server.
    +    *
    +    * @param jobId     JobID of the file in the blob store
    +    * @param key       String key of the file in the blob store
    +    * @return URL referring to the local storage location of the BLOB.
    +    * @throws java.io.FileNotFoundException if the path does not exist;
    +    * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
    +    */
    +   public URL getURL(final JobID jobId, final String key) throws 
IOException {
    +           checkArgument(jobId != null, "Job id cannot be null.");
    +           checkArgument(key != null, "BLOB name cannot be null.");
     
    -                           try {
    -                                   bc = new BlobClient(serverAddress, 
blobClientConfig);
    -                                   is = bc.get(requiredBlob);
    -                                   os = new FileOutputStream(localJarFile);
    -
    -                                   while (true) {
    -                                           final int read = is.read(buf);
    -                                           if (read < 0) {
    -                                                   break;
    -                                           }
    -                                           os.write(buf, 0, read);
    -                                   }
    -
    -                                   // we do explicitly not use a finally 
block, because we want the closing
    -                                   // in the regular case to throw 
exceptions and cause the writing to fail.
    -                                   // But, the closing on exception should 
not throw further exceptions and
    -                                   // let us keep the root exception
    -                                   os.close();
    -                                   os = null;
    -                                   is.close();
    -                                   is = null;
    -                                   bc.close();
    -                                   bc = null;
    -
    -                                   // success, we finished
    -                                   return localJarFile.toURI().toURL();
    -                           }
    -                           catch (Throwable t) {
    -                                   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
    -                                   // it would be replaced by any 
exception thrown in the finally block
    -                                   IOUtils.closeQuietly(os);
    -                                   IOUtils.closeQuietly(is);
    -                                   IOUtils.closeQuietly(bc);
    -
    -                                   if (t instanceof IOException) {
    -                                           throw (IOException) t;
    -                                   } else {
    -                                           throw new 
IOException(t.getMessage(), t);
    -                                   }
    -                           }
    +           final File localJarFile = 
BlobUtils.getStorageLocation(storageDir, jobId, key);
    +
    +           if (localJarFile.exists()) {
    +                   return localJarFile.toURI().toURL();
    +           }
    +
    +           // first try the distributed blob store (if available)
    +           try {
    +                   blobStore.get(jobId, key, localJarFile);
    +           } catch (Exception e) {
    +                   LOG.info("Failed to copy from blob store. Downloading 
from BLOB server instead.", e);
    +           }
    +
    +           if (localJarFile.exists()) {
    +                   return localJarFile.toURI().toURL();
    +           }
    +
    +           // fallback: download from the BlobServer
    +           final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +           LOG.info("Downloading {}/{} from {}", jobId, key, 
serverAddress);
    +
    +           // loop over retries
    +           int attempt = 0;
    +           while (true) {
    +                   try (
    +                           final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
    +                           final InputStream is = bc.get(jobId, key);
    +                           final OutputStream os = new 
FileOutputStream(localJarFile)
    +                   ) {
    +                           getURLTransferFile(buf, is, os);
    +
    +                           // success, we finished
    +                           return localJarFile.toURI().toURL();
                        }
    -                   catch (IOException e) {
    -                           String message = "Failed to fetch BLOB " + 
requiredBlob + " from " + serverAddress +
    -                                   " and store it under " + 
localJarFile.getAbsolutePath();
    -                           if (attempt < numFetchRetries) {
    -                                   attempt++;
    -                                   if (LOG.isDebugEnabled()) {
    -                                           LOG.debug(message + " 
Retrying...", e);
    -                                   } else {
    -                                           LOG.error(message + " 
Retrying...");
    -                                   }
    -                           }
    -                           else {
    -                                   LOG.error(message + " No retries 
left.", e);
    -                                   throw new IOException(message, e);
    -                           }
    +                   catch (Throwable t) {
    +                           getURLOnException(String.format("%s/%s", jobId, 
key), localJarFile, attempt, t);
    +
    +                           // retry
    +                           ++attempt;
    +                           LOG.info("Downloading {}/{} from {} (retry 
{})", jobId, key, serverAddress, attempt);
                        }
                } // end loop over retries
        }
     
    +   private static void getURLTransferFile(
    +                   final byte[] buf, final InputStream is, final 
OutputStream os) throws IOException {
    +           while (true) {
    +                   final int read = is.read(buf);
    +                   if (read < 0) {
    +                           break;
    +                   }
    +                   os.write(buf, 0, read);
    +           }
    +   }
    +
    +   private final void getURLOnException(
    +                   final String requiredBlob, final File localJarFile, 
final int attempt,
    +                   final Throwable t) throws IOException {
    +           String message = "Failed to fetch BLOB " + requiredBlob + " 
from " + serverAddress +
    +                   " and store it under " + localJarFile.getAbsolutePath();
    +           if (attempt < numFetchRetries) {
    +                   if (LOG.isDebugEnabled()) {
    +                           LOG.debug(message + " Retrying...", t);
    +                   } else {
    +                           LOG.error(message + " Retrying...");
    +                   }
    +           }
    +           else {
    +                   LOG.error(message + " No retries left.", t);
    +                   throw new IOException(message, t);
    +           }
    +   }
    +
        /**
         * Deletes the file associated with the given key from the BLOB cache.
    +    *
         * @param key referring to the file to be deleted
         */
    -   public void delete(BlobKey key) throws IOException{
    +   @Override
    +   public void delete(BlobKey key) {
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
     
                if (localFile.exists() && !localFile.delete()) {
    -                   LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath());
    +                   LOG.warn("Failed to delete locally cached BLOB {} at {}" + key, localFile.getAbsolutePath());
    --- End diff --
    
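    The `+` is wrong here, I think. It should be a `,`: as written, `key` is concatenated onto the format string instead of being passed as the argument for the first `{}`.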
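
To illustrate the effect of the `+` in the new `LOG.warn` call, here is a minimal sketch. It does not use the real logging library: `format` is a hypothetical, simplified stand-in for `{}` placeholder substitution (no escaping, no special handling of a trailing `Throwable`), and the key and path values are made up for the example.

```java
public class PlaceholderDemo {

    // Simplified stand-in for "{}" placeholder substitution (hypothetical helper,
    // not the real logging library): each "{}" is replaced by the next argument,
    // and placeholders without a matching argument are left untouched.
    public static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String key = "blob-key";        // made-up value for illustration
        String path = "/tmp/blob-file"; // made-up value for illustration

        // As in the diff: "+" appends key to the pattern, so only one argument remains.
        System.out.println(format("Failed to delete locally cached BLOB {} at {}" + key, path));

        // With "," instead: each placeholder receives its own argument.
        System.out.println(format("Failed to delete locally cached BLOB {} at {}", key, path));
    }
}
```

Under these assumptions, the first call puts the path where the key belongs and leaves the second `{}` unfilled with the key glued on after it, while the second call produces the intended message.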


> collection of BlobServer improvements
> -------------------------------------
>
>                 Key: FLINK-6008
>                 URL: https://issues.apache.org/jira/browse/FLINK-6008
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.3.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
