This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f9b98758fbd [FLINK-38748][runtime] Enhance the switching logic of
AdaptiveScheduler from Restarting to CreatingExecutionGraph and accelerate the
transition. (#27286)
f9b98758fbd is described below
commit f9b98758fbddd1911a3236c95cf8553eb5f2f03a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Jan 6 13:17:46 2026 +0800
[FLINK-38748][runtime] Enhance the switching logic of AdaptiveScheduler
from Restarting to CreatingExecutionGraph and accelerate the transition.
(#27286)
---
.../apache/flink/runtime/scheduler/adaptive/Executing.java | 5 ++---
.../flink/runtime/scheduler/adaptive/Restarting.java | 10 +++++++++-
.../runtime/scheduler/adaptive/MockRestartingContext.java | 11 +++++++++++
.../flink/runtime/scheduler/adaptive/RestartingTest.java | 14 +++++++++++---
4 files changed, 33 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index f699d13373e..13848e7aba3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -320,9 +320,8 @@ class Executing extends StateWithExecutionGraph
ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
/**
- * Checks whether we have the desired resources.
- *
- * @return {@code true} if we have enough resources; otherwise {@code false}
+ * Returns {@code true} if the available resources meet the desired resources for the job;
+ * otherwise {@code false}.
*/
boolean hasDesiredResources();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 7ce04dbcc37..93ef12ac88a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -114,7 +114,8 @@ class Restarting extends StateWithExecutionGraph {
}
private void goToSubsequentState() {
- if (availableParallelismNotChanged(restartWithParallelism)) {
+ if (availableParallelismNotChanged(restartWithParallelism)
+ || context.hasDesiredResources()) {
context.goToCreatingExecutionGraph(getExecutionGraph());
} else {
context.goToWaitingForResources(getExecutionGraph());
@@ -163,6 +164,13 @@ class Restarting extends StateWithExecutionGraph {
* slots.
*/
Optional<VertexParallelism> getAvailableVertexParallelism();
+
+ /**
+ * Checks whether we have the desired resources.
+ *
+ * @return {@code true} if we have enough resources; otherwise {@code false}
+ */
+ boolean hasDesiredResources();
}
static class Factory implements StateFactory<Restarting> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
index c4e2023b4c3..30cc9433600 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java
@@ -51,6 +51,8 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
@Nullable private VertexParallelism availableVertexParallelism;
+ private boolean hasDesiredResources = false;
+
public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}
@@ -68,6 +70,15 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
this.availableVertexParallelism = availableVertexParallelism;
}
+ public void setHasDesiredResources(boolean hasDesiredResources) {
+ this.hasDesiredResources = hasDesiredResources;
+ }
+
+ @Override
+ public boolean hasDesiredResources() {
+ return hasDesiredResources;
+ }
+
@Override
public void goToCanceling(
ExecutionGraph executionGraph,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index b987d43e154..c8b50df7db2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +77,10 @@ class RestartingTest {
}
}
- @Test
- public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTransitionToSubsequentStateWhenResourceChanged(boolean hasDesiredResources)
+ throws Exception {
try (MockRestartingContext ctx = new MockRestartingContext()) {
JobVertexID jobVertexId = new JobVertexID();
VertexParallelism availableParallelism =
@@ -86,8 +89,13 @@ class RestartingTest {
new VertexParallelism(singletonMap(jobVertexId, 2));
ctx.setAvailableVertexParallelism(availableParallelism);
+ ctx.setHasDesiredResources(hasDesiredResources);
Restarting restarting = createRestartingState(ctx, requiredParallelismForForcedRestart);
- ctx.setExpectWaitingForResources();
+ if (hasDesiredResources) {
+ ctx.setExpectCreatingExecutionGraph();
+ } else {
+ ctx.setExpectWaitingForResources();
+ }
restarting.onGloballyTerminalState(JobStatus.CANCELED);
}
}