echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1362283946
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java: ########## @@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws Exception { } @Test - public void testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting() + public void testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange() + throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + // do not wait too long in the test + final Duration scalingIntervalMin = Duration.ofSeconds(1L); + final ExecutingStateBuilder executingStateBuilder = + new ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin); + Executing exec = executingStateBuilder.build(ctx); + ctx.setCanScaleUp(true); // => rescale + ctx.setExpectRestarting( // scheduled rescale should restart the job after cooldown + restartingArguments -> { + assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO)); + assertThat(ctx.actionWasScheduled, is(true)); + }); + exec.onNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + final ExecutingStateBuilder executingStateBuilder = + new ExecutingStateBuilder() + .setScalingIntervalMin(Duration.ofSeconds(20L)) + .setLastRescale(Instant.now().minus(Duration.ofSeconds(30L))); + Executing exec = executingStateBuilder.build(ctx); + ctx.setCanScaleUp(true); // => rescale + ctx.setExpectRestarting( + restartingArguments -> { // immediate rescale + assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO)); + assertThat(ctx.actionWasScheduled, is(false)); + }); + exec.onNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailableWithCanScaleUpWithoutForceTransitionsToRestarting() throws Exception { try (MockExecutingContext ctx = new MockExecutingContext()) { Executing exec = new ExecutingStateBuilder().build(ctx); ctx.setExpectRestarting( restartingArguments -> { - // expect immediate restart on scale up + // immediate rescale assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO)); + assertThat(ctx.actionWasScheduled, is(false)); }); - ctx.setCanScaleUp(() -> true); + ctx.setCanScaleUp(true); // => rescale exec.onNewResourcesAvailable(); } } @Test - public void testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws Exception { + public void testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce() + throws Exception { try (MockExecutingContext ctx = new MockExecutingContext()) { - Executing exec = new ExecutingStateBuilder().build(ctx); - ctx.setCanScaleUp(() -> false); + Executing exec = + new ExecutingStateBuilder() + .setScalingIntervalMax(Duration.ofSeconds(1L)) + .build(ctx); + ctx.setCanScaleUp( + false, false); // => schedule force rescale but resource lost on timeout => + // no rescale exec.onNewResourcesAvailable(); ctx.assertNoStateTransition(); + assertThat(ctx.actionWasScheduled, is(true)); + } + } + + @Test + public void testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCanScaleUpWithForce() + throws Exception { Review Comment: yesterday we added an immediate force rescale if `timeSinceLastRescale > interval-max`. This case is not covered by the tests. The current test covers the previous behaviour in which we scheduled a force rescale no matter `timeSinceLastRescale`. I added a test for immediate force -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org