[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073920#comment-16073920 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4146

> collection of BlobServer improvements
> -------------------------------------
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.3.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * -promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}-
> * -extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs (prepares FLINK-4399)-
> * -remove {{NAME_ADDRESSABLE}} blobs after job/task termination-
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073483#comment-16073483 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4146

merging,
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072164#comment-16072164 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125244675

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
@@ -92,6 +99,69 @@ public void testDeleteSingle() {
             catch (IllegalStateException e) {
                 // expected
             }
+
+            // delete a file directly on the server
+            server.delete(key2);
+            try {
+                server.getURL(key2);
+                fail("BLOB should have been deleted");
+            }
+            catch (IOException e) {
+                // expected
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
--- End diff --

Absolutely right - I was told this was an issue some versions ago with JUnit or so... was just copying the code from the other tests in this class. Let me create a cleanup-PR on top of the newest PR in this series, i.e. #4238
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072156#comment-16072156 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125243142

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
                     // we should make the local and remote file deletion atomic, otherwise we might risk not
                     // removing the remote file in case of a concurrent put operation
                     if (blobFile.exists() && !blobFile.delete()) {
--- End diff --

you're right, this could be improved as well - the next PR in this series is removing this code though
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071733#comment-16071733 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125165594

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
@@ -92,6 +99,69 @@ public void testDeleteSingle() {
             catch (IllegalStateException e) {
                 // expected
             }
+
+            // delete a file directly on the server
+            server.delete(key2);
+            try {
+                server.getURL(key2);
+                fail("BLOB should have been deleted");
+            }
+            catch (IOException e) {
+                // expected
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
--- End diff --

Can we remove this and simply let the exception be thrown? I think catching an exception, printing the stack trace and then failing with the exception message is an anti-pattern.
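To illustrate the suggestion above, here is a minimal sketch of the same kind of delete test written so that unexpected exceptions propagate instead of being caught, printed and turned into `fail(e.getMessage())`. It assumes plain Java instead of JUnit so it runs standalone; `InMemoryBlobStore` and `DeleteTestSketch` are hypothetical and not part of Flink. Only the expected exception is caught; anything else escapes the test method, so a test runner would report it with its original type and full stack trace rather than just its message.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a BLOB store; not Flink's BlobServer API.
class InMemoryBlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    void put(String key, byte[] data) {
        blobs.put(key, data);
    }

    void delete(String key) {
        blobs.remove(key);
    }

    byte[] get(String key) throws IOException {
        byte[] data = blobs.get(key);
        if (data == null) {
            throw new IOException("BLOB " + key + " not found");
        }
        return data;
    }
}

class DeleteTestSketch {
    // Declares "throws IOException": an unexpected failure escapes with its
    // original type and stack trace instead of being reduced to a message.
    static void testDeleteSingle() throws IOException {
        InMemoryBlobStore server = new InMemoryBlobStore();
        server.put("key2", new byte[] {1, 2, 3});

        // delete a BLOB directly on the store
        server.delete("key2");
        try {
            server.get("key2");
            // same effect as JUnit's fail(...), which throws an AssertionError
            throw new AssertionError("BLOB should have been deleted");
        } catch (IOException expected) {
            // expected: only the anticipated failure is caught here
        }
    }
}
```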
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071734#comment-16071734 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125165567

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
                     // we should make the local and remote file deletion atomic, otherwise we might risk not
                     // removing the remote file in case of a concurrent put operation
                     if (blobFile.exists() && !blobFile.delete()) {
--- End diff --

Shouldn't the order of the operations be changed like in the other cases?
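A minimal sketch of the delete-first ordering asked about here, assuming plain `java.io.File` and a hypothetical helper `deleteIfPresent` (not Flink code). Calling `delete()` first and consulting `exists()` only when it returns false means a file removed concurrently by another thread is not reported as a failure. With the check-then-act order `exists() && !delete()`, the file could disappear between the two calls, and the false result of `delete()` would then be reported as a spurious failure. Sequentially both orders behave the same; the difference only shows under such a race.

```java
import java.io.File;

class DeleteOrderSketch {
    /**
     * Deletes the file and returns true if it is gone afterwards.
     * delete() is attempted first; exists() is only consulted when delete()
     * returned false, so a file removed concurrently by another thread is
     * not reported as a failure.
     */
    static boolean deleteIfPresent(File file) {
        if (!file.delete() && file.exists()) {
            return false; // deletion genuinely failed, the file is still there
        }
        return true;
    }
}
```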
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067956#comment-16067956 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124734798

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
     private final Random rnd = new Random();
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

Unfortunately, concurrency with `delete` operations does not work either if not guarded - the directory may not exist anymore between `jobDirectory.mkdirs()` and `jobDirectory.exists()`. I was able to reproduce the error with the existing test though - if you want to try it, just change the order of these two commands back - the test will not hit it every time, but sometimes.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067918#comment-16067918 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124727496

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
-        if (key == null) {
-            throw new IllegalArgumentException("BLOB key must not be null");
-        }
+        checkArgument(key != null, "BLOB key must not be null.");
--- End diff --

Yes, but it would be cleaner and less surprising for future users. However I have no strong feelings about it.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067920#comment-16067920 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124728498

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

Ok, please add a comment or commit message info about this.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067917#comment-16067917 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124730014

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
     private final Random rnd = new Random();
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

So maybe you could call `gets` and `deletes` (if there is such an operation) interleaved, multiple times in a loop? Otherwise I don't see real value in those tests and I would prefer to drop them (so that we don't have to maintain tests that do not check for anything).
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067919#comment-16067919 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124727900

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

Ok, sorry, I didn't find it the first time I was looking for it.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066798#comment-16066798 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124587248

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

the (included) changes in `BlobServerDeleteTest` should cover this
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066796#comment-16066796 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124586418

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
     private final Random rnd = new Random();
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

Actually, the job directory is only created once, so adding more calls doesn't help. What could make this happen more often is adding more threads to `BlobServerPutTest#testServerContentAddressableGetStorageLocationConcurrent()`; otherwise the failure only happens once in a while if the implementation is not thread-safe.
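A rough sketch of the "more threads" approach, assuming a simplified stand-in for a checked test thread: `SimpleCheckedThread` only mimics the `go()`/`sync()` shape of the `CheckedThread` used in the diff and is not the Flink class, and `ThrowingAction` and `ConcurrentCallSketch` are hypothetical. Each thread records whatever its action throws, all threads are joined, and then the failure of the first failing thread (in start order) is rethrown, so a non-thread-safe implementation surfaces as a test failure instead of being lost on a background thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical functional interface for an action that may throw checked exceptions.
interface ThrowingAction {
    void run() throws Exception;
}

// Simplified stand-in that only mimics the go()/sync() shape of a checked
// test thread; it is not Flink's CheckedThread.
abstract class SimpleCheckedThread extends Thread {
    private volatile Throwable error;

    /** The work to run; anything it throws is recorded instead of being lost. */
    abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Waits for the thread to finish and rethrows whatever go() threw. */
    void sync() throws Exception {
        join();
        Throwable t = error;
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            throw new Exception(t);
        }
    }
}

class ConcurrentCallSketch {
    /**
     * Runs the same action on numThreads threads at once. All threads are joined
     * first; then the failure of the first failing thread (in start order), if any,
     * is rethrown.
     */
    static void runConcurrently(int numThreads, ThrowingAction action) throws Exception {
        List<SimpleCheckedThread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            SimpleCheckedThread thread = new SimpleCheckedThread() {
                @Override
                void go() throws Exception {
                    action.run();
                }
            };
            threads.add(thread);
            thread.start();
        }
        for (SimpleCheckedThread thread : threads) {
            thread.join();
        }
        for (SimpleCheckedThread thread : threads) {
            thread.sync();
        }
    }
}
```

In the test under discussion, the action would be a call such as `server.getStorageLocation(key)`; raising `numThreads` increases the chance of hitting the race on any single run, but it still cannot guarantee it.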
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066786#comment-16066786 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124585333

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

the new way is thread-safe - imagine a directory being concurrently created after another thread has failed the `exists()` part - not sure we actually fix a bug nowadays since the use may be guarded... so this is probably more of a "just-in-case" thing, or for when usage patterns change
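A minimal sketch of the ordering described here, assuming plain `java.io.File` and a hypothetical `getOrCreateDirectory` helper (not Flink's `BlobUtils`). `File.mkdirs()` returns false both when creation fails and when the directory already exists, so `exists()` is consulted only after `mkdirs()` reports false. With the old order, `!exists() && !mkdirs()`, a thread could see the directory missing, lose the race to another thread that creates it, and then fail because its own `mkdirs()` returned false.

```java
import java.io.File;
import java.io.IOException;

class JobDirectorySketch {
    /**
     * Returns dir, creating it (and missing parents) if necessary.
     * mkdirs() is attempted first: it returns false both when creation fails and
     * when the directory already exists, e.g. because another thread created it
     * in the meantime, so exists() is only checked after a false result.
     */
    static File getOrCreateDirectory(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath());
        }
        return dir;
    }
}
```

As noted in the comment above, this does not help against a concurrent delete of the directory; that case still needs external guarding.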
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066780#comment-16066780 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124584383

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
-        if (key == null) {
-            throw new IllegalArgumentException("BLOB key must not be null");
-        }
+        checkArgument(key != null, "BLOB key must not be null.");
--- End diff --

unfortunately, `checkNotNull` throws a `NullPointerException` instead and I did not want to change that here
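The exception types mentioned here can be checked with a small sketch. Flink's `Preconditions` class is not assumed to be on the classpath, so `checkArgument` below is a simplified stand-in that throws `IllegalArgumentException` for a false condition; `Objects.requireNonNull` is the JDK method suggested in the review and throws `NullPointerException`. Switching the null check in `BlobClient#delete` to either `checkNotNull` or `requireNonNull` would therefore change the exception that callers see.

```java
import java.util.Objects;

class PreconditionSketch {
    // Simplified stand-in for a checkArgument(boolean, Object) precondition:
    // a false condition is reported as an IllegalArgumentException.
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    /** Name of the exception raised by the checkArgument style, or "none". */
    static String viaCheckArgument(Object key) {
        try {
            checkArgument(key != null, "BLOB key must not be null.");
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Name of the exception raised by the JDK's Objects.requireNonNull, or "none". */
    static String viaRequireNonNull(Object key) {
        try {
            Objects.requireNonNull(key, "BLOB key must not be null.");
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

For a null key, `viaCheckArgument(null)` returns `"IllegalArgumentException"` and `viaRequireNonNull(null)` returns `"NullPointerException"`.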
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066686#comment-16066686 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124566417

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

Don't you want to add test coverage for not throwing if delete fails?
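One possible shape for the test coverage asked about here, as a sketch under assumptions: `NonThrowingDeleteSketch` is hypothetical, collects warnings in a list instead of using a logger, and mirrors the `!localFile.delete() && localFile.exists()` check from the `BlobServer#delete` diff in this thread. A deletion failure is forced by passing a non-empty directory, for which `File.delete()` returns false; the test then checks that no exception escapes and that exactly one warning is recorded.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

class NonThrowingDeleteSketch {
    // Stand-in for a logger so that warnings can be asserted on.
    final List<String> warnings = new ArrayList<>();

    /** Deletes a local copy; a failed deletion is recorded as a warning, never thrown. */
    void delete(File localFile) {
        if (!localFile.delete() && localFile.exists()) {
            warnings.add("Failed to delete " + localFile.getAbsolutePath());
        }
    }

    /** Forces delete() to fail with a non-empty directory and checks that nothing is thrown. */
    static void testFailedDeleteDoesNotThrow() throws IOException {
        File dir = Files.createTempDirectory("blob").toFile();
        File child = new File(dir, "child");
        if (!child.createNewFile()) {
            throw new AssertionError("test setup failed");
        }

        NonThrowingDeleteSketch store = new NonThrowingDeleteSketch();
        store.delete(dir); // File.delete() returns false for a non-empty directory
        if (store.warnings.size() != 1 || !dir.exists()) {
            throw new AssertionError("expected one warning and the directory to remain");
        }

        // clean up: once the child is gone, deleting the directory succeeds silently
        store.delete(child);
        store.delete(dir);
        if (store.warnings.size() != 1 || dir.exists()) {
            throw new AssertionError("cleanup should not add warnings");
        }
    }
}
```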
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066688#comment-16066688 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124566819

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

Why did you change the order of those operations? Is that a fix for something? If so, could you add an explanation of what's going on here to the commit message?
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066685#comment-16066685 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124564731

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
    -    if (key == null) {
    -        throw new IllegalArgumentException("BLOB key must not be null");
    -    }
    +    checkArgument(key != null, "BLOB key must not be null.");
    --- End diff --

    `requireNonNull`?
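The `requireNonNull` suggestion changes the exception type: a `checkArgument`-style precondition throws `IllegalArgumentException`, while `java.util.Objects.requireNonNull` throws `NullPointerException`, the conventional signal for a null argument. The sketch below contrasts the two using only the JDK; `checkArgument` here is a local stand-in written for illustration, not Flink's `Preconditions` class.

```java
import java.util.Objects;

final class NullCheckSketch {
    private NullCheckSketch() {}

    // Local stand-in for a checkArgument-style precondition (hypothetical helper).
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static void deleteWithCheckArgument(Object key) {
        checkArgument(key != null, "BLOB key must not be null.");
        // ... perform the delete ...
    }

    static void deleteWithRequireNonNull(Object key) {
        Objects.requireNonNull(key, "BLOB key must not be null.");
        // ... perform the delete ...
    }

    /** Returns the simple name of the exception thrown by the call, or "none". */
    static String thrownBy(Runnable call) {
        try {
            call.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(thrownBy(() -> deleteWithCheckArgument(null)));  // IllegalArgumentException
        System.out.println(thrownBy(() -> deleteWithRequireNonNull(null))); // NullPointerException
    }
}
```

Which one to use is a convention choice; callers that catch one type will not catch the other.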
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066684#comment-16066684 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124569928

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -411,12 +411,11 @@ public void delete(BlobKey key) throws IOException {
     readWriteLock.writeLock().lock();
     try {
    -    if (localFile.exists()) {
    -        if (!localFile.delete()) {
    -            LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
    -        }
    +    if (!localFile.delete() && localFile.exists()) {
    +        LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
         }
    +
    --- End diff --

    Remove the extra line.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066687#comment-16066687 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124568869

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
    @@ -59,6 +60,110 @@
     private final Random rnd = new Random();
    +
    +// --- concurrency tests for utility methods which could fail during the put operation ---
    +
    +/**
    + * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
    + */
    +public static class ContentAddressableGetStorageLocation extends CheckedThread {
    +    private final BlobServer server;
    +    private final BlobKey key;
    +
    +    public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
    +        this.server = server;
    +        this.key = key;
    +    }
    +
    +    @Override
    +    public void go() throws Exception {
    +        server.getStorageLocation(key);
    --- End diff --

    Shouldn't you call this at least a couple of times (or a couple of dozen, or a couple of hundred)? Otherwise, won't this complete before the next thread starts up?
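One way to address this concern is to release all workers at the same moment with a latch and let each call the method under test many times, which widens the window for interleavings. The sketch below uses plain JDK threads instead of Flink's `CheckedThread`; `ConcurrentCallHarness` and its `Task` interface are hypothetical names, and the thread and iteration counts are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrentCallHarness {
    private ConcurrentCallHarness() {}

    /** A task that may throw checked exceptions, similar in spirit to CheckedThread#go(). */
    interface Task {
        void run() throws Exception;
    }

    /**
     * Starts {@code threads} workers that wait on a common latch and then each call
     * {@code task} {@code iterations} times. Returns every exception observed
     * (a worker stops at its first exception).
     */
    static List<Throwable> runConcurrently(Task task, int threads, int iterations) {
        CountDownLatch start = new CountDownLatch(1);
        ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    start.await(); // maximise overlap: everybody starts together
                    for (int n = 0; n < iterations; n++) {
                        task.run();
                    }
                } catch (Throwable t) {
                    errors.add(t);
                }
            });
            workers.add(worker);
            worker.start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return new ArrayList<>(errors);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<Throwable> errors = runConcurrently(calls::incrementAndGet, 8, 1_000);
        System.out.println(errors.size() + " errors, " + calls.get() + " calls"); // 0 errors, 8000 calls
    }
}
```

In a real test, the task would be the call under test (here `getStorageLocation`), and the test would fail if the returned list is not empty.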
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057378#comment-16057378 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4146

    Please note that there are four unrelated test failures:
    * 2x Kafka010ITCase
    * 2x YARNSessionCapacitySchedulerITCase
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055463#comment-16055463 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/3512

    Let's include the improvements in a separate PR (#4146) and drop the feature additions for a rework in FLIP-19.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055464#comment-16055464 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK closed the pull request at:

    https://github.com/apache/flink/pull/3512
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055456#comment-16055456 ]

ASF GitHub Bot commented on FLINK-6008:
---

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4146

    [FLINK-6008] collection of BlobServer improvements

    This PR is a light-weight version of #3512 that only includes the improvements and can serve as a base for FLIP-19.
    It improves the following things around the `BlobServer`/`BlobCache`:
    * replace config options in `config.md` with non-deprecated ones, e.g. `high-availability.cluster-id` and `high-availability.storageDir`
    * do not fail the `BlobServer` when a delete operation fails
    * add more unit tests
    * general code style and docs improvements, like using `Preconditions.checkArgument`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6008b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4146.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4146

commit ce719ee39fbbca7b7828c17d9792fc87d37450c7
Author: Nico Kruber
Date: 2017-01-06T17:42:58Z

    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit 9efa8808e46adc1253f52a6a8cec6d3b4d29fee3
Author: Nico Kruber
Date: 2016-12-20T15:49:57Z

    [FLINK-6008][docs] minor improvements in the BlobService docs

commit ca3d533b0affa645ec93d40de378dadc829bbfe5
Author: Nico Kruber
Date: 2016-12-20T17:27:13Z

    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 0eededeb36dd833835753def7f4bb27c9d5fb67e
Author: Nico Kruber
Date: 2017-03-09T17:14:02Z

    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6249041a9db2b39ddf54e79a1aed5e7706e739c7
Author: Nico Kruber
Date: 2016-12-21T15:23:29Z

    [FLINK-6008] do not fail the BlobServer if delete fails

    also extend the delete tests and remove one code duplication

commit e681239a538547f752d65358db1ebd2ba312b33c
Author: Nico Kruber
Date: 2017-03-17T15:21:40Z

    [FLINK-6008] fix concurrent job directory creation

    also add according unit tests

commit 20beae2dbc91859e2ec724b35b20536dcd11fe90
Author: Nico Kruber
Date: 2017-04-18T14:37:37Z

    [FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit 8a33517fe6eb2fa932ab17cb0d82a3fa8d7b8d0b
Author: Nico Kruber
Date: 2017-04-19T13:39:03Z

    [hotfix] minor typos

commit 23889866ac21494fc4af90905ab1518cbe897118
Author: Nico Kruber
Date: 2017-04-19T14:10:16Z

    [FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 01b1a245528c264a6061ed3a48b24c5a207369f6
Author: Nico Kruber
Date: 2017-06-14T16:01:47Z

    [FLINK-6008] do not guard a delete() call with a check for existence
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16053975#comment-16053975 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3512

    I take it this PR will be subsumed in FLIP-19?
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982583#comment-15982583 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/3512

    After investigating a bit further, I noticed that this problem is actually a bit bigger: even in `FileSystemBlobStore`, there is no guarantee that a directory will not be deleted concurrently (from a `delete` method) between its creation and the writing of a file into it (in a `get` method):
    * the `delete` method for name-addressable blobs always deletes the job-specific storage directory if there is no further blob for this job
    * the content-addressable blobs do that similarly but are shared among jobs and thus only delete directories if there is no other blob.

    Since name-addressable blobs have not been used so far and the latter case typically does not occur concurrently with `get` requests, this has not been a problem yet. I therefore suggest creating a separate JIRA issue for this (delete vs. get) concurrency problem and applying a quick fix for #3742.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15975416#comment-15975416 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/3512

    I found a race between `BlobCache#deleteAll(JobID)` and `BlobCache#getURL(BlobKey)` now that the former is actually being used; this needs to be fixed before merging: `BlobCache#deleteAll(JobID)` deletes the job directory, which is only created at the start of `BlobCache#getURL(BlobKey)`, and `getURL` then relies on the directory still being present.
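One conventional way to close such a window is to guard "create the job directory and write into it" and "delete the job directory" with the same `ReentrantReadWriteLock`: writers of individual files take the read lock (so they can run in parallel), while the whole-directory delete takes the write lock. The sketch below is hypothetical: `JobBlobDirectory`, `put` and `deleteAll` are illustrative names rather than the `BlobCache` API, it assumes a flat job directory without subdirectories, and it does not claim to reflect the fix that was eventually applied.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class JobBlobDirectory {
    private final File storageDir;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    JobBlobDirectory(File storageDir) {
        this.storageDir = storageDir;
    }

    private File jobDir(String jobId) {
        return new File(storageDir, "job_" + jobId);
    }

    /** Creates the job directory if needed and writes one file; cannot interleave with deleteAll(). */
    File put(String jobId, String key, String content) throws IOException {
        lock.readLock().lock(); // several puts may proceed concurrently
        try {
            File dir = jobDir(jobId);
            if (!dir.mkdirs() && !dir.exists()) {
                throw new IOException("Could not create " + dir);
            }
            File target = new File(dir, key);
            Files.write(target.toPath(), content.getBytes(StandardCharsets.UTF_8));
            return target;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Deletes the (flat) job directory exclusively, so no put() sees it half-deleted. */
    void deleteAll(String jobId) throws IOException {
        lock.writeLock().lock();
        try {
            File dir = jobDir(jobId);
            File[] files = dir.listFiles();
            if (files != null) {
                for (File f : files) {
                    Files.deleteIfExists(f.toPath());
                }
            }
            Files.deleteIfExists(dir.toPath());
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws IOException {
        JobBlobDirectory blobs = new JobBlobDirectory(Files.createTempDirectory("blob-cache").toFile());
        File f = blobs.put("job-1", "blob-a", "payload");
        blobs.deleteAll("job-1");
        System.out.println(f.exists()); // false
    }
}
```

The trade-off is that a long-running download blocks `deleteAll` until it finishes; a finer-grained scheme (e.g. one lock per job) would reduce contention at the cost of more bookkeeping.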
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974787#comment-15974787 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/3512

    * I removed the exposed `BlobService` from the `LibraryCacheManager`.
    * Also, I developed a new cleanup story that removes blobs only if there are no tasks referring to the job ID anymore. The extension of keeping the files around for re-deployment may be added as part of a new re-deployment story when it is actually implemented.
    * Regarding the suggestion of refactoring the `get(JobId, String)` methods, I created [FLINK-6329](https://issues.apache.org/jira/browse/FLINK-6329), since this change could be a future improvement.
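The cleanup story in the second bullet amounts to reference counting per job: each task registers its job ID when it starts and releases it when it terminates, and the job's blobs are removed only when the count drops to zero, so one task can no longer delete blobs that another task of the same job still needs. A minimal sketch, assuming a hypothetical `JobReferenceCounter` with a cleanup callback (not the actual `BlobLibraryCacheManager` code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class JobReferenceCounter<J> {
    private final Map<J, Integer> references = new HashMap<>();
    private final Consumer<J> cleanup;

    JobReferenceCounter(Consumer<J> cleanup) {
        this.cleanup = cleanup;
    }

    /** Called when a task of the given job is registered. */
    synchronized void register(J jobId) {
        references.merge(jobId, 1, Integer::sum);
    }

    /** Called when a task of the given job terminates; cleans up after the last one. */
    synchronized void release(J jobId) {
        Integer count = references.get(jobId);
        if (count == null) {
            throw new IllegalStateException("No references registered for job " + jobId);
        }
        if (count == 1) {
            references.remove(jobId);
            cleanup.accept(jobId); // no task refers to this job anymore
        } else {
            references.put(jobId, count - 1);
        }
    }

    synchronized int referenceCount(J jobId) {
        return references.getOrDefault(jobId, 0);
    }

    public static void main(String[] args) {
        List<String> cleaned = new ArrayList<>();
        JobReferenceCounter<String> counter = new JobReferenceCounter<>(cleaned::add);
        counter.register("job-1"); // task A
        counter.register("job-1"); // task B
        counter.release("job-1");  // task A finishes: blobs must stay for task B
        System.out.println(cleaned); // []
        counter.release("job-1");  // task B finishes: last reference gone
        System.out.println(cleaned); // [job-1]
    }
}
```

Running the cleanup while holding the monitor keeps a concurrent `register` from observing a half-cleaned job, at the cost of blocking other jobs' bookkeeping during the deletion.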
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930207#comment-15930207 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106675058

    --- Diff: docs/setup/config.md ---
    @@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standal
     - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`.
    -- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
    +- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.
    --- End diff --

    I would move these into the `### High Availability (HA)` section, because they are independent of ZooKeeper.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930211#comment-15930211 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106680476

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1305,6 +1305,9 @@ class TaskManager(
         s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
         s"(${task.getExecutionId})")
    +// delete all NAME_ADDRESSABLE BLOBs
    +libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
    --- End diff --

    Multiple tasks of the same job run in a TaskManager. This means that tasks delete each other's blobs.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930209#comment-15930209 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106674262

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
     // fallback: download from the BlobServer
     final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
     // loop over retries
     int attempt = 0;
     while (true) {
    +    try (
    +        final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
    +        final InputStream is = bc.get(requiredBlob);
    +        final OutputStream os = new FileOutputStream(localJarFile)
    +    ) {
    +        getURLTransferFile(buf, is, os);
    +
    +        // success, we finished
    +        return localJarFile.toURI().toURL();
    +    }
    +    catch (Throwable t) {
    +        getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
    -    if (attempt == 0) {
    -        LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
    -    } else {
    +        // retry
    +        ++attempt;
             LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
         }
    +} // end loop over retries
    +}
    -    try {
    -        BlobClient bc = null;
    -        InputStream is = null;
    -        OutputStream os = null;
    +/**
    + * Returns the URL for the BLOB with the given parameters. The method will first attempt to
    + * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
    + * to download it from this cache's BLOB server.
    + *
    + * @param jobId JobID of the file in the blob store
    + * @param key String key of the file in the blob store
    + * @return URL referring to the local storage location of the BLOB.
    + * @throws java.io.FileNotFoundException if the path does not exist;
    + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    + */
    +public URL getURL(final JobID jobId, final String key) throws IOException {
    +    checkArgument(jobId != null, "Job id cannot be null.");
    +    checkArgument(key != null, "BLOB name cannot be null.");
    -        try {
    -            bc = new BlobClient(serverAddress, blobClientConfig);
    -            is = bc.get(requiredBlob);
    -            os = new FileOutputStream(localJarFile);
    -
    -            while (true) {
    -                final int read = is.read(buf);
    -                if (read < 0) {
    -                    break;
    -                }
    -                os.write(buf, 0, read);
    -            }
    -
    -            // we do explicitly not use a finally block, because we want the closing
    -            // in the regular case to throw exceptions and cause the writing to fail.
    -            // But, the closing on exception should not throw further exceptions and
    -            // let us keep the root exception
    -            os.close();
    -            os = null;
    -            is.close();
    -            is = null;
    -            bc.close();
    -            bc = null;
    -
    -            // success, we finished
    -            return localJarFile.toURI().toURL();
    -        }
    -        catch (Throwable t) {
    -            // we use "catch (Throwable)" to keep the root exception. Otherwise that exc
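The comment removed in this hunk explains why the old code closed the streams by hand: closing must be able to fail the transfer in the normal case, but an exception thrown while closing after an earlier failure must not hide the root exception. Java's try-with-resources, used in the new code, provides both: an exception from `close()` propagates if the body succeeded, and is attached to the original exception via `Throwable#addSuppressed` if the body already failed. A small self-contained demonstration with a hypothetical `FailingResource`:

```java
import java.io.IOException;

final class SuppressedDemo {
    private SuppressedDemo() {}

    /** A resource whose close() always fails (hypothetical, for illustration). */
    static final class FailingResource implements AutoCloseable {
        @Override
        public void close() throws Exception {
            throw new IllegalStateException("close failed");
        }
    }

    /** Runs a body inside try-with-resources and returns the exception that escapes. */
    static Exception run(boolean bodyFails) {
        try (FailingResource resource = new FailingResource()) {
            if (bodyFails) {
                throw new IOException("transfer failed");
            }
            // If the body succeeds, close() still throws and that exception escapes.
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Exception ok = run(false);
        System.out.println(ok.getMessage()); // close failed
        Exception failed = run(true);
        System.out.println(failed.getMessage());                    // transfer failed
        System.out.println(failed.getSuppressed()[0].getMessage()); // close failed
    }
}
```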
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930210#comment-15930210 ]

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3512#discussion_r106674837

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
     // fallback: download from the BlobServer
     final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
    +LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
     // loop over retries
     int attempt = 0;
     while (true) {
    +    try (
    +        final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
    +        final InputStream is = bc.get(requiredBlob);
    +        final OutputStream os = new FileOutputStream(localJarFile)
    +    ) {
    +        getURLTransferFile(buf, is, os);
    +
    +        // success, we finished
    +        return localJarFile.toURI().toURL();
    +    }
    +    catch (Throwable t) {
    +        getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
    -    if (attempt == 0) {
    -        LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
    -    } else {
    +        // retry
    +        ++attempt;
             LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
         }
    +} // end loop over retries
    +}
    -    try {
    -        BlobClient bc = null;
    -        InputStream is = null;
    -        OutputStream os = null;
    +/**
    + * Returns the URL for the BLOB with the given parameters. The method will first attempt to
    + * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
    + * to download it from this cache's BLOB server.
    + *
    + * @param jobId JobID of the file in the blob store
    + * @param key String key of the file in the blob store
    + * @return URL referring to the local storage location of the BLOB.
    + * @throws java.io.FileNotFoundException if the path does not exist;
    + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    + */
    +public URL getURL(final JobID jobId, final String key) throws IOException {
    +    checkArgument(jobId != null, "Job id cannot be null.");
    +    checkArgument(key != null, "BLOB name cannot be null.");
    -        try {
    -            bc = new BlobClient(serverAddress, blobClientConfig);
    -            is = bc.get(requiredBlob);
    -            os = new FileOutputStream(localJarFile);
    -
    -            while (true) {
    -                final int read = is.read(buf);
    -                if (read < 0) {
    -                    break;
    -                }
    -                os.write(buf, 0, read);
    -            }
    -
    -            // we do explicitly not use a finally block, because we want the closing
    -            // in the regular case to throw exceptions and cause the writing to fail.
    -            // But, the closing on exception should not throw further exceptions and
    -            // let us keep the root exception
    -            os.close();
    -            os = null;
    -            is.close();
    -            is = null;
    -            bc.close();
    -            bc = null;
    -
    -            // success, we finished
    -            return localJarFile.toURI().toURL();
    -        }
    -        catch (Throwable t) {
    -            // we use "catch (Throwable)" to keep the root exception. Otherwise that exc
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930206#comment-15930206 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677990 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException { } /** +* Deletes the file associated with the given job and key if it exists in the local +* storage of the blob server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +*/ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { + if (!localFile.delete()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); + } + } + + blobStore.delete(jobId, key); + } + + /** +* Deletes all files associated with the given job id from the storage. +* +* @param jobId JobID of the files in the blob store +*/ + @Override + public void deleteAll(final JobID jobId) { + checkArgument(jobId != null, "Job id must not be null."); + + try { + BlobUtils.deleteJobDirectory(storageDir, jobId); + } catch (IOException e) { --- End diff -- If we want to make sure we cleanup in any case, we can actually catch `Exception` here. 
> collection of BlobServer improvements
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.3.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g.
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
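The review comment suggests catching `Exception` rather than only `IOException` in `deleteAll`, so that cleanup proceeds whatever goes wrong. This is a minimal sketch under stated assumptions: the `job_<id>` directory layout, the `deleteJobDirectory` helper and the `remoteCleanup` callback (standing in for a second step such as `blobStore.deleteAll(jobId)`) are hypothetical, and logging uses `java.util.logging` instead of Flink's logger. It also shows why the narrower catch is not enough: `Files.walk` reports traversal errors as `UncheckedIOException`, which a `catch (IOException e)` would not see.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.logging.Logger;
import java.util.stream.Stream;

public final class JobBlobCleanup {

    private static final Logger LOG = Logger.getLogger(JobBlobCleanup.class.getName());

    private JobBlobCleanup() {
    }

    /** Hypothetical stand-in for BlobUtils.deleteJobDirectory: deletes storageDir/job_<jobId> recursively. */
    public static void deleteJobDirectory(Path storageDir, String jobId) throws IOException {
        Path jobDir = storageDir.resolve("job_" + jobId);
        if (!Files.exists(jobDir)) {
            return; // nothing stored for this job
        }
        try (Stream<Path> paths = Files.walk(jobDir)) {
            // reverse lexicographic order visits children before their parent directory
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    // lambdas cannot throw checked exceptions, so this surfaces as a RuntimeException
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    /** Deletes the job's local files, then always runs the second cleanup step. Returns false if the local step failed. */
    public static boolean deleteAll(Path storageDir, String jobId, Runnable remoteCleanup) {
        boolean localOk = true;
        try {
            deleteJobDirectory(storageDir, jobId);
        } catch (Exception e) {
            // catching only IOException would let UncheckedIOException, NullPointerException, etc. escape
            LOG.warning("Failed to delete local BLOB storage of job " + jobId + ": " + e);
            localOk = false;
        }
        // runs whether or not the local step failed
        remoteCleanup.run();
        return localOk;
    }
}
```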
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930208#comment-15930208 ] ASF GitHub Bot commented on FLINK-6008:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3512#discussion_r106677799

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
     }
     /**
+     * Deletes the file associated with the given job and key if it exists in the local
+     * storage of the blob server.
+     *
+     * @param jobId JobID of the file in the blob store
+     * @param key String key of the file in the blob store
+     */
+    @Override
+    public void delete(JobID jobId, String key) {
+        checkArgument(jobId != null, "Job id must not be null.");
+        checkArgument(key != null, "BLOB name must not be null.");
+
+        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+        if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete && exists)`.
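The suggested `if (!delete && exists)` ordering avoids a check-then-act race: with `exists()` first, another thread can remove the file between the check and `delete()`, and the server then logs a spurious failure. Attempting the delete first and checking existence only afterwards reports a failure only if the file really is still there. This is a minimal sketch with `java.io.File`; the class and method names are hypothetical, and `java.util.logging` stands in for Flink's logger.

```java
import java.io.File;
import java.util.logging.Logger;

public final class LocalBlobDelete {

    private static final Logger LOG = Logger.getLogger(LocalBlobDelete.class.getName());

    private LocalBlobDelete() {
    }

    /** Deletes a local BLOB file without a check-then-act race; returns true if the file is gone afterwards. */
    public static boolean deleteLocal(File localFile) {
        // Try the delete first. If it fails only because a concurrent caller already
        // removed the file, exists() is false as well and nothing is reported.
        if (!localFile.delete() && localFile.exists()) {
            LOG.warning("Failed to delete local BLOB at " + localFile.getAbsolutePath());
            return false;
        }
        return true;
    }
}
```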
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905376#comment-15905376 ] ASF GitHub Bot commented on FLINK-6008:
---
GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:
* replaces config options in `config.md` with non-deprecated ones, e.g. `high-availability.cluster-id` and `high-availability.storageDir`
* promote `BlobStore#deleteAll(JobID)` to the `BlobService`
* extend the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares for FLINK-4399)
* remove `NAME_ADDRESSABLE` blobs after job/task termination
* add more unit tests for `NAME_ADDRESSABLE` blobs
* do not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using `Preconditions.checkArgument`

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #3512

commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber
Date: 2016-12-20T15:49:57Z
    [FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber
Date: 2017-01-06T17:42:58Z
    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber
Date: 2016-12-20T17:27:13Z
    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber
Date: 2016-12-20T17:52:19Z
    [FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs
    These blobs are referenced by the job ID and a selected name instead of the hash sum of the blob's contents. Some code was already prepared but lacked the proper additions in further APIs. This commit adds some.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber
Date: 2016-12-21T15:23:29Z
    [FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber
Date: 2017-03-09T17:14:02Z
    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber
Date: 2016-12-21T16:59:27Z
    [FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber
Date: 2017-03-09T18:14:52Z
    [FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access
    Neither NAME_ADDRESSABLE blobs nor the access methods that the BlobService implementations provide were thoroughly tested before. This adds tests covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber
Date: 2017-03-09T17:15:08Z
    [FLINK-6008] do not fail the BlobServer when delete fails
    This also enables us to reuse some more code between BlobServerConnection and BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber
Date: 2017-03-09T17:32:14Z
    [FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code
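The commit message distinguishes content-addressable blobs, whose key is a hash of the contents, from `NAME_ADDRESSABLE` blobs, which are referenced by job ID and a chosen name. This is a minimal sketch of the difference, assuming SHA-1 as the content hash and a hypothetical `cache/blob_<hash>` and `job_<id>/blob_<name>` directory layout; neither is confirmed here as Flink's actual scheme.

```java
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class BlobAddressing {

    private BlobAddressing() {
    }

    /** Content-addressable key: a hex digest of the bytes, so equal contents share one key. */
    public static String contentKey(byte[] contents) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(contents)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to provide SHA-1
            throw new IllegalStateException(e);
        }
    }

    /** Hypothetical layout for content-addressable blobs: independent of any job. */
    public static Path contentLocation(Path storageDir, byte[] contents) {
        return storageDir.resolve("cache").resolve("blob_" + contentKey(contents));
    }

    /** Hypothetical layout for name-addressable blobs: grouped under a per-job directory. */
    public static Path nameLocation(Path storageDir, String jobId, String name) {
        return storageDir.resolve("job_" + jobId).resolve("blob_" + name);
    }
}
```

In this layout, the same name under two jobs maps to two different files, and removing all name-addressable blobs of a terminated job reduces to deleting one directory, which is the shape a `deleteAll(JobID)` operation needs.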