[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930210#comment-15930210 ]
ASF GitHub Bot commented on FLINK-6008:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3512#discussion_r106674837
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
// loop over retries
int attempt = 0;
while (true) {
+ try (
+ final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.get(requiredBlob);
+ final OutputStream os = new FileOutputStream(localJarFile)
+ ) {
+ getURLTransferFile(buf, is, os);
+
+ // success, we finished
+ return localJarFile.toURI().toURL();
+ }
+ catch (Throwable t) {
+ getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
- if (attempt == 0) {
- LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
- } else {
+ // retry
+ ++attempt;
LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
}
+ } // end loop over retries
+ }
- try {
- BlobClient bc = null;
- InputStream is = null;
- OutputStream os = null;
+ /**
+ * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+ * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+ * to download it from this cache's BLOB server.
+ *
+ * @param jobId JobID of the file in the blob store
+ * @param key String key of the file in the blob store
+ * @return URL referring to the local storage location of the BLOB.
+ * @throws java.io.FileNotFoundException if the path does not exist;
+ * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ */
+ public URL getURL(final JobID jobId, final String key) throws IOException {
+ checkArgument(jobId != null, "Job id cannot be null.");
+ checkArgument(key != null, "BLOB name cannot be null.");
- try {
- bc = new BlobClient(serverAddress, blobClientConfig);
- is = bc.get(requiredBlob);
- os = new FileOutputStream(localJarFile);
-
- while (true) {
- final int read = is.read(buf);
- if (read < 0) {
- break;
- }
- os.write(buf, 0, read);
- }
-
- // we do explicitly not use a finally block, because we want the closing
- // in the regular case to throw exceptions and cause the writing to fail.
- // But, the closing on exception should not throw further exceptions and
- // let us keep the root exception
- os.close();
- os = null;
- is.close();
- is = null;
- bc.close();
- bc = null;
-
- // success, we finished
- return localJarFile.toURI().toURL();
- }
- catch (Throwable t) {
- // we use "catch (Throwable)" to keep the root exception. Otherwise that exception
- // it would be replaced by any exception thrown in the finally block
- IOUtils.closeQuietly(os);
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(bc);
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException(t.getMessage(), t);
- }
- }
+ final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
+
+ // first try the distributed blob store (if available)
+ try {
+ blobStore.get(jobId, key, localJarFile);
+ } catch (Exception e) {
+ LOG.info("Failed to copy from blob store. Downloading
from BLOB server instead.", e);
+ }
+
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
+
+ // fallback: download from the BlobServer
+ final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {}/{} from {}", jobId, key,
serverAddress);
+
+ // loop over retries
+ int attempt = 0;
+ while (true) {
+ try (
+ final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.get(jobId, key);
+ final OutputStream os = new FileOutputStream(localJarFile)
+ ) {
+ getURLTransferFile(buf, is, os);
+
+ // success, we finished
+ return localJarFile.toURI().toURL();
}
- catch (IOException e) {
- String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
- " and store it under " + localJarFile.getAbsolutePath();
- if (attempt < numFetchRetries) {
- attempt++;
- if (LOG.isDebugEnabled()) {
- LOG.debug(message + " Retrying...", e);
- } else {
- LOG.error(message + " Retrying...");
- }
- }
- else {
- LOG.error(message + " No retries left.", e);
- throw new IOException(message, e);
- }
+ catch (Throwable t) {
+ getURLOnException(String.format("%s/%s", jobId, key), localJarFile, attempt, t);
+
+ // retry
+ ++attempt;
+ LOG.info("Downloading {}/{} from {} (retry
{})", jobId, key, serverAddress, attempt);
}
} // end loop over retries
}
+ private static void getURLTransferFile(
+ final byte[] buf, final InputStream is, final OutputStream os) throws IOException {
+ while (true) {
+ final int read = is.read(buf);
+ if (read < 0) {
+ break;
+ }
+ os.write(buf, 0, read);
+ }
+ }
+
+ private final void getURLOnException(
+ final String requiredBlob, final File localJarFile, final int attempt,
+ final Throwable t) throws IOException {
+ String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+ " and store it under " + localJarFile.getAbsolutePath();
+ if (attempt < numFetchRetries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(message + " Retrying...", t);
+ } else {
+ LOG.error(message + " Retrying...");
+ }
+ }
+ else {
+ LOG.error(message + " No retries left.", t);
+ throw new IOException(message, t);
+ }
+ }
+
/**
* Deletes the file associated with the given key from the BLOB cache.
+ *
* @param key referring to the file to be deleted
*/
- public void delete(BlobKey key) throws IOException{
+ @Override
+ public void delete(BlobKey key) {
final File localFile = BlobUtils.getStorageLocation(storageDir, key);
if (localFile.exists() && !localFile.delete()) {
- LOG.warn("Failed to delete locally cached BLOB " + key
+ " at " + localFile.getAbsolutePath());
+ LOG.warn("Failed to delete locally cached BLOB {} at
{}" + key, localFile.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Deletes the file associated with the given job and key from the BLOB cache.
+ *
+ * @param jobId JobID of the file in the blob store
+ * @param key String key of the file in the blob store
+ */
+ @Override
+ public void delete(JobID jobId, String key) {
+ final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+ if (localFile.exists() && !localFile.delete()) {
--- End diff ---
The concurrency-safe way is to invert the operations: `if (!delete && exists)`
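
For illustration only, a minimal sketch of that inverted check, reusing `localFile`, `key` and `LOG` from the diff above (not part of the actual patch):

    // try the delete first; only warn if it failed AND the file is still there,
    // so a concurrent delete that already removed the file is not reported as an error
    if (!localFile.delete() && localFile.exists()) {
        LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());
    }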
> collection of BlobServer improvements
> -------------------------------------
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.3.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g.
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}
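
As a rough illustration of the last point, a minimal sketch of the {{Preconditions.checkArgument}} style as it appears in the diff above (only the argument checks and the local lookup are shown; not the final code):

    import static org.apache.flink.util.Preconditions.checkArgument;

    public URL getURL(final JobID jobId, final String key) throws IOException {
        // fail fast with a descriptive message instead of a later NullPointerException
        checkArgument(jobId != null, "Job id cannot be null.");
        checkArgument(key != null, "BLOB name cannot be null.");
        final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
        return localJarFile.toURI().toURL();
    }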
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)