Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5715#discussion_r176663967 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java --- @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope } try { - jobManager = cluster.getLeaderGateway(deadline.timeLeft()); - JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod); - jobID = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); + final JobID jobID = jobGraph.getJobID(); - Object savepointResponse = null; + client.setDetached(true); + client.submitJob(jobGraph, RescalingITCase.class.getClassLoader()); // wait until the operator is started StateSourceBase.workStartedLatch.await(); - while (deadline.hasTimeLeft()) { - - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); - FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); - savepointResponse = Await.result(savepointPathFuture, waitingTime); - - if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { - break; - } - } - - assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); - - final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath(); - - Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft()); - - Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft()); - - Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft()); + CompletableFuture<String> savepointPathFuture = FutureUtils.retryWithDelay( + () -> { + try { + return client.triggerSavepoint(jobID, null); + } catch (FlinkException e) { + throw new RuntimeException(e); --- End diff -- yes, I'll change it and merge afterwards.
---