This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc0ccf325527c3589d5cd5ae7397b22c22321cec Author: David Moravek <d...@apache.org> AuthorDate: Mon Mar 27 12:51:28 2023 +0200 [FLINK-31399] Add state hooks for reacting to new requirements --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 4 ++-- .../flink/runtime/scheduler/adaptive/Executing.java | 15 ++++++++++++--- .../{ResourceConsumer.java => ResourceListener.java} | 12 +++++++++--- .../runtime/scheduler/adaptive/WaitingForResources.java | 11 ++++++++--- .../flink/runtime/scheduler/adaptive/ExecutingTest.java | 4 ++-- .../scheduler/adaptive/WaitingForResourcesTest.java | 16 ++++++++-------- 6 files changed, 41 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index e8493d731f5..c028fd05ecf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -425,8 +425,8 @@ public class AdaptiveScheduler private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) { state.tryRun( - ResourceConsumer.class, - ResourceConsumer::notifyNewResourcesAvailable, + ResourceListener.class, + ResourceListener::onNewResourcesAvailable, "newResourcesAvailable"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index ae33dc3b418..9cdb9afc7f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ -class Executing extends StateWithExecutionGraph implements ResourceConsumer { +class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; @@ -71,7 +71,7 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { deploy(); // check if new resources have come available in the meantime - context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO); + context.runIfState(this, this::maybeRescale, Duration.ZERO); } @Override @@ -123,7 +123,16 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { } @Override - public void notifyNewResourcesAvailable() { + public void onNewResourcesAvailable() { + maybeRescale(); + } + + @Override + public void onNewResourceRequirements() { + maybeRescale(); + } + + private void maybeRescale() { if (context.shouldRescale(getExecutionGraph())) { getLogger().info("Can change the parallelism of job. Restarting job."); context.goToRestarting( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java similarity index 74% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java index 4ea4877d0a7..0e259f625b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java @@ -18,9 +18,15 @@ package org.apache.flink.runtime.scheduler.adaptive; -/** Interface which denotes that {@link State} can react to newly available resource (slots). */ -interface ResourceConsumer { +/** + * Interface which denotes that {@link State} can react to newly available resource (slots) and + * changes in resource requirements. + */ +interface ResourceListener { /** Notifies that new resources are available. */ - void notifyNewResourcesAvailable(); + void onNewResourcesAvailable(); + + /** Notifies that the resource requirements have changed. */ + void onNewResourceRequirements(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index f102bcfc4ba..62b20919ee1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -38,7 +38,7 @@ import java.util.concurrent.ScheduledFuture; /** * State which describes that the scheduler is waiting for resources in order to execute the job. */ -class WaitingForResources implements State, ResourceConsumer { +class WaitingForResources implements State, ResourceListener { private final Context context; @@ -103,7 +103,7 @@ class WaitingForResources implements State, ResourceConsumer { this, this::resourceTimeout, initialResourceAllocationTimeout); } this.previousExecutionGraph = previousExecutionGraph; - context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO); + context.runIfState(this, this::checkDesiredOrSufficientResourcesAvailable, Duration.ZERO); } @Override @@ -144,7 +144,12 @@ class WaitingForResources implements State, ResourceConsumer { } @Override - public void notifyNewResourcesAvailable() { + public void onNewResourcesAvailable() { + checkDesiredOrSufficientResourcesAvailable(); + } + + @Override + public void onNewResourceRequirements() { checkDesiredOrSufficientResourcesAvailable(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index aacf9250554..fe1448b8e5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -260,7 +260,7 @@ public class ExecutingTest extends TestLogger { assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO)); }); ctx.setCanScaleUp(() -> true); - exec.notifyNewResourcesAvailable(); + exec.onNewResourcesAvailable(); } } @@ -269,7 +269,7 @@ public class ExecutingTest extends TestLogger { try (MockExecutingContext ctx = new MockExecutingContext()) { Executing exec = new ExecutingStateBuilder().build(ctx); ctx.setCanScaleUp(() -> false); - exec.notifyNewResourcesAvailable(); + exec.onNewResourcesAvailable(); ctx.assertNoStateTransition(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 2c58f053b13..202d0fdbd5f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -84,7 +84,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); // we expect no state transition. - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); } } @@ -97,7 +97,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); ctx.setHasDesiredResources(() -> true); // make resources available ctx.setExpectCreatingExecutionGraph(); - wfr.notifyNewResourcesAvailable(); // .. and notify + wfr.onNewResourcesAvailable(); // .. and notify } } @@ -117,7 +117,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx.setHasDesiredResources(() -> false); ctx.setHasSufficientResources(() -> true); ctx.setExpectCreatingExecutionGraph(); - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); } } @@ -137,7 +137,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx.setHasDesiredResources(() -> false); ctx.setHasSufficientResources(() -> true); - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); // we are not triggering the scheduled tasks, to simulate a long stabilization timeout assertThat(ctx.hasStateTransition(), is(false)); @@ -165,7 +165,7 @@ public class WaitingForResourcesTest extends TestLogger { ctx.setHasSufficientResources(() -> true); // notify about sufficient resources - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); ctx.setExpectCreatingExecutionGraph(); @@ -201,17 +201,17 @@ public class WaitingForResourcesTest extends TestLogger { // notify about resources, trigger stabilization timeout ctx.setHasSufficientResources(() -> true); ctx.advanceTimeByMillis(40); // advance time, but don't trigger stabilizationTimeout - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); // notify again, but insufficient (reset stabilization timeout) ctx.setHasSufficientResources(() -> false); ctx.advanceTimeByMillis(40); - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); // notify again, but sufficient, trigger timeout ctx.setHasSufficientResources(() -> true); ctx.advanceTimeByMillis(40); - wfr.notifyNewResourcesAvailable(); + wfr.onNewResourcesAvailable(); // sanity check: no state transition has been triggered so far assertThat(ctx.hasStateTransition(), is(false));