[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073920#comment-16073920
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4146


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * -promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}-
> * -extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)-
> * -remove {{NAME_ADDRESSABLE}} blobs after job/task termination-
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073483#comment-16073483
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4146
  
merging.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072164#comment-16072164
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125244675
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
@@ -92,6 +99,69 @@ public void testDeleteSingle() {
             catch (IllegalStateException e) {
                 // expected
             }
+
+            // delete a file directly on the server
+            server.delete(key2);
+            try {
+                server.getURL(key2);
+                fail("BLOB should have been deleted");
+            }
+            catch (IOException e) {
+                // expected
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
--- End diff --

Absolutely right - I was told this was an issue with JUnit some versions 
ago... I was just copying the code from the other tests in this class. 
Let me create a cleanup PR on top of the newest PR in this series, i.e. #4238.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072156#comment-16072156
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125243142
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
                     // we should make the local and remote file deletion atomic, otherwise we might risk not
                     // removing the remote file in case of a concurrent put operation
                     if (blobFile.exists() && !blobFile.delete()) {
--- End diff --

You're right, this could be improved as well - the next PR in this series 
removes this code, though.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071733#comment-16071733
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125165594
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
@@ -92,6 +99,69 @@ public void testDeleteSingle() {
             catch (IllegalStateException e) {
                 // expected
             }
+
+            // delete a file directly on the server
+            server.delete(key2);
+            try {
+                server.getURL(key2);
+                fail("BLOB should have been deleted");
+            }
+            catch (IOException e) {
+                // expected
+            }
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
--- End diff --

Can we remove this and simply let the exception be thrown? I think catching 
an exception, printing the stack trace and then failing with the exception 
message is an anti-pattern.
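
To illustrate the concern, here is a minimal, JUnit-free sketch of the two styles. It is not the code from the pull request: `fail` is a local stand-in for JUnit's `Assert.fail`, and `operationUnderTest` is a hypothetical method that simply throws. The point it tries to show is that catch-print-fail replaces the original exception with an `AssertionError` that carries only the message, while a test method declared with `throws Exception` lets the original exception, with its type and stack trace, reach the test runner.

```java
import java.io.IOException;

final class TestExceptionSketch {

    // Local stand-in for JUnit's Assert.fail(String); an assumption for this sketch.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    // Hypothetical operation that fails with a checked exception.
    static void operationUnderTest() throws IOException {
        throw new IOException("disk full");
    }

    // The style questioned in the review: the stack trace goes to stderr and the
    // runner only sees an AssertionError without the original exception as cause.
    static void catchPrintAndFail() {
        try {
            operationUnderTest();
        } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }

    // The suggested style: declare the exception and let it propagate unchanged.
    static void letItPropagate() throws Exception {
        operationUnderTest();
    }
}
```

With the second style, the test runner reports the `IOException` itself, so nothing has to be reconstructed from console output.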




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071734#comment-16071734
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r125165567
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
                     // we should make the local and remote file deletion atomic, otherwise we might risk not
                     // removing the remote file in case of a concurrent put operation
                     if (blobFile.exists() && !blobFile.delete()) {
--- End diff --

Shouldn't the order of the operations be changed like in the other cases?




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067956#comment-16067956
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124734798
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
 
     private final Random rnd = new Random();
 
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

Unfortunately, concurrency with `delete` operations does not work either if 
not guarded - the directory may no longer exist between 
`jobDirectory.mkdirs()` and `jobDirectory.exists()`. I was able to reproduce 
the error with the existing test though - if you want to try it, just change 
the order of these two commands back - the test will not hit the race every 
time, but it will sometimes.


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}





[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067918#comment-16067918
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124727496
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
-        if (key == null) {
-            throw new IllegalArgumentException("BLOB key must not be null");
-        }
+        checkArgument(key != null, "BLOB key must not be null.");
--- End diff --

Yes, but it would be cleaner and less surprising for future users. However, 
I have no strong feelings about it.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067920#comment-16067920
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124728498
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
 
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

OK, please add a comment or a note in the commit message about this.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067917#comment-16067917
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124730014
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
 
     private final Random rnd = new Random();
 
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

So maybe you could call `gets` and `deletes` (if there is such an operation) 
interleaved, multiple times in a loop? Otherwise I don't see any real value in 
those tests and I would prefer to drop them (so that we don't have to maintain 
tests that do not check anything).




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067919#comment-16067919
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124727900
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
 
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

OK, sorry, I didn't find it the first time I looked for it.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066798#comment-16066798
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124587248
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
 
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

the (included) changes in `BlobServerDeleteTest` should cover this




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066796#comment-16066796
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124586418
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
 
     private final Random rnd = new Random();
 
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

Actually, the job directory is only created once, so adding more calls does 
not help. What could help make this happen more often is to add more threads 
to 
`BlobServerPutTest#testServerContentAddressableGetStorageLocationConcurrent()`; 
otherwise the failure only happens once in a while if the implementation is not 
thread-safe.
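
As a rough sketch of that idea (not the actual Flink test, which uses `CheckedThread`), the following self-contained example releases many threads at once so that they all try to create the same directory. `ensureDirectory` and `countFailures` are hypothetical names introduced for this illustration, and the start gate is an assumption about how one might maximise contention.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConcurrentMkdirsSketch {

    // Hypothetical stand-in for the job directory creation: try to create first,
    // and only treat it as an error if the directory still does not exist.
    static File ensureDirectory(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath());
        }
        return dir;
    }

    // Runs numThreads concurrent creators and returns how many of them failed.
    static int countFailures(File dir, int numThreads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        CountDownLatch startGate = new CountDownLatch(1);
        List<Future<File>> results = new ArrayList<>();
        try {
            Callable<File> task = () -> {
                startGate.await(); // hold every thread until all tasks are submitted
                return ensureDirectory(dir);
            };
            for (int i = 0; i < numThreads; i++) {
                results.add(pool.submit(task));
            }
            startGate.countDown(); // release all threads at the same time

            int failures = 0;
            for (Future<File> result : results) {
                try {
                    result.get();
                } catch (ExecutionException e) {
                    failures++; // the task threw, e.g. the IOException above
                }
            }
            return failures;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

More threads only raise the chance that a race shows up in a single run; they do not make a non-thread-safe variant fail deterministically.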




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066786#comment-16066786
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124585333
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
 
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

The new way is thread-safe - imagine a directory being concurrently created 
after another thread's `exists()` check has returned false. I'm not sure we 
actually fix a bug nowadays, since the use may be guarded... so this is 
probably more of a "just-in-case" thing, or for when usage patterns change.
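
A minimal sketch of the two orderings, assuming the new code calls `mkdirs()` before `exists()` as the discussion describes; the class and method names here are hypothetical and not taken from `BlobUtils`.

```java
import java.io.File;
import java.io.IOException;

final class JobDirectorySketch {

    // Old order (check-then-act): if another thread creates the directory after
    // exists() returned false, mkdirs() returns false and we fail spuriously.
    static File createCheckThenAct(File dir) throws IOException {
        if (!dir.exists() && !dir.mkdirs()) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath());
        }
        return dir;
    }

    // New order (act-then-check): a false return from mkdirs() is only an error
    // if the directory does not exist afterwards, so a concurrent creator is fine.
    static File createActThenCheck(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IOException("Could not create directory " + dir.getAbsolutePath());
        }
        return dir;
    }
}
```

Both variants behave the same in single-threaded use; they differ only when another thread or process creates the directory between the two calls.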




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066780#comment-16066780
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124584383
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
-        if (key == null) {
-            throw new IllegalArgumentException("BLOB key must not be null");
-        }
+        checkArgument(key != null, "BLOB key must not be null.");
--- End diff --

Unfortunately, `checkNotNull` throws a `NullPointerException` instead, and I 
did not want to change that here.
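
The difference in exception type can be sketched without any Flink dependency. In the example below, `checkArgument` is a local stand-in written for this illustration (an assumption, not Flink's `Preconditions` class), and `java.util.Objects.requireNonNull` plays the role of a `checkNotNull`-style check; the `delete...` method names are hypothetical.

```java
import java.util.Objects;

final class NullCheckSketch {

    // Local stand-in for a checkArgument-style precondition (assumption for this sketch).
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Keeps the exception type of the original hand-written null check.
    static String deleteWithCheckArgument(Object key) {
        checkArgument(key != null, "BLOB key must not be null.");
        return "deleted " + key;
    }

    // A null-check helper changes the exception type seen by callers.
    static String deleteWithRequireNonNull(Object key) {
        Objects.requireNonNull(key, "BLOB key must not be null.");
        return "deleted " + key;
    }
}
```

Callers that catch `IllegalArgumentException` would therefore notice a switch to a `checkNotNull`-style helper, which is the behaviour change the comment wants to avoid.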




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066686#comment-16066686
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124566417
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
 
             if (type == CONTENT_ADDRESSABLE) {
                 BlobKey key = BlobKey.readFromInputStream(inputStream);
-                File blobFile = blobServer.getStorageLocation(key);
-
-                writeLock.lock();
-
-                try {
-                    // we should make the local and remote file deletion atomic, otherwise we might risk not
-                    // removing the remote file in case of a concurrent put operation
-                    if (blobFile.exists() && !blobFile.delete()) {
-                        throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
--- End diff --

Don't you want to add test coverage for not throwing if the delete fails?




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066688#comment-16066688
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124566819
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     private static File getJobDirectory(File storageDir, JobID jobID) {
         final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
 
-        if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
--- End diff --

Why did you change the order of those operations? Is this a fix for 
something? If so, could you explain in the commit message what is going 
on here?
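
The replacement line is not visible in this excerpt. Assuming it follows the "call first, check existence afterwards" ordering that the PR's commit list describes for `delete()`, the difference can be sketched as below. `JobDirectoryCreation` and its method names are hypothetical; only `File#exists()` and `File#mkdirs()` come from the diff.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

/** Hypothetical illustration of the two orderings; not the Flink implementation. */
class JobDirectoryCreation {

    /** Check first: fails if another thread creates the directory between exists() and mkdirs(). */
    static void createCheckFirst(File dir) throws IOException {
        if (!dir.exists() && !dir.mkdirs()) {
            throw new IOException("Could not create directory " + dir);
        }
    }

    /** Create first: a false return from mkdirs() is only an error if the directory is still missing. */
    static void createThenCheck(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IOException("Could not create directory " + dir);
        }
    }

    /** Calls createThenCheck twice; the second call stands in for the thread that lost the race. */
    static boolean createTwice() {
        try {
            File dir = new File(Files.createTempDirectory("blob-test").toFile(), "job_1");
            createThenCheck(dir);
            createThenCheck(dir); // mkdirs() returns false here, but exists() is true
            return dir.isDirectory();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createTwice());
    }
}
```

The second call cannot reproduce the actual interleaving, it only shows that a `false` from `mkdirs()` is tolerated when the directory already exists.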




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066685#comment-16066685
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124564731
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
      * the BLOB server or if the BLOB server cannot delete the file
      */
     public void delete(BlobKey key) throws IOException {
-        if (key == null) {
-            throw new IllegalArgumentException("BLOB key must not be null");
-        }
+        checkArgument(key != null, "BLOB key must not be null.");
--- End diff --

`requireNonNull`?
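
For comparison, a small sketch of how the two styles differ at runtime. The local `checkArgument` is a hypothetical stand-in written for this example, assumed to behave like a Preconditions-style argument check; `java.util.Objects.requireNonNull` is the JDK method.

```java
import java.util.Objects;

/** Hypothetical comparison of two null-check styles. */
class NullCheckStyles {

    /** Stand-in for a Preconditions-style check; not Flink's class. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Runs the check and describes the exception it raised, if any. */
    static String describeFailure(Runnable check) {
        try {
            check.run();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Object key = null;
        String message = "BLOB key must not be null.";
        // Reports an IllegalArgumentException
        System.out.println(describeFailure(() -> checkArgument(key != null, message)));
        // Reports a NullPointerException
        System.out.println(describeFailure(() -> Objects.requireNonNull(key, message)));
    }
}
```

Switching between the two therefore changes the exception type that callers and tests observe, which is worth keeping in mind when following the suggestion.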




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066684#comment-16066684
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124569928
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -411,12 +411,11 @@ public void delete(BlobKey key) throws IOException {
         readWriteLock.writeLock().lock();
 
         try {
-            if (localFile.exists()) {
-                if (!localFile.delete()) {
-                    LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
-                }
+            if (!localFile.delete() && localFile.exists()) {
+                LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
             }
 
+
--- End diff --

remove extra line




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066687#comment-16066687
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4146#discussion_r124568869
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
@@ -59,6 +60,110 @@
 
     private final Random rnd = new Random();
 
+
+    // --- concurrency tests for utility methods which could fail during the put operation ---
+
+    /**
+     * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+     */
+    public static class ContentAddressableGetStorageLocation extends CheckedThread {
+        private final BlobServer server;
+        private final BlobKey key;
+
+        public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+            this.server = server;
+            this.key = key;
+        }
+
+        @Override
+        public void go() throws Exception {
+            server.getStorageLocation(key);
--- End diff --

Shouldn't you call this at least a couple of times, or even a couple of 
dozen or a couple of hundred times? Otherwise, won't this complete before the 
next thread starts up?
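
One common way to address this concern is to hold all threads at a latch and let each repeat the call many times. The sketch below uses plain `Thread` and `CountDownLatch` rather than the `CheckedThread` from the diff, and `ConcurrentStart` and its parameters are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test utility: start all workers together and repeat the action. */
class ConcurrentStart {

    /**
     * Runs the action from the given number of threads, the given number of
     * times each, releasing all threads at once. Returns the completed calls.
     */
    static int runConcurrently(int threads, int iterations, Runnable action) {
        CountDownLatch ready = new CountDownLatch(threads);
        CountDownLatch go = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                ready.countDown();
                try {
                    go.await(); // wait until every worker has started
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int n = 0; n < iterations; n++) {
                    action.run();
                    completed.incrementAndGet();
                }
            });
            worker.start();
            workers.add(worker);
        }

        try {
            ready.await();  // all workers are parked at the latch
            go.countDown(); // release them together
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 100, () -> { }));
    }
}
```

This does not guarantee that the calls overlap, but it makes overlap far more likely than a single call per thread started one after another.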




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057378#comment-16057378
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4146
  
please note that there are four unrelated test failures:
* 2x Kafka010ITCase
* 2x YARNSessionCapacitySchedulerITCase




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055463#comment-16055463
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
Let's include the improvements in a separate PR, #4146, and drop the feature 
additions for a rework in FLIP-19.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055464#comment-16055464
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3512




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055456#comment-16055456
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4146

[FLINK-6008] collection of BlobServer improvements

This PR is a light-weight version of #3512 that only includes the 
improvements and can serve as a base for FLIP-19. It improves the following 
things around the `BlobServer`/`BlobCache`:

* replace config options in `config.md` with non-deprecated ones, e.g. 
`high-availability.cluster-id` and `high-availability.storageDir`
* do not fail the `BlobServer` when a delete operation fails
* add more unit tests
* general code style and docs improvements, like using 
`Preconditions.checkArgument`



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6008b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4146


commit ce719ee39fbbca7b7828c17d9792fc87d37450c7
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated 
ones

commit 9efa8808e46adc1253f52a6a8cec6d3b4d29fee3
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit ca3d533b0affa645ec93d40de378dadc829bbfe5
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 0eededeb36dd833835753def7f4bb27c9d5fb67e
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6249041a9db2b39ddf54e79a1aed5e7706e739c7
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] do not fail the BlobServer if delete fails

also extend the delete tests and remove one code duplication

commit e681239a538547f752d65358db1ebd2ba312b33c
Author: Nico Kruber 
Date:   2017-03-17T15:21:40Z

[FLINK-6008] fix concurrent job directory creation

also add according unit tests

commit 20beae2dbc91859e2ec724b35b20536dcd11fe90
Author: Nico Kruber 
Date:   2017-04-18T14:37:37Z

[FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit 8a33517fe6eb2fa932ab17cb0d82a3fa8d7b8d0b
Author: Nico Kruber 
Date:   2017-04-19T13:39:03Z

[hotfix] minor typos

commit 23889866ac21494fc4af90905ab1518cbe897118
Author: Nico Kruber 
Date:   2017-04-19T14:10:16Z

[FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 01b1a245528c264a6061ed3a48b24c5a207369f6
Author: Nico Kruber 
Date:   2017-06-14T16:01:47Z

[FLINK-6008] do not guard a delete() call with a check for existence






[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16053975#comment-16053975
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3512
  
I take it this PR will be subsumed in FLIP-19?




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982583#comment-15982583
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
After investigating a bit further, I noticed that this problem is actually 
a bit bigger:
Even in `FileSystemBlobStore`, there is no guarantee that a directory will 
not be deleted concurrently (by a `delete` method) between its creation and 
the writing of a file into it (in a `get` method):
* the `delete` method for name-addressable blobs always deletes the 
job-specific storage directory if there is no further blob for this job
* the content-addressable blobs do that similarly but are shared among jobs 
and thus only delete directories if there is no other blob.

Since name-addressable blobs have not been used so far and the latter case 
typically does not occur concurrently with `get` requests, this has not been a 
problem so far.

Therefore, I suggest creating a separate JIRA for this (delete vs. get) 
concurrency issue and applying a quick fix for #3742.
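
One possible shape of a fix, sketched under the assumption that directory creation, file writing and deletion can all be guarded by the same lock. `GuardedJobDirectory` and its methods are hypothetical and not part of `FileSystemBlobStore`; the sketch only shows the locking idea on a local directory.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: one lock covers directory creation, file write and delete. */
class GuardedJobDirectory {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final File jobDir;

    GuardedJobDirectory(File jobDir) {
        this.jobDir = jobDir;
    }

    /** Convenience factory for the example: a job directory below a fresh temp directory. */
    static GuardedJobDirectory inTempDir() {
        try {
            File base = Files.createTempDirectory("blobs").toFile();
            return new GuardedJobDirectory(new File(base, "job_1"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates the directory and writes the file while holding the write lock. */
    void put(String name, byte[] data) {
        lock.writeLock().lock();
        try {
            if (!jobDir.mkdirs() && !jobDir.exists()) {
                throw new IOException("Could not create " + jobDir);
            }
            Files.write(new File(jobDir, name).toPath(), data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Deletes the file and, if it was the last one, the directory; logs instead of throwing. */
    void delete(String name) {
        lock.writeLock().lock();
        try {
            File file = new File(jobDir, name);
            if (!file.delete() && file.exists()) {
                System.err.println("Failed to delete " + file);
            }
            jobDir.delete(); // only succeeds once the directory is empty
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Read-only check; may run concurrently with other readers. */
    boolean contains(String name) {
        lock.readLock().lock();
        try {
            return new File(jobDir, name).isFile();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because `put` re-creates the directory under the same lock that `delete` holds while removing it, a writer can no longer find the directory gone between creation and writing. A lock like this only helps within one process; a shared file system accessed by several processes would need a different mechanism.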




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15975416#comment-15975416
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
Found a race between `BlobCache#deleteAll(JobID)` and 
`BlobCache#getURL(BlobKey)` now that the former is actually being used - this 
needs to be fixed first before merging:

`BlobCache#deleteAll(JobID)` deletes the job directory, which is only 
created at the start of `BlobCache#getURL(BlobKey)`; the rest of that method 
then relies on the directory being present.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974787#comment-15974787
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
* I removed the exposed `BlobService` from the `LibraryCacheManager`
* Also, I developed a new cleanup story that removes blobs only if there 
are no tasks referring to the job ID anymore. The extension of keeping the 
files around for re-deployment may be added as part of a new re-deployment 
story when it is actually implemented.
* Regarding the suggestion of refactoring the `get(JobId, String)` methods, 
I created [FLINK-6329](https://issues.apache.org/jira/browse/FLINK-6329) as 
this change may be a future improvement.






[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930207#comment-15930207
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106675058
  
--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the 
root dir under which the ZooKeeper HA mode will create namespace directories. 
Previously this ket was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in 
standalone cluster mode, or the  under YARN) Defines the 
subdirectory under the root dir where the ZooKeeper HA mode will create znodes. 
This allows to isolate multiple applications on the same ZooKeeper. Previously 
this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone 
cluster mode, or the  under YARN) Defines the subdirectory 
under the root dir where the ZooKeeper HA mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper. Previously this key was 
named `recovery.zookeeper.path.namespace` and 
`high-availability.zookeeper.path.namespace`.
--- End diff --

I would move these into the `### High Availability (HA)` section, because 
they are independent of ZooKeeper.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930211#comment-15930211
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106680476
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1305,6 +1305,9 @@ class TaskManager(
 s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
 s"(${task.getExecutionId})")
 
+  // delete all NAME_ADDRESSABLE BLOBs
+  libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --

Multiple tasks of the same job run in a TaskManager. This means that tasks 
delete each other's blobs.




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930209#comment-15930209
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674262
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exc

[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930210#comment-15930210
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674837
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exc

[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930206#comment-15930206
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677990
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
+   if (!localFile.delete()) {
+   LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
+   }
+   }
+
+   blobStore.delete(jobId, key);
+   }
+
+   /**
+* Deletes all files associated with the given job id from the storage.
+*
+* @param jobId JobID of the files in the blob store
+*/
+   @Override
+   public void deleteAll(final JobID jobId) {
+   checkArgument(jobId != null, "Job id must not be null.");
+
+   try {
+   BlobUtils.deleteJobDirectory(storageDir, jobId);
+   } catch (IOException e) {
--- End diff --

If we want to make sure we clean up in any case, we can actually catch 
`Exception` here.
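
A minimal sketch of what catching `Exception` buys here, assuming the local directory cleanup is followed by a second cleanup step (for instance on the blob store) that should run even if the first one fails. The names `CleanupSketch`, `Step`, and the two-argument `deleteAll` are hypothetical and only illustrate the pattern; they are not the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class CleanupSketch {

    /** Hypothetical stand-in for a cleanup step that may fail. */
    interface Step {
        void run() throws Exception;
    }

    /**
     * Runs the local cleanup, records any failure, and then always runs the
     * second cleanup step. Returns the warnings that were recorded.
     */
    static List<String> deleteAll(Step localCleanup, Runnable storeCleanup) {
        List<String> warnings = new ArrayList<>();
        try {
            localCleanup.run();
        } catch (Exception e) {
            // Catching Exception (not only IOException) also covers unchecked
            // failures, so the second step below is still reached.
            warnings.add("Failed to delete local files: " + e);
        }
        storeCleanup.run();
        return warnings;
    }

    public static void main(String[] args) {
        List<String> warnings = deleteAll(
                () -> { throw new IllegalStateException("unexpected"); },
                () -> System.out.println("store cleanup still ran"));
        System.out.println(warnings);
    }
}
```

With `catch (IOException e)` only, the `IllegalStateException` in this example would propagate and the second step would be skipped.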


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930208#comment-15930208
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677799
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete && exists)`.
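
A minimal sketch of that delete-first pattern, assuming plain `java.io.File` semantics. The names `DeleteThenCheck` and `deleteQuietly` are hypothetical and not part of the PR. Checking `exists()` before `delete()` leaves a window in which another thread can remove the file, so the delete would be reported as a failure even though the file is gone; attempting the delete first and consulting `exists()` only when it returns `false` avoids that spurious warning.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class DeleteThenCheck {

    /**
     * Attempts the delete first and only consults exists() when it fails.
     * Returns true if the file is gone afterwards.
     */
    static boolean deleteQuietly(File file) {
        // File.delete() returns false both when the file is already missing
        // and when the deletion failed; exists() tells the two cases apart.
        if (!file.delete() && file.exists()) {
            System.err.println("Failed to delete " + file.getAbsolutePath());
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        File tmp = Files.createTempFile("blob", ".tmp").toFile();
        System.out.println(deleteQuietly(tmp)); // file existed and is removed
        System.out.println(deleteQuietly(tmp)); // already gone, not treated as an error
    }
}
```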




[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905376#comment-15905376
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:

* replace config options in `config.md` with non-deprecated ones, e.g. 
`high-availability.cluster-id` and `high-availability.storageDir`
* promote `BlobStore#deleteAll(JobID)` to the `BlobService`
* extend the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares 
for FLINK-4399)
* remove `NAME_ADDRESSABLE` blobs after job/task termination
* add more unit tests for `NAME_ADDRESSABLE` blobs
* do not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using 
`Preconditions.checkArgument`
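
As an illustration of the last item, here is a minimal sketch of argument validation in the `checkArgument` style. The helper below is a self-contained, hypothetical stand-in (class `ArgChecks`) that only mirrors the general shape of such a utility; it is not the implementation used in the PR.

```java
final class ArgChecks {

    /** Hypothetical stand-in mirroring the shape of a checkArgument helper. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Validates a BLOB name up front and returns it unchanged. */
    static String requireKey(String key) {
        checkArgument(key != null, "BLOB name must not be null.");
        return key;
    }

    public static void main(String[] args) {
        System.out.println(requireKey("my-blob"));
        try {
            requireKey(null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with an `IllegalArgumentException` at the API boundary keeps a `null` from surfacing later as a less descriptive error deeper in the call chain.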



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3512


commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated 
ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber 
Date:   2016-12-20T17:52:19Z

[FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs

These blobs are referenced by the job ID and a selected name instead of the
hash sum of the blob's contents. Some code was already prepared but lacked
the proper additions in further APIs. This commit adds some.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber 
Date:   2016-12-21T16:59:27Z

[FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task 
termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber 
Date:   2017-03-09T18:14:52Z

[FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access

NAME_ADDRESSABLE blobs were not tested that thoroughly before, and neither were
the access methods that the BlobService implementations provide. This adds tests
covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber 
Date:   2017-03-09T17:15:08Z

[FLINK-6008] do not fail the BlobServer when delete fails

This also enables us to reuse some more code between BlobServerConnection 
and
BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber 
Date:   2017-03-09T17:32:14Z

[FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code



