tillrohrmann commented on a change in pull request #15159:
URL: https://github.com/apache/flink/pull/15159#discussion_r596773120
##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The resource stabilization timeout defines the time the
JobManager will wait if fewer than the required resources are available, but
sufficient resources for execution are there. Once this timeout has passed, the
job will start executing with the available resources, or fail, if the
resources are not sufficient. The timeout starts as soon as there are
sufficient resources available for execution.<br />If <span
markdown="span">`scheduler-mode`</span> is configured to <span
markdown="span">`REACTIVE`</span>, this configuration value will default to 0,
so that jobs are starting immediately with the available resources.</td>
+ </tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
- <td>The maximum time the JobManager will wait to acquire all
required resources after a job submission or restart. Once elapsed it will try
to run the job with a lower parallelism, or fail if the minimum amount of
resources could not be acquired.<br />Increasing this value will make the
cluster more resilient against temporary resources shortages (e.g., there is
more time for a failed TaskManager to be restarted), while decreasing this
value reduces downtime of a job (provided that enough slots are available to
still run the job).</td>
+ <td>The maximum time the JobManager will wait to acquire all
required resources after a job submission or restart. Once elapsed it will try
to run the job with a lower parallelism, or fail if the minimum amount of
resources could not be acquired.<br />Increasing this value will make the
cluster more resilient against temporary resources shortages (e.g., there is
more time for a failed TaskManager to be restarted), while decreasing this
value reduces downtime of a job (provided that enough slots are available to
still run the job).<br />Setting a negative duration will disable the resource
timeout: The JobManager will wait indefinitely for resources to appear.<br />If
<span markdown="span">`scheduler-mode`</span> is configured to <span
markdown="span">`REACTIVE`</span>, this configuration value will default to a
negative value to disable the resource timeout.</td>
Review comment:
Is it still correct to say that decreasing this value will reduce the
downtime of jobs? If we have sufficient resources, then
`jobmanager.adaptive-scheduler.resource-stabilization-timeout` should be the
decisive factor for how long the system waits to restart the job.
##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The resource stabilization timeout defines the time the
JobManager will wait if fewer than the required resources are available, but
sufficient resources for execution are there. Once this timeout has passed, the
job will start executing with the available resources, or fail, if the
resources are not sufficient. The timeout starts as soon as there are
sufficient resources available for execution.<br />If <span
markdown="span">`scheduler-mode`</span> is configured to <span
markdown="span">`REACTIVE`</span>, this configuration value will default to 0,
so that jobs are starting immediately with the available resources.</td>
Review comment:
Maybe: `The resource stabilization timeout defines the time the
JobManager will wait if fewer than the desired but sufficient resources are
available.`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
##########
@@ -90,13 +107,40 @@ public Logger getLogger() {
@Override
public void notifyNewResourcesAvailable() {
- if (context.hasEnoughResources(desiredResources)) {
+ checkDesiredOrSufficientResourcesAvailable();
+ }
+
+ private void checkDesiredOrSufficientResourcesAvailable() {
+ if (context.hasDesiredResources(desiredResources)) {
createExecutionGraphWithAvailableResources();
+ return;
}
+
+ if (context.hasSufficientResources()) {
+ Deadline deadline = initializeOrGetResourceStabilizationDeadline();
+ if (deadline.isOverdue()) {
+ createExecutionGraphWithAvailableResources();
+ } else {
+ // schedule next resource check
+ context.runIfState(
+ this,
+ this::checkDesiredOrSufficientResourcesAvailable,
+ deadline.timeLeft());
+ }
+ }
Review comment:
Shouldn't we clear the resource stabilization deadline if we no longer
have sufficient resources so that we can start a new deadline once we have
sufficient resources again?
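
A minimal sketch of what I have in mind, assuming a simplified stand-in for the state. `StabilizationDeadlineSketch` and its `check` method are hypothetical names for illustration and are not part of the PR:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, simplified stand-in for the deadline handling; not Flink code. */
class StabilizationDeadlineSketch {
    private final Duration stabilizationTimeout;
    private Instant deadline; // null means no countdown is running

    StabilizationDeadlineSketch(Duration stabilizationTimeout) {
        this.stabilizationTimeout = stabilizationTimeout;
    }

    /** Returns true if the job should be scheduled at time {@code now}. */
    boolean check(boolean hasDesired, boolean hasSufficient, Instant now) {
        if (hasDesired) {
            return true; // desired resources: schedule right away
        }
        if (!hasSufficient) {
            deadline = null; // clear, so that a later check starts a fresh countdown
            return false;
        }
        if (deadline == null) {
            deadline = now.plus(stabilizationTimeout); // start the countdown
        }
        return !now.isBefore(deadline); // schedule once the deadline has been reached
    }
}
```

With this shape, losing the sufficient resources in between means the next countdown starts from scratch instead of reusing the stale deadline.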
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
@Test
public void testNotifyNewResourcesAvailable() throws Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false); // initially, not enough
resources
+ ctx.setHasDesiredResources(() -> false); // initially, not enough
resources
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
- ctx.setHasEnoughResources(() -> true); // make resources available
+ new WaitingForResources(
+ ctx, log, RESOURCE_COUNTER, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ ctx.setHasDesiredResources(() -> true); // make resources available
ctx.setExpectExecuting(assertNonNull());
wfr.notifyNewResourcesAvailable(); // .. and notify
}
}
@Test
- public void testResourceTimeout() throws Exception {
+ public void
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration noStabilizationTimeout = Duration.ofMillis(0);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ noStabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ ctx.setExpectExecuting(assertNonNull());
+ wfr.notifyNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false);
+
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ stabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.notifyNewResourcesAvailable();
+ // we are not triggering the scheduled tasks, to simulate a long
stabilization timeout
+
+ assertThat(ctx.hasStateTransition(), is(false));
+ }
+ }
+
+ @Test
+ public void testSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration initialResourceTimeout = Duration.ofMillis(120948);
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+ TestingWaitingForResources wfr =
+ new TestingWaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ initialResourceTimeout,
+ stabilizationTimeout);
+
+ // not enough resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // sufficient resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
ctx.setExpectExecuting(assertNonNull());
+ // start countdown
+ wfr.notifyNewResourcesAvailable();
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // advance time to now
+ wfr.setTestDeadline(Deadline.now());
- // immediately execute all scheduled runnables
- assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+ // execute all runnables and trigger expected state transition
for (ScheduledRunnable scheduledRunnable :
ctx.getScheduledRunnables()) {
- if (scheduledRunnable.getExpectedState() == wfr) {
+ if (scheduledRunnable.getExpectedState() == wfr
+ &&
!scheduledRunnable.getDelay().equals(initialResourceTimeout)
+ &&
!scheduledRunnable.getDelay().equals(Duration.ZERO)) {
scheduledRunnable.runAction();
}
}
}
}
+ private static void assertNoStateTransitionsAfterExecutingRunnables(
+ MockContext ctx, WaitingForResources wfr) {
+ Iterator<ScheduledRunnable> runnableIterator =
ctx.getScheduledRunnables().iterator();
+ while (runnableIterator.hasNext()) {
+ ScheduledRunnable scheduledRunnable = runnableIterator.next();
+ if (scheduledRunnable.getExpectedState() == wfr
+ && scheduledRunnable.getDeadline().isOverdue()) {
+ scheduledRunnable.runAction();
+ runnableIterator.remove();
+ }
+ }
+ assertThat(ctx.hasStateTransition, is(false));
+ }
+
+ private static class TestingWaitingForResources extends
WaitingForResources {
+
+ private Deadline testDeadline;
+
+ TestingWaitingForResources(
+ Context context,
+ Logger log,
+ ResourceCounter desiredResources,
+ Duration initialResourceAllocationTimeout,
+ Duration resourceStabilizationTimeout) {
+ super(
+ context,
+ log,
+ desiredResources,
+ initialResourceAllocationTimeout,
+ resourceStabilizationTimeout);
+ }
+
+ @Override
+ protected Deadline initializeOrGetResourceStabilizationDeadline() {
+ return testDeadline;
+ }
+
+ public void setTestDeadline(Deadline testDeadline) {
+ this.testDeadline = testDeadline;
+ }
+ }
+
+ @Test
+ public void testNoStateTransitionOnNoResourceTimeout() throws Exception {
+ try (MockContext ctx = new MockContext()) {
+ ctx.setHasDesiredResources(() -> false);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofMillis(-1),
+ STABILIZATION_TIMEOUT);
+
+ executeAllScheduledRunnables(ctx, wfr);
+
+ assertThat(ctx.hasStateTransition(), is(false));
+ }
+ }
+
+ @Test
+ public void testStateTransitionOnResourceTimeout() throws Exception {
+ try (MockContext ctx = new MockContext()) {
+ ctx.setHasDesiredResources(() -> false);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx, log, RESOURCE_COUNTER, Duration.ZERO,
STABILIZATION_TIMEOUT);
+
+ ctx.setExpectExecuting(assertNonNull());
+
+ executeAllScheduledRunnables(ctx, wfr);
+ }
+ }
+
Review comment:
I think we are lacking a test where we first have sufficient resources, then
lose them again, and then restart the timeout once we have sufficient
resources again (i.e. testing that the resource stabilization timeout can be
restarted).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
##########
@@ -90,13 +107,40 @@ public Logger getLogger() {
@Override
public void notifyNewResourcesAvailable() {
- if (context.hasEnoughResources(desiredResources)) {
+ checkDesiredOrSufficientResourcesAvailable();
+ }
+
+ private void checkDesiredOrSufficientResourcesAvailable() {
+ if (context.hasDesiredResources(desiredResources)) {
createExecutionGraphWithAvailableResources();
+ return;
}
+
+ if (context.hasSufficientResources()) {
Review comment:
One minor idea could be to unify `hasDesiredResources` and
`hasSufficientResources`. I think both methods do effectively the same thing
and could signal the different results with a more expressive return type.
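
Roughly something like the following sketch. `ResourceAvailability`, `ResourceCheckSketch` and the slot-count parameters are hypothetical names used only for illustration; the actual check would work on the `ResourceCounter`:

```java
/** Hypothetical result type; the names are illustrative and not part of the PR. */
enum ResourceAvailability {
    DESIRED,
    SUFFICIENT,
    INSUFFICIENT;

    /** Classifies the available slots against the minimum and desired amounts. */
    static ResourceAvailability of(int availableSlots, int minimumSlots, int desiredSlots) {
        if (availableSlots >= desiredSlots) {
            return DESIRED;
        }
        if (availableSlots >= minimumSlots) {
            return SUFFICIENT;
        }
        return INSUFFICIENT;
    }
}

/** Hypothetical caller showing how the state could branch on a single result. */
class ResourceCheckSketch {
    static String nextAction(ResourceAvailability availability) {
        switch (availability) {
            case DESIRED:
                return "schedule immediately";
            case SUFFICIENT:
                return "wait for stabilization timeout";
            default:
                return "keep waiting for resources";
        }
    }
}
```

The caller then asks the context once and handles all three outcomes in one place.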
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -411,6 +411,35 @@
.text(
"Increasing this value will make
the cluster more resilient against temporary resources shortages (e.g., there
is more time for a failed TaskManager to be restarted), "
+ "while decreasing this
value reduces downtime of a job (provided that enough slots are available to
still run the job).")
+ .linebreak()
+ .text(
+ "Setting a negative duration will
disable the resource timeout: The JobManager will wait indefinitely for
resources to appear.")
+ .linebreak()
+ .text(
+ "If %s is configured to %s, this
configuration value will default to a negative value to disable the resource
timeout.",
+ code(SCHEDULER_MODE.key()),
+
code(SchedulerExecutionMode.REACTIVE.name()))
+ .build());
+
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
+ key("jobmanager.adaptive-scheduler.resource-stabilization-timeout")
+ .durationType()
+ .defaultValue(RESOURCE_WAIT_TIMEOUT.defaultValue())
+ .withDescription(
+ Description.builder()
+ .text(
+ "The resource stabilization
timeout defines the time the JobManager will wait "
Review comment:
I like Chesnay's proposal.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -193,4 +197,37 @@ public static DefaultSlotPoolServiceSchedulerFactory
fromConfiguration(
return new DefaultSlotPoolServiceSchedulerFactory(
slotPoolServiceFactory, schedulerNGFactory);
}
+
+ private static AdaptiveSchedulerFactory
getAdaptiveSchedulerFactoryFromConfiguration(
+ Configuration configuration) {
+ Duration initialResourceAllocationTimeout =
+ returnValueOrReplaceDefaultIfReactiveMode(
+ configuration,
+ JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
+ Duration.ofMillis(-1));
+ Duration resourceStabilizationTimeout =
+ returnValueOrReplaceDefaultIfReactiveMode(
+ configuration,
+ JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT,
+ Duration.ZERO);
Review comment:
Would it be simpler to define the defaults based on
`configuration.get(JobManagerOptions.SCHEDULER_MODE)` here and then simply call
`configuration.getOptional(option).orElseGet(() -> defaultValue)` where
`defaultValue` has been specified before?
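
As a rough, self-contained sketch of the idea: the `Optional<Duration>` parameter stands in for `configuration.getOptional(option)`, the `Mode` enum stands in for the scheduler mode, and the class name is hypothetical. The default values are the ones from this diff and the generated docs:

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper; not part of the PR. */
class TimeoutDefaultsSketch {
    /** Stand-in for the scheduler execution mode. */
    enum Mode {
        REACTIVE,
        NOT_REACTIVE
    }

    /** Explicitly configured value wins; otherwise the mode decides the default. */
    static Duration resourceWaitTimeout(Mode mode, Optional<Duration> configured) {
        // Reactive mode: negative default, i.e. wait indefinitely for resources.
        Duration defaultValue =
                mode == Mode.REACTIVE ? Duration.ofMillis(-1) : Duration.ofSeconds(10);
        return configured.orElse(defaultValue);
    }

    static Duration resourceStabilizationTimeout(Mode mode, Optional<Duration> configured) {
        // Reactive mode: zero default, i.e. start with whatever is available.
        Duration defaultValue = mode == Mode.REACTIVE ? Duration.ZERO : Duration.ofSeconds(10);
        return configured.orElse(defaultValue);
    }
}
```

That keeps the mode-dependent defaults in one visible place instead of a helper that replaces the option's default.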
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
@Test
public void testNotifyNewResourcesAvailable() throws Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false); // initially, not enough
resources
+ ctx.setHasDesiredResources(() -> false); // initially, not enough
resources
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
- ctx.setHasEnoughResources(() -> true); // make resources available
+ new WaitingForResources(
+ ctx, log, RESOURCE_COUNTER, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ ctx.setHasDesiredResources(() -> true); // make resources available
ctx.setExpectExecuting(assertNonNull());
wfr.notifyNewResourcesAvailable(); // .. and notify
}
}
@Test
- public void testResourceTimeout() throws Exception {
+ public void
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration noStabilizationTimeout = Duration.ofMillis(0);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ noStabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ ctx.setExpectExecuting(assertNonNull());
+ wfr.notifyNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false);
+
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ stabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.notifyNewResourcesAvailable();
+ // we are not triggering the scheduled tasks, to simulate a long
stabilization timeout
+
+ assertThat(ctx.hasStateTransition(), is(false));
+ }
+ }
+
+ @Test
+ public void testSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration initialResourceTimeout = Duration.ofMillis(120948);
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+ TestingWaitingForResources wfr =
+ new TestingWaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ initialResourceTimeout,
+ stabilizationTimeout);
+
+ // not enough resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // sufficient resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
Review comment:
This leaks quite a few implementation details. If we need to go to these
lengths, then this is usually an indicator that the class is not easily testable.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
@Test
public void testNotifyNewResourcesAvailable() throws Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false); // initially, not enough
resources
+ ctx.setHasDesiredResources(() -> false); // initially, not enough
resources
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
- ctx.setHasEnoughResources(() -> true); // make resources available
+ new WaitingForResources(
+ ctx, log, RESOURCE_COUNTER, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ ctx.setHasDesiredResources(() -> true); // make resources available
ctx.setExpectExecuting(assertNonNull());
wfr.notifyNewResourcesAvailable(); // .. and notify
}
}
@Test
- public void testResourceTimeout() throws Exception {
+ public void
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration noStabilizationTimeout = Duration.ofMillis(0);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ noStabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ ctx.setExpectExecuting(assertNonNull());
+ wfr.notifyNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false);
+
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ stabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.notifyNewResourcesAvailable();
+ // we are not triggering the scheduled tasks, to simulate a long
stabilization timeout
+
+ assertThat(ctx.hasStateTransition(), is(false));
+ }
+ }
+
+ @Test
+ public void testSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration initialResourceTimeout = Duration.ofMillis(120948);
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+ TestingWaitingForResources wfr =
+ new TestingWaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ initialResourceTimeout,
+ stabilizationTimeout);
+
+ // not enough resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // sufficient resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
ctx.setExpectExecuting(assertNonNull());
+ // start countdown
+ wfr.notifyNewResourcesAvailable();
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // advance time to now
+ wfr.setTestDeadline(Deadline.now());
Review comment:
For these timeout tests, it might make sense to introduce a `Clock` for
the global test time. This `Clock` could be used by the state as well as the
test context. Then we could advance the clock, which would automatically run
all runnables that are eligible at that point. That way we might be able to
express the tests a bit more naturally.
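
A minimal sketch of such a clock, assuming a hypothetical `ManualTestClock` class (not an existing Flink class) that the test context would use to schedule its runnables:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical manual test clock; names and shape are illustrative only. */
class ManualTestClock {
    private static final class Scheduled {
        final long dueMillis;
        final Runnable action;

        Scheduled(long dueMillis, Runnable action) {
            this.dueMillis = dueMillis;
            this.action = action;
        }
    }

    private long nowMillis;
    private final List<Scheduled> scheduled = new ArrayList<>();

    long nowMillis() {
        return nowMillis;
    }

    /** Registers an action to run once the clock has advanced by {@code delay}. */
    void schedule(Runnable action, Duration delay) {
        scheduled.add(new Scheduled(nowMillis + delay.toMillis(), action));
    }

    /** Advances the time and runs every action that has become due. */
    void advance(Duration duration) {
        nowMillis += duration.toMillis();
        // Collect due actions first, so that actions may schedule follow-ups safely.
        // Follow-ups scheduled during this call run on a later advance.
        List<Scheduled> due = new ArrayList<>();
        for (Iterator<Scheduled> it = scheduled.iterator(); it.hasNext(); ) {
            Scheduled candidate = it.next();
            if (candidate.dueMillis <= nowMillis) {
                due.add(candidate);
                it.remove();
            }
        }
        for (Scheduled task : due) {
            task.action.run();
        }
    }
}
```

A test could then schedule the stabilization check with a 50 s delay, advance the clock by 49 s and assert that no state transition happened, and advance by one more second to trigger it, without overriding the deadline in a subclass.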
##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to
scale up.</td>
</tr>
+ <tr>
+
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The resource stabilization timeout defines the time the
JobManager will wait if fewer than the required resources are available, but
sufficient resources for execution are there. Once this timeout has passed, the
job will start executing with the available resources, or fail, if the
resources are not sufficient. The timeout starts as soon as there are
sufficient resources available for execution.<br />If <span
markdown="span">`scheduler-mode`</span> is configured to <span
markdown="span">`REACTIVE`</span>, this configuration value will default to 0,
so that jobs are starting immediately with the available resources.</td>
Review comment:
Why are we failing if there are not sufficient resources available? I
thought that this timeout would only be triggered if we have sufficient
resources?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
@Test
public void testNotifyNewResourcesAvailable() throws Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false); // initially, not enough
resources
+ ctx.setHasDesiredResources(() -> false); // initially, not enough
resources
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
- ctx.setHasEnoughResources(() -> true); // make resources available
+ new WaitingForResources(
+ ctx, log, RESOURCE_COUNTER, Duration.ZERO,
STABILIZATION_TIMEOUT);
+ ctx.setHasDesiredResources(() -> true); // make resources available
ctx.setExpectExecuting(assertNonNull());
wfr.notifyNewResourcesAvailable(); // .. and notify
}
}
@Test
- public void testResourceTimeout() throws Exception {
+ public void
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration noStabilizationTimeout = Duration.ofMillis(0);
+ WaitingForResources wfr =
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ noStabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ ctx.setExpectExecuting(assertNonNull());
+ wfr.notifyNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
try (MockContext ctx = new MockContext()) {
- ctx.setHasEnoughResources(() -> false);
+
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
WaitingForResources wfr =
- new WaitingForResources(ctx, log, RESOURCE_COUNTER,
Duration.ZERO);
+ new WaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ Duration.ofSeconds(1000),
+ stabilizationTimeout);
+
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.notifyNewResourcesAvailable();
+ // we are not triggering the scheduled tasks, to simulate a long
stabilization timeout
+
+ assertThat(ctx.hasStateTransition(), is(false));
+ }
+ }
+
+ @Test
+ public void testSchedulingIfStabilizationTimeoutIsConfigured() throws
Exception {
+ try (MockContext ctx = new MockContext()) {
+
+ Duration initialResourceTimeout = Duration.ofMillis(120948);
+ Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+ TestingWaitingForResources wfr =
+ new TestingWaitingForResources(
+ ctx,
+ log,
+ RESOURCE_COUNTER,
+ initialResourceTimeout,
+ stabilizationTimeout);
+
+ // not enough resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> false);
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // sufficient resources available
+ ctx.setHasDesiredResources(() -> false);
+ ctx.setHasSufficientResources(() -> true);
+ wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
ctx.setExpectExecuting(assertNonNull());
+ // start countdown
+ wfr.notifyNewResourcesAvailable();
+
+ assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+ // advance time to now
+ wfr.setTestDeadline(Deadline.now());
- // immediately execute all scheduled runnables
- assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+ // execute all runnables and trigger expected state transition
for (ScheduledRunnable scheduledRunnable :
ctx.getScheduledRunnables()) {
- if (scheduledRunnable.getExpectedState() == wfr) {
+ if (scheduledRunnable.getExpectedState() == wfr
+ &&
!scheduledRunnable.getDelay().equals(initialResourceTimeout)
+ &&
!scheduledRunnable.getDelay().equals(Duration.ZERO)) {
Review comment:
Why do we need these filters here?