This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e75679d  [FLINK-20461][tests] Check replication factor before asking for JobResult
e75679d is described below

commit e75679d2635e859a6a5f97419777c857ba5321df
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Sat Aug 21 13:19:35 2021 +0200

    [FLINK-20461][tests] Check replication factor before asking for JobResult
    
    This commit hardens the YARNFileReplicationITCase by checking the replication factor before
    asking for the JobResult. If done in the reverse order, then it can happen that the Flink application
    has already terminated before doing the file replication check because the per-job mode has already
    delivered the JobResult.
    
    This closes #16917.
---
 .../org/apache/flink/yarn/YARNFileReplicationITCase.java     | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 85cde8c..fdaf8bb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
@@ -108,6 +109,8 @@ public class YARNFileReplicationITCase extends YarnTestBase {
 
                 ApplicationId applicationId = clusterClient.getClusterId();
 
+                extraVerification(configuration, applicationId);
+
                 final CompletableFuture<JobResult> jobResultCompletableFuture =
                         clusterClient.requestJobResult(jobGraph.getJobID());
 
@@ -125,8 +128,6 @@ public class YARNFileReplicationITCase extends YarnTestBase {
                                                             
.getClassLoader()));
                                 });
 
-                extraVerification(configuration, applicationId);
-
                 waitApplicationFinishedElseKillIt(
                         applicationId,
                         yarnAppTerminateTimeout,
@@ -162,6 +163,13 @@ public class YARNFileReplicationITCase extends YarnTestBase {
         String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
 
         Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);
+
+        assertTrue(
+                "The Flink uber jar needs to exist. If it does not exist, then this "
+                        + "indicates that the Flink cluster has already terminated and Yarn has "
+                        + "already deleted the working directory.",
+                fs.exists(uberJarHDFSPath));
+
         FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);
 
         final int flinkFileReplication =

Reply via email to