This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ba3b1a5863 [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
9ba3b1a5863 is described below

commit 9ba3b1a5863c1aeeca0be25b4bb375abfe02b940
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Tue Mar 21 21:08:27 2023 +0000

    [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
---
 .../org/apache/flink/test/state/ChangelogRescalingITCase.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 6891eafadae..10ba869f6a8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -329,7 +329,7 @@ public class ChangelogRescalingITCase extends TestLogger {
     private String checkpointAndCancel(JobID jobID) throws Exception {
         waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
         cluster.getClusterClient().cancel(jobID).get();
-        checkStatus(jobID);
+        waitForSuccessfulTermination(jobID);
         return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, 
cluster.getMiniCluster())
                 .<NoSuchElementException>orElseThrow(
                         () -> {
@@ -337,7 +337,13 @@ public class ChangelogRescalingITCase extends TestLogger {
                         });
     }
 
-    private void checkStatus(JobID jobID) throws InterruptedException, 
ExecutionException {
+    private void waitForSuccessfulTermination(JobID jobID) throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () ->
+                        cluster.getClusterClient()
+                                .getJobStatus(jobID)
+                                .get()
+                                .isGloballyTerminalState());
         if 
(cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState())
 {
             cluster.getClusterClient()
                     .requestJobResult(jobID)

Reply via email to