This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b98758fbd [FLINK-38748][runtime] Enhance the switching logic of AdaptiveScheduler from Restarting to CreatingExecutionGraph and accelerate the transition. (#27286)
f9b98758fbd is described below

commit f9b98758fbddd1911a3236c95cf8553eb5f2f03a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Jan 6 13:17:46 2026 +0800

    [FLINK-38748][runtime] Enhance the switching logic of AdaptiveScheduler from Restarting to CreatingExecutionGraph and accelerate the transition. (#27286)
---
 .../apache/flink/runtime/scheduler/adaptive/Executing.java |  5 ++---
 .../flink/runtime/scheduler/adaptive/Restarting.java       | 10 +++++++++-
 .../runtime/scheduler/adaptive/MockRestartingContext.java  | 11 +++++++++++
 .../flink/runtime/scheduler/adaptive/RestartingTest.java   | 14 +++++++++++---
 4 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index f699d13373e..13848e7aba3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -320,9 +320,8 @@ class Executing extends StateWithExecutionGraph
         ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
 
         /**
-         * Checks whether we have the desired resources.
-         *
-         * @return {@code true} if we have enough resources; otherwise {@code false}
+         * Returns {@code true} if the available resources meet the desired resources for the job;
+         * otherwise {@code false}.
          */
         boolean hasDesiredResources();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 7ce04dbcc37..93ef12ac88a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -114,7 +114,8 @@ class Restarting extends StateWithExecutionGraph {
     }
 
     private void goToSubsequentState() {
-        if (availableParallelismNotChanged(restartWithParallelism)) {
+        if (availableParallelismNotChanged(restartWithParallelism)
+                || context.hasDesiredResources()) {
             context.goToCreatingExecutionGraph(getExecutionGraph());
         } else {
             context.goToWaitingForResources(getExecutionGraph());
@@ -163,6 +164,13 @@ class Restarting extends StateWithExecutionGraph {
          * slots.
          */
         Optional<VertexParallelism> getAvailableVertexParallelism();
+
+        /**
+         * Checks whether we have the desired resources.
+         *
+         * @return {@code true} if we have enough resources; otherwise {@code false}
+         */
+        boolean hasDesiredResources();
     }
 
     static class Factory implements StateFactory<Restarting> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
index c4e2023b4c3..30cc9433600 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
@@ -51,6 +51,8 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
 
     @Nullable private VertexParallelism availableVertexParallelism;
 
+    private boolean hasDesiredResources = false;
+
     public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
         cancellingStateValidator.expectInput(asserter);
     }
@@ -68,6 +70,15 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
         this.availableVertexParallelism = availableVertexParallelism;
     }
 
+    public void setHasDesiredResources(boolean hasDesiredResources) {
+        this.hasDesiredResources = hasDesiredResources;
+    }
+
+    @Override
+    public boolean hasDesiredResources() {
+        return hasDesiredResources;
+    }
+
     @Override
     public void goToCanceling(
             ExecutionGraph executionGraph,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index b987d43e154..c8b50df7db2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +77,10 @@ class RestartingTest {
         }
     }
 
-    @Test
-    public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTransitionToSubsequentStateWhenResourceChanged(boolean hasDesiredResources)
+            throws Exception {
         try (MockRestartingContext ctx = new MockRestartingContext()) {
             JobVertexID jobVertexId = new JobVertexID();
             VertexParallelism availableParallelism =
@@ -86,8 +89,13 @@ class RestartingTest {
                     new VertexParallelism(singletonMap(jobVertexId, 2));
 
             ctx.setAvailableVertexParallelism(availableParallelism);
+            ctx.setHasDesiredResources(hasDesiredResources);
             Restarting restarting = createRestartingState(ctx, requiredParallelismForForcedRestart);
-            ctx.setExpectWaitingForResources();
+            if (hasDesiredResources) {
+                ctx.setExpectCreatingExecutionGraph();
+            } else {
+                ctx.setExpectWaitingForResources();
+            }
             restarting.onGloballyTerminalState(JobStatus.CANCELED);
         }
     }

Reply via email to