zhuzhurk commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677940469



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
##########
@@ -187,6 +211,139 @@ public File getFile(JobID jobId, PermanentBlobKey key) 
throws IOException {
         return getFileInternal(jobId, key);
     }
 
+    /**
+     * Returns the content of the file for the BLOB with the provided job ID 
the blob key.
+     *
+     * <p>The method will first attempt to serve the BLOB from the local 
cache. If the BLOB is not
+     * in the cache, the method will try to download it from the HA store, or 
directly from the
+     * {@link BlobServer}.
+     *
+     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the 
file is fully read in
+     * the same write lock as the file is accessed. This avoids the scenario 
that the path is
+     * returned as the file is deleted concurrently by other threads.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @param blobKey BLOB key associated with the requested file
+     * @return The content of the BLOB.
+     * @throws java.io.FileNotFoundException if the BLOB does not exist;
+     * @throws IOException if any other error occurs when retrieving the file.
+     */
+    @Override
+    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws 
IOException {
+        checkNotNull(jobId);
+        checkNotNull(blobKey);
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, 
blobKey);
+        readWriteLock.readLock().lock();
+
+        try {
+            if (localFile.exists()) {
+                blobCacheSizeTracker.update(jobId, blobKey);
+                return FileUtils.readAllBytes(localFile.toPath());
+            }
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+
+        // first try the distributed blob store (if available)
+        // use a temporary file (thread-safe without locking)
+        File incomingFile = createTemporaryFilename();
+        try {
+            try {
+                if (blobView.get(jobId, blobKey, incomingFile)) {
+                    // now move the temp file to our local cache atomically
+                    readWriteLock.writeLock().lock();
+                    try {
+                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                        return FileUtils.readAllBytes(localFile.toPath());
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                }
+            } catch (Exception e) {
+                log.info(
+                        "Failed to copy from blob store. Downloading from BLOB 
server instead.", e);
+            }
+
+            final InetSocketAddress currentServerAddress = serverAddress;
+
+            if (currentServerAddress != null) {
+                // fallback: download from the BlobServer
+                BlobClient.downloadFromBlobServer(
+                        jobId,
+                        blobKey,
+                        incomingFile,
+                        currentServerAddress,
+                        blobClientConfig,
+                        numFetchRetries);
+
+                readWriteLock.writeLock().lock();
+                try {
+                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, 
localFile, log, null);
+                    return FileUtils.readAllBytes(localFile.toPath());
+                } finally {
+                    readWriteLock.writeLock().unlock();
+                }
+            } else {
+                throw new IOException(
+                        "Cannot download from BlobServer, because the server 
address is unknown.");
+            }
+
+        } finally {
+            // delete incomingFile from a failed download
+            if (!incomingFile.delete() && incomingFile.exists()) {
+                log.warn(
+                        "Could not delete the staging file {} for blob key {} 
and job {}.",
+                        incomingFile,
+                        blobKey,
+                        jobId);
+            }
+        }
+    }
+
+    private void checkLimitAndMoveFile(
+            File incomingFile,
+            @Nullable JobID jobId,
+            BlobKey blobKey,
+            File localFile,
+            Logger log,
+            @Nullable BlobStore blobStore)
+            throws IOException {
+
+        // Check the size limit and delete the files that exceeds the limit
+        final long sizeOfIncomingFile = incomingFile.length();
+        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
+                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);
+
+        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
+            deleteFile(key.f0, key.f1);
+            blobCacheSizeTracker.untrack(key);

Review comment:
       >>> The only risk is that we will try to delete the blob every time a 
new blob inserts until the deletion is successful
   
   I think it's better to have to expose the problem. And it also enables retry 
of failed deletions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to