[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/5715 ---
[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5715#discussion_r176663967 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java --- @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope } try { - jobManager = cluster.getLeaderGateway(deadline.timeLeft()); - JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod); - jobID = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); + final JobID jobID = jobGraph.getJobID(); - Object savepointResponse = null; + client.setDetached(true); + client.submitJob(jobGraph, RescalingITCase.class.getClassLoader()); // wait until the operator is started StateSourceBase.workStartedLatch.await(); - while (deadline.hasTimeLeft()) { - - Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft()); - FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); - savepointResponse = Await.result(savepointPathFuture, waitingTime); - - if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { - break; - } - } - - assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); - - final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath(); - - Future jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft()); - - Future cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft()); - - Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft()); + CompletableFuture savepointPathFuture = FutureUtils.retryWithDelay( + () -> { + try { + return client.triggerSavepoint(jobID, null); + } catch (FlinkException e) { + throw new RuntimeException(e); --- End diff -- yes, I'll change it and merge afterwards. ---
[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5715#discussion_r176475682 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java --- @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope } try { - jobManager = cluster.getLeaderGateway(deadline.timeLeft()); - JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod); - jobID = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); + final JobID jobID = jobGraph.getJobID(); - Object savepointResponse = null; + client.setDetached(true); + client.submitJob(jobGraph, RescalingITCase.class.getClassLoader()); // wait until the operator is started StateSourceBase.workStartedLatch.await(); - while (deadline.hasTimeLeft()) { - - Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft()); - FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); - savepointResponse = Await.result(savepointPathFuture, waitingTime); - - if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { - break; - } - } - - assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); - - final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath(); - - Future jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft()); - - Future cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft()); - - Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft()); + CompletableFuture savepointPathFuture = FutureUtils.retryWithDelay( + () -> { + try { + return client.triggerSavepoint(jobID, null); + } catch (FlinkException e) { + throw new RuntimeException(e); --- End diff -- Shouldn't we return a exceptionally completed future here? ---
[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5715 [FLINK-8956][tests] Port RescalingITCase to flip6 Based on #5690. ## What is the purpose of the change Ports the `RescalingITCase` to use `MiniClusterResource`. ## Verifying this change Run `RescalingITCase` with `flip6` profile enabled/disabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8956 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5715.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5715 ---