Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175751892
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
                        String savepointPath,
                        Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -           // Retrieve the job manager
    -           Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +           ClusterClient<?> client = miniClusterResource.getClusterClient();
    +           client.setDetached(true);
     
                // Submit the job
                JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -           JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -           StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -           JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +           JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
                boolean done = false;
                while (DEADLINE.hasTimeLeft()) {
     
                        // try and get a job result, this will fail if the job already failed. Use this
                        // to get out of this loop
                        JobID jobId = jobSubmissionResult.getJobID();
    -                   FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
                        try {
    +                           CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -                           Future<Object> future = clusterClient
    -                                           .getJobManagerGateway()
    -                                           .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -                           Object result = Await.result(future, timeout);
    +                           JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -                           if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -                                   if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -                                           Object jobResult = Await.result(
    -                                                           jobListeningContext.getJobResultFuture(),
    -                                                           Duration.apply(5, TimeUnit.SECONDS));
    -                                           fail("Job failed: " + jobResult);
    -                                   }
    -                           }
    +                           assertNotEquals(JobStatus.FAILED, jobStatus);
                        } catch (Exception e) {
                                fail("Could not connect to job: " + e);
                        }
     
                        Thread.sleep(100);
    --- End diff --
    
    If the job reaches `JobStatus.FAILED`, the test fails at `assertNotEquals(JobStatus.FAILED, jobStatus);`.
    
    We poll the accumulators in a loop for as long as the job has not failed and the deadline has not been reached.
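
For illustration, the polling pattern described above might look roughly like the following. This is a minimal standalone sketch, not the test's actual code: `DeadlinePolling`, its nested `JobStatus` enum, and the `statusSupplier` / `accumulatorsMatch` parameters are hypothetical stand-ins for the Flink client call and the accumulator check.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class DeadlinePolling {

    // Hypothetical stand-in for Flink's JobStatus; only the values needed here.
    enum JobStatus { RUNNING, FAILED, FINISHED }

    /**
     * Polls until accumulatorsMatch reports true, the job fails, or the deadline passes.
     * Returns true if the accumulators matched in time, false if the deadline expired.
     */
    static boolean pollUntilDone(
            Supplier<CompletableFuture<JobStatus>> statusSupplier,
            BooleanSupplier accumulatorsMatch,
            Duration deadline,
            long sleepMillis) throws Exception {

        long end = System.nanoTime() + deadline.toNanos();

        while (System.nanoTime() - end < 0) {
            // Bounded wait for the status, mirroring jobStatusFuture.get(5, TimeUnit.SECONDS).
            JobStatus status = statusSupplier.get().get(5, TimeUnit.SECONDS);

            // A failed job aborts immediately, like assertNotEquals(JobStatus.FAILED, jobStatus).
            if (status == JobStatus.FAILED) {
                throw new AssertionError("Job failed");
            }

            // Otherwise keep checking the accumulators until they match.
            if (accumulatorsMatch.getAsBoolean()) {
                return true;
            }

            Thread.sleep(sleepMillis);
        }
        return false;
    }
}
```

The failure check throws an `AssertionError`, which a surrounding `catch (Exception e)` would not swallow, so a failed job ends the loop right away instead of being reported as a connection problem.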

