Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5715#discussion_r176663967
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---
    @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
                }
     
                try {
    -                   jobManager = 
cluster.getLeaderGateway(deadline.timeLeft());
    -
                        JobGraph jobGraph = 
createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
     
    -                   jobID = jobGraph.getJobID();
    -
    -                   cluster.submitJobDetached(jobGraph);
    +                   final JobID jobID = jobGraph.getJobID();
     
    -                   Object savepointResponse = null;
    +                   client.setDetached(true);
    +                   client.submitJob(jobGraph, 
RescalingITCase.class.getClassLoader());
     
                        // wait until the operator is started
                        StateSourceBase.workStartedLatch.await();
     
    -                   while (deadline.hasTimeLeft()) {
    -
    -                           Future<Object> savepointPathFuture = 
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, 
Option.<String>empty()), deadline.timeLeft());
    -                           FiniteDuration waitingTime = new 
FiniteDuration(10, TimeUnit.SECONDS);
    -                           savepointResponse = 
Await.result(savepointPathFuture, waitingTime);
    -
    -                           if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
    -                                   break;
    -                           }
    -                   }
    -
    -                   assertTrue(savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess);
    -
    -                   final String savepointPath = 
((JobManagerMessages.TriggerSavepointSuccess) 
savepointResponse).savepointPath();
    -
    -                   Future<Object> jobRemovedFuture = jobManager.ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
    -
    -                   Future<Object> cancellationResponseFuture = 
jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
    -
    -                   Object cancellationResponse = 
Await.result(cancellationResponseFuture, deadline.timeLeft());
    +                   CompletableFuture<String> savepointPathFuture = 
FutureUtils.retryWithDelay(
    +                           () -> {
    +                                   try {
    +                                           return 
client.triggerSavepoint(jobID, null);
    +                                   } catch (FlinkException e) {
    +                                           throw new RuntimeException(e);
    --- End diff --
    
    Yes, I'll change it and merge afterwards.


---

Reply via email to