This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b82acf9 [FLINK-20461][tests] Check replication factor before asking for JobResult b82acf9 is described below commit b82acf9352896a1cd0b1371fbdd6c4439d5c8c6c Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Sat Aug 21 13:19:35 2021 +0200 [FLINK-20461][tests] Check replication factor before asking for JobResult This commit hardens the YARNFileReplicationITCase by checking the replication factor before asking for the JobResult. If done in the reverse order, then it can happen that the Flink application has already terminated before doing the file replication check because the per-job mode has already delivered the JobResult. This closes #16917. --- .../org/apache/flink/yarn/YARNFileReplicationITCase.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java index 29a7635..f6f37be 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java @@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Test cases for the deployment of Yarn Flink clusters with customized file replication numbers. @@ -108,6 +109,8 @@ public class YARNFileReplicationITCase extends YarnTestBase { ApplicationId applicationId = clusterClient.getClusterId(); + extraVerification(configuration, applicationId); + final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID()); @@ -125,8 +128,6 @@ public class YARNFileReplicationITCase extends YarnTestBase { .getClassLoader())); }); - extraVerification(configuration, applicationId); - waitApplicationFinishedElseKillIt( applicationId, yarnAppTerminateTimeout, @@ -162,6 +163,13 @@ public class YARNFileReplicationITCase extends YarnTestBase { String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName(); Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix); + + assertTrue( + "The Flink uber jar needs to exist. If it does not exist, then this " + + "indicates that the Flink cluster has already terminated and Yarn has " + + "already deleted the working directory.", + fs.exists(uberJarHDFSPath)); + FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath); final int flinkFileReplication =