Repository: flink Updated Branches: refs/heads/master 56fef58a3 -> c74d8cac2
[FLINK-8818][yarn/s3][tests] harden YarnFileStageTest upload test for eventual consistent read-after-write In case the newly written object cannot be read (yet), we do 4 more retries to retrieve the value and wait 50ms each. While this does not solve all the cases it should make the (rare) case of the written object not being available for read even more unlikely. This closes #5601. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c74d8cac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c74d8cac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c74d8cac Branch: refs/heads/master Commit: c74d8cac25e8c025ced11a9c03cd0cf07a8c2d6b Parents: 56fef58 Author: Nico Kruber <n...@data-artisans.com> Authored: Tue Feb 27 17:29:00 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Mar 5 12:17:53 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/yarn/YarnFileStageTest.java | 25 ++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c74d8cac/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java index 5cbe1be..527782c 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -41,6 +41,7 @@ import org.junit.rules.TemporaryFolder; import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.util.ArrayList; import java.util.Collections; @@ -200,13 +201,23 @@ public class YarnFileStageTest extends TestLogger { while (targetFilesIterator.hasNext()) { LocatedFileStatus targetFile = targetFilesIterator.next(); - try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) { - String absolutePathString = targetFile.getPath().toString(); - String relativePath = absolutePathString.substring(workDirPrefixLength); - targetFiles.put(relativePath, in.readUTF()); - - assertEquals("extraneous data in file " + relativePath, -1, in.read()); - } + int retries = 5; + do { + try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) { + String absolutePathString = targetFile.getPath().toString(); + String relativePath = absolutePathString.substring(workDirPrefixLength); + targetFiles.put(relativePath, in.readUTF()); + + assertEquals("extraneous data in file " + relativePath, -1, in.read()); + break; + } catch (FileNotFoundException e) { + // For S3, read-after-write may be eventually consistent, i.e. when trying + // to access the object before writing it; see + // https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel + // -> try again a bit later + Thread.sleep(50); + } + } while ((retries--) > 0); } assertThat(targetFiles, equalTo(srcFiles));