Repository: flink
Updated Branches:
  refs/heads/master 56fef58a3 -> c74d8cac2


[FLINK-8818][yarn/s3][tests] harden YarnFileStageTest upload test for eventual 
consistent read-after-write

In case the newly written object cannot be read (yet), we do 4 more retries to
retrieve the value and wait 50ms each. While this does not solve all the cases
it should make the (rare) case of the written object not being available for
read even more unlikely.

This closes #5601.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c74d8cac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c74d8cac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c74d8cac

Branch: refs/heads/master
Commit: c74d8cac25e8c025ced11a9c03cd0cf07a8c2d6b
Parents: 56fef58
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Feb 27 17:29:00 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Mar 5 12:17:53 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnFileStageTest.java    | 25 ++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c74d8cac/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index 5cbe1be..527782c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -41,6 +41,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -200,13 +201,23 @@ public class YarnFileStageTest extends TestLogger {
                        while (targetFilesIterator.hasNext()) {
                                LocatedFileStatus targetFile = 
targetFilesIterator.next();
 
-                               try (FSDataInputStream in = 
targetFileSystem.open(targetFile.getPath())) {
-                                       String absolutePathString = 
targetFile.getPath().toString();
-                                       String relativePath = 
absolutePathString.substring(workDirPrefixLength);
-                                       targetFiles.put(relativePath, 
in.readUTF());
-
-                                       assertEquals("extraneous data in file " 
+ relativePath, -1, in.read());
-                               }
+                               int retries = 5;
+                               do {
+                                       try (FSDataInputStream in = 
targetFileSystem.open(targetFile.getPath())) {
+                                               String absolutePathString = 
targetFile.getPath().toString();
+                                               String relativePath = 
absolutePathString.substring(workDirPrefixLength);
+                                               targetFiles.put(relativePath, 
in.readUTF());
+
+                                               assertEquals("extraneous data 
in file " + relativePath, -1, in.read());
+                                               break;
+                                       } catch (FileNotFoundException e) {
+                                               // For S3, read-after-write may 
be eventually consistent, i.e. when trying
+                                               // to access the object before 
writing it; see
+                                               // 
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
+                                               // -> try again a bit later
+                                               Thread.sleep(50);
+                                       }
+                               } while ((retries--) > 0);
                        }
 
                        assertThat(targetFiles, equalTo(srcFiles));

Reply via email to