[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072336#comment-16072336 ]
ASF GitHub Bot commented on FLINK-7057: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125247578 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** - * Handles an incoming DELETE request from a BLOB client. - * - * @param inputStream The input stream to read the request from. - * @param outputStream The output stream to write the response to. - * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. + * Reads a full file from <tt>inputStream</tt> into <tt>incomingFile</tt> returning its checksum. 
+ * + * @param inputStream + * stream to read from + * @param incomingFile + * file to write to + * @param buf + * An auxiliary buffer for data serialization/deserialization + * + * @return the received file's content hash as a BLOB key + * + * @throws IOException + * thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); - File blobFile = 
this.blobServer.getStorageLocation(jobID, key); + md.update(buf, 0, bytesExpected); + } + return new BlobKey(md.digest()); + } finally { --- End diff -- Maybe use try-with-resources for `fos` here? > move BLOB ref-counting from LibraryCacheManager to BlobCache > ------------------------------------------------------------ > > Key: FLINK-7057 > URL: https://issues.apache.org/jira/browse/FLINK-7057 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR > files managed by it. Instead, we want the {{BlobCache}} to do that itself for > all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} > level but rather per job. Therefore, the cleanup process should be adapted, > too. -- This message was sent by Atlassian JIRA (v6.4.14#64029)