[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072336#comment-16072336
 ] 

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125247578
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
                        clientSocket.close();
                }
                finally {
    -                   if (fos != null) {
    -                           try {
    -                                   fos.close();
    -                           } catch (Throwable t) {
    -                                   LOG.warn("Cannot close stream to BLOB 
staging file", t);
    -                           }
    -                   }
                        if (incomingFile != null) {
    -                           if (!incomingFile.delete()) {
    +                           if (!incomingFile.delete() && 
incomingFile.exists()) {
                                        LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
                                }
                        }
                }
        }
     
        /**
    -    * Handles an incoming DELETE request from a BLOB client.
    -    * 
    -    * @param inputStream The input stream to read the request from.
    -    * @param outputStream The output stream to write the response to.
    -    * @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
    +    * Reads a full file from <tt>inputStream</tt> into 
<tt>incomingFile</tt> returning its checksum.
    +    *
    +    * @param inputStream
    +    *              stream to read from
    +    * @param incomingFile
    +    *              file to write to
    +    * @param buf
    +    *              An auxiliary buffer for data 
serialization/deserialization
    +    *
    +    * @return the received file's content hash as a BLOB key
    +    *
    +    * @throws IOException
    +    *              thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
         */
    -   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
    +   private static BlobKey readFileFully(
    +                   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
    +                   throws IOException {
    +           MessageDigest md = BlobUtils.createMessageDigest();
    +           FileOutputStream fos = new FileOutputStream(incomingFile);
     
                try {
    -                   int type = inputStream.read();
    -                   if (type < 0) {
    -                           throw new EOFException("Premature end of DELETE 
request");
    -                   }
    -
    -                   if (type == CONTENT_ADDRESSABLE) {
    -                           BlobKey key = 
BlobKey.readFromInputStream(inputStream);
    -                           File blobFile = 
blobServer.getStorageLocation(key);
    -
    -                           writeLock.lock();
    -
    -                           try {
    -                                   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
    -                                   // removing the remote file in case of 
a concurrent put operation
    -                                   if (blobFile.exists() && 
!blobFile.delete()) {
    -                                           throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
    -                                   }
    -
    -                                   blobStore.delete(key);
    -                           } finally {
    -                                   writeLock.unlock();
    +                   while (true) {
    +                           final int bytesExpected = 
readLength(inputStream);
    +                           if (bytesExpected == -1) {
    +                                   // done
    +                                   break;
    +                           }
    +                           if (bytesExpected > BUFFER_SIZE) {
    +                                   throw new IOException(
    +                                           "Unexpected number of incoming 
bytes: " + bytesExpected);
                                }
    -                   }
    -                   else if (type == NAME_ADDRESSABLE) {
    -                           byte[] jidBytes = new byte[JobID.SIZE];
    -                           readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
    -                           JobID jobID = JobID.fromByteArray(jidBytes);
     
    -                           String key = readKey(buf, inputStream);
    +                           readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
    +                           fos.write(buf, 0, bytesExpected);
     
    -                           File blobFile = 
this.blobServer.getStorageLocation(jobID, key);
    +                           md.update(buf, 0, bytesExpected);
    +                   }
    +                   return new BlobKey(md.digest());
    +           } finally {
    --- End diff --
    
    Maybe use try-with-resources for the `FileOutputStream` here instead of the explicit try/finally?


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to