This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e75679d  [FLINK-20461][tests] Check replication factor before asking for JobResult
e75679d is described below

commit e75679d2635e859a6a5f97419777c857ba5321df
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Sat Aug 21 13:19:35 2021 +0200

    [FLINK-20461][tests] Check replication factor before asking for JobResult

    This commit hardens the YARNFileReplicationITCase by checking the replication factor
    before asking for the JobResult. If done in the reverse order, then it can happen that
    the Flink application has already terminated before doing the file replication check
    because the per-job mode has already delivered the JobResult.

    This closes #16917.
---
 .../org/apache/flink/yarn/YARNFileReplicationITCase.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 85cde8c..fdaf8bb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
@@ -108,6 +109,8 @@ public class YARNFileReplicationITCase extends YarnTestBase { ApplicationId applicationId = clusterClient.getClusterId(); + extraVerification(configuration, applicationId); + final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID()); @@ -125,8 +128,6 @@ public class YARNFileReplicationITCase extends YarnTestBase { .getClassLoader())); }); - extraVerification(configuration, applicationId); - waitApplicationFinishedElseKillIt( applicationId, yarnAppTerminateTimeout, @@ -162,6 +163,13 @@ public class YARNFileReplicationITCase extends YarnTestBase { String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName(); Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix); + + assertTrue( + "The Flink uber jar needs to exist. If it does not exist, then this " + + "indicates that the Flink cluster has already terminated and Yarn has " + + "already deleted the working directory.", + fs.exists(uberJarHDFSPath)); + FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath); final int flinkFileReplication =