This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc0ccf325527c3589d5cd5ae7397b22c22321cec
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Mar 27 12:51:28 2023 +0200

    [FLINK-31399] Add state hooks for reacting to new requirements
---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java    |  4 ++--
 .../flink/runtime/scheduler/adaptive/Executing.java      | 15 ++++++++++++---
 .../{ResourceConsumer.java => ResourceListener.java}     | 12 +++++++++---
 .../runtime/scheduler/adaptive/WaitingForResources.java  | 11 ++++++++---
 .../flink/runtime/scheduler/adaptive/ExecutingTest.java  |  4 ++--
 .../scheduler/adaptive/WaitingForResourcesTest.java      | 16 ++++++++--------
 6 files changed, 41 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index e8493d731f5..c028fd05ecf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -425,8 +425,8 @@ public class AdaptiveScheduler
 
     private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
         state.tryRun(
-                ResourceConsumer.class,
-                ResourceConsumer::notifyNewResourcesAvailable,
+                ResourceListener.class,
+                ResourceListener::onNewResourcesAvailable,
                 "newResourcesAvailable");
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index ae33dc3b418..9cdb9afc7f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
 /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
-class Executing extends StateWithExecutionGraph implements ResourceConsumer {
+class Executing extends StateWithExecutionGraph implements ResourceListener {
 
     private final Context context;
 
@@ -71,7 +71,7 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
         deploy();
 
         // check if new resources have come available in the meantime
-        context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO);
+        context.runIfState(this, this::maybeRescale, Duration.ZERO);
     }
 
     @Override
@@ -123,7 +123,16 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
+    public void onNewResourcesAvailable() {
+        maybeRescale();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
+        maybeRescale();
+    }
+
+    private void maybeRescale() {
         if (context.shouldRescale(getExecutionGraph())) {
             getLogger().info("Can change the parallelism of job. Restarting job.");
             context.goToRestarting(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java
similarity index 74%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java
index 4ea4877d0a7..0e259f625b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ResourceListener.java
@@ -18,9 +18,15 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-/** Interface which denotes that {@link State} can react to newly available resource (slots). */
-interface ResourceConsumer {
+/**
+ * Interface which denotes that {@link State} can react to newly available resource (slots) and
+ * changes in resource requirements.
+ */
+interface ResourceListener {
 
     /** Notifies that new resources are available. */
-    void notifyNewResourcesAvailable();
+    void onNewResourcesAvailable();
+
+    /** Notifies that the resource requirements have changed. */
+    void onNewResourceRequirements();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index f102bcfc4ba..62b20919ee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -38,7 +38,7 @@ import java.util.concurrent.ScheduledFuture;
 /**
  * State which describes that the scheduler is waiting for resources in order to execute the job.
  */
-class WaitingForResources implements State, ResourceConsumer {
+class WaitingForResources implements State, ResourceListener {
 
     private final Context context;
 
@@ -103,7 +103,7 @@ class WaitingForResources implements State, ResourceConsumer {
                             this, this::resourceTimeout, initialResourceAllocationTimeout);
         }
         this.previousExecutionGraph = previousExecutionGraph;
-        context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO);
+        context.runIfState(this, this::checkDesiredOrSufficientResourcesAvailable, Duration.ZERO);
     }
 
     @Override
@@ -144,7 +144,12 @@ class WaitingForResources implements State, ResourceConsumer {
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
+    public void onNewResourcesAvailable() {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
         checkDesiredOrSufficientResourcesAvailable();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index aacf9250554..fe1448b8e5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -260,7 +260,7 @@ public class ExecutingTest extends TestLogger {
                         assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
                     });
             ctx.setCanScaleUp(() -> true);
-            exec.notifyNewResourcesAvailable();
+            exec.onNewResourcesAvailable();
         }
     }
 
@@ -269,7 +269,7 @@ public class ExecutingTest extends TestLogger {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             Executing exec = new ExecutingStateBuilder().build(ctx);
             ctx.setCanScaleUp(() -> false);
-            exec.notifyNewResourcesAvailable();
+            exec.onNewResourcesAvailable();
             ctx.assertNoStateTransition();
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 2c58f053b13..202d0fdbd5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -84,7 +84,7 @@ public class WaitingForResourcesTest extends TestLogger {
                             ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT);
 
             // we expect no state transition.
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
         }
     }
 
@@ -97,7 +97,7 @@ public class WaitingForResourcesTest extends TestLogger {
                             ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT);
             ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectCreatingExecutionGraph();
-            wfr.notifyNewResourcesAvailable(); // .. and notify
+            wfr.onNewResourcesAvailable(); // .. and notify
         }
     }
 
@@ -117,7 +117,7 @@ public class WaitingForResourcesTest extends TestLogger {
             ctx.setHasDesiredResources(() -> false);
             ctx.setHasSufficientResources(() -> true);
             ctx.setExpectCreatingExecutionGraph();
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
         }
     }
 
@@ -137,7 +137,7 @@ public class WaitingForResourcesTest extends TestLogger {
 
             ctx.setHasDesiredResources(() -> false);
             ctx.setHasSufficientResources(() -> true);
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
             // we are not triggering the scheduled tasks, to simulate a long stabilization timeout
 
             assertThat(ctx.hasStateTransition(), is(false));
@@ -165,7 +165,7 @@ public class WaitingForResourcesTest extends TestLogger {
             ctx.setHasSufficientResources(() -> true);
 
             // notify about sufficient resources
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
 
             ctx.setExpectCreatingExecutionGraph();
 
@@ -201,17 +201,17 @@ public class WaitingForResourcesTest extends TestLogger {
             // notify about resources, trigger stabilization timeout
             ctx.setHasSufficientResources(() -> true);
             ctx.advanceTimeByMillis(40); // advance time, but don't trigger stabilizationTimeout
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
 
             // notify again, but insufficient (reset stabilization timeout)
             ctx.setHasSufficientResources(() -> false);
             ctx.advanceTimeByMillis(40);
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
 
             // notify again, but sufficient, trigger timeout
             ctx.setHasSufficientResources(() -> true);
             ctx.advanceTimeByMillis(40);
-            wfr.notifyNewResourcesAvailable();
+            wfr.onNewResourcesAvailable();
 
             // sanity check: no state transition has been triggered so far
             assertThat(ctx.hasStateTransition(), is(false));

Reply via email to