This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b82acf9  [FLINK-20461][tests] Check replication factor before asking for JobResult
b82acf9 is described below

commit b82acf9352896a1cd0b1371fbdd6c4439d5c8c6c
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Sat Aug 21 13:19:35 2021 +0200

    [FLINK-20461][tests] Check replication factor before asking for JobResult
    
    This commit hardens the YARNFileReplicationITCase by checking the replication factor before
    asking for the JobResult. If done in the reverse order, the Flink application may have already
    terminated by the time the file replication check runs, because the per-job mode has already
    delivered the JobResult.
    
    This closes #16917.
---
 .../org/apache/flink/yarn/YARNFileReplicationITCase.java     | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 29a7635..f6f37be 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters with customized file 
replication numbers.
@@ -108,6 +109,8 @@ public class YARNFileReplicationITCase extends YarnTestBase 
{
 
                 ApplicationId applicationId = clusterClient.getClusterId();
 
+                extraVerification(configuration, applicationId);
+
                 final CompletableFuture<JobResult> jobResultCompletableFuture =
                         clusterClient.requestJobResult(jobGraph.getJobID());
 
@@ -125,8 +128,6 @@ public class YARNFileReplicationITCase extends YarnTestBase 
{
                                                             
.getClassLoader()));
                                 });
 
-                extraVerification(configuration, applicationId);
-
                 waitApplicationFinishedElseKillIt(
                         applicationId,
                         yarnAppTerminateTimeout,
@@ -162,6 +163,13 @@ public class YARNFileReplicationITCase extends 
YarnTestBase {
         String suffix = ".flink/" + applicationId.toString() + "/" + 
flinkUberjar.getName();
 
         Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);
+
+        assertTrue(
+                "The Flink uber jar needs to exist. If it does not exist, then 
this "
+                        + "indicates that the Flink cluster has already 
terminated and Yarn has "
+                        + "already deleted the working directory.",
+                fs.exists(uberJarHDFSPath));
+
         FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);
 
         final int flinkFileReplication =

Reply via email to