echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1362263357


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws 
Exception {
     }
 
     @Test
-    public void 
testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+    public void 
testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+            throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            // do not wait too long in the test
+            final Duration scalingIntervalMin = Duration.ofSeconds(1L);
+            final ExecutingStateBuilder executingStateBuilder =
+                    new 
ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting( // scheduled rescale should restart the 
job after cooldown
+                    restartingArguments -> {
+                        assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(true));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void 
testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final ExecutingStateBuilder executingStateBuilder =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMin(Duration.ofSeconds(20L))
+                            
.setLastRescale(Instant.now().minus(Duration.ofSeconds(30L)));
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting(
+                    restartingArguments -> { // immediate rescale
+                        assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void 
testNotifyNewResourcesAvailableWithCanScaleUpWithoutForceTransitionsToRestarting()
             throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             Executing exec = new ExecutingStateBuilder().build(ctx);
 
             ctx.setExpectRestarting(
                     restartingArguments -> {
-                        // expect immediate restart on scale up
+                        // immediate rescale
                         assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
                     });
-            ctx.setCanScaleUp(() -> true);
+            ctx.setCanScaleUp(true); // => rescale
             exec.onNewResourcesAvailable();
         }
     }
 
     @Test
-    public void 
testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws 
Exception {
+    public void 
testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce()
+            throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
-            Executing exec = new ExecutingStateBuilder().build(ctx);
-            ctx.setCanScaleUp(() -> false);
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMax(Duration.ofSeconds(1L))
+                            .build(ctx);
+            ctx.setCanScaleUp(
+                    false, false); // => schedule force rescale but resource 
lost on timeout =>
+            // no rescale
             exec.onNewResourcesAvailable();
             ctx.assertNoStateTransition();
+            assertThat(ctx.actionWasScheduled, is(true));

Review Comment:
   Even if minMet and parallelismChangeAfterTimeout names kind of leaked 
implementation details, I believe it is still better than the new names that 
are too far from the tested use cases



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws 
Exception {
     }
 
     @Test
-    public void 
testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+    public void 
testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+            throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            // do not wait too long in the test
+            final Duration scalingIntervalMin = Duration.ofSeconds(1L);
+            final ExecutingStateBuilder executingStateBuilder =
+                    new 
ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting( // scheduled rescale should restart the 
job after cooldown
+                    restartingArguments -> {
+                        assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(true));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void 
testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws 
Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final ExecutingStateBuilder executingStateBuilder =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMin(Duration.ofSeconds(20L))
+                            
.setLastRescale(Instant.now().minus(Duration.ofSeconds(30L)));
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting(
+                    restartingArguments -> { // immediate rescale
+                        assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void 
testNotifyNewResourcesAvailableWithCanScaleUpWithoutForceTransitionsToRestarting()
             throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             Executing exec = new ExecutingStateBuilder().build(ctx);
 
             ctx.setExpectRestarting(
                     restartingArguments -> {
-                        // expect immediate restart on scale up
+                        // immediate rescale
                         assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
                     });
-            ctx.setCanScaleUp(() -> true);
+            ctx.setCanScaleUp(true); // => rescale
             exec.onNewResourcesAvailable();
         }
     }
 
     @Test
-    public void 
testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws 
Exception {
+    public void 
testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce()
+            throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
-            Executing exec = new ExecutingStateBuilder().build(ctx);
-            ctx.setCanScaleUp(() -> false);
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMax(Duration.ofSeconds(1L))
+                            .build(ctx);
+            ctx.setCanScaleUp(
+                    false, false); // => schedule force rescale but resource 
lost on timeout =>
+            // no rescale
             exec.onNewResourcesAvailable();
             ctx.assertNoStateTransition();
+            assertThat(ctx.actionWasScheduled, is(true));

Review Comment:
   Yes it is expected: as long as resources are not met we schedule a force 
rescale after scalingIntervalMax. This test is to simulate that the resource 
was lost in between, hence there is no rescale (because parallelism has not 
changed). But still it is normal that we have scheduled a forceRescale.
   
   We are exactly in the case I mentioned yesterday, by renaming `minMet` to 
`canScaleUpWithoutForce` and `parallelismChangeAfterTimeout` to 
`canScaleUpWithForce` we have lost all the expressiveness of the tests.
   Even if minMet and parallelismChangeAfterTimeout names kind of leaked 
implementation details, I believe it is still better than the new names that 
are too far from the tested use cases
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to