Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125430786
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
    @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
                        clientSocket.close();
                }
                finally {
    -                   if (fos != null) {
    -                           try {
    -                                   fos.close();
    -                           } catch (Throwable t) {
    -                                   LOG.warn("Cannot close stream to BLOB 
staging file", t);
    -                           }
    -                   }
                        if (incomingFile != null) {
    -                           if (!incomingFile.delete()) {
    +                           if (!incomingFile.delete() && 
incomingFile.exists()) {
                                        LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
                                }
                        }
                }
        }
     
        /**
    -    * Handles an incoming DELETE request from a BLOB client.
    -    * 
    -    * @param inputStream The input stream to read the request from.
    -    * @param outputStream The output stream to write the response to.
    -    * @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
    +    * Reads a full file from <tt>inputStream</tt> into 
<tt>incomingFile</tt> returning its checksum.
    +    *
    +    * @param inputStream
    +    *              stream to read from
    +    * @param incomingFile
    +    *              file to write to
    +    * @param buf
    +    *              An auxiliary buffer for data 
serialization/deserialization
    +    *
    +    * @return the received file's content hash as a BLOB key
    +    *
    +    * @throws IOException
    +    *              thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
         */
    -   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
    +   private static BlobKey readFileFully(
    +                   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
    +                   throws IOException {
    +           MessageDigest md = BlobUtils.createMessageDigest();
    +           FileOutputStream fos = new FileOutputStream(incomingFile);
     
                try {
    -                   int type = inputStream.read();
    -                   if (type < 0) {
    -                           throw new EOFException("Premature end of DELETE 
request");
    -                   }
    -
    -                   if (type == CONTENT_ADDRESSABLE) {
    -                           BlobKey key = 
BlobKey.readFromInputStream(inputStream);
    -                           File blobFile = 
blobServer.getStorageLocation(key);
    -
    -                           writeLock.lock();
    -
    -                           try {
    -                                   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
    -                                   // removing the remote file in case of 
a concurrent put operation
    -                                   if (blobFile.exists() && 
!blobFile.delete()) {
    -                                           throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
    -                                   }
    -
    -                                   blobStore.delete(key);
    -                           } finally {
    -                                   writeLock.unlock();
    +                   while (true) {
    +                           final int bytesExpected = 
readLength(inputStream);
    +                           if (bytesExpected == -1) {
    +                                   // done
    +                                   break;
    +                           }
    +                           if (bytesExpected > BUFFER_SIZE) {
    +                                   throw new IOException(
    +                                           "Unexpected number of incoming 
bytes: " + bytesExpected);
                                }
    -                   }
    -                   else if (type == NAME_ADDRESSABLE) {
    -                           byte[] jidBytes = new byte[JobID.SIZE];
    -                           readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
    -                           JobID jobID = JobID.fromByteArray(jidBytes);
     
    -                           String key = readKey(buf, inputStream);
    +                           readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
    +                           fos.write(buf, 0, bytesExpected);
     
    -                           File blobFile = 
this.blobServer.getStorageLocation(jobID, key);
    +                           md.update(buf, 0, bytesExpected);
    +                   }
    +                   return new BlobKey(md.digest());
    +           } finally {
    --- End diff --
    
    ...and throw an exception if closing the stream fails? (Both the current and 
previous implementations swallow the exception and only log the error message.) 
How grave would you consider a failure to close an output stream to a 
temporary file which was otherwise written completely and without error?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to