This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new df4f8c48d60 [FLINK-38534][runtime/tests] Fix flaky LocalRecoveryTest by waiting for tasks to reach RUNNING state (#27767)
df4f8c48d60 is described below

commit df4f8c48d6042f4e1a2ccf5282ba0ad8cf541fbd
Author: mukul-8 <[email protected]>
AuthorDate: Wed Mar 25 11:44:20 2026 +0530

    [FLINK-38534][runtime/tests] Fix flaky LocalRecoveryTest by waiting for tasks to reach RUNNING state (#27767)
---
 .../runtime/scheduler/SchedulerTestingUtils.java   | 22 ++++++++++++++++++++++
 .../scheduler/adaptive/LocalRecoveryTest.java      | 13 +++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 0b53800e159..49a81d6fd2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -340,6 +340,28 @@ public class SchedulerTestingUtils {
                 RETRY_ATTEMPTS);
     }
 
+    /**
+     * Blocks until the current attempt of every execution vertex in the graph is RUNNING.
+     *
+     * @param executionGraph graph whose execution vertices are polled
+     * @throws Exception if not every attempt is RUNNING before {@code waitUntilCondition} gives up
+     */
+    public static void waitForAllTasksRunning(final ExecutionGraph executionGraph)
+            throws Exception {
+        waitUntilCondition(
+                () -> {
+                    for (ExecutionVertex ev : executionGraph.getAllExecutionVertices()) {
+                        final ExecutionState state = ev.getCurrentExecutionAttempt().getState();
+                        if (state != ExecutionState.RUNNING) {
+                            return false;
+                        }
+                    }
+                    return true;
+                },
+                RETRY_INTERVAL_MILLIS,
+                RETRY_ATTEMPTS);
+    }
+
     private static ExecutionJobVertex getJobVertex(
             DefaultScheduler scheduler, JobVertexID jobVertexId) {
         final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
index 8a39234c357..bdaf69c3f31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksRunning;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning;
@@ -103,6 +104,18 @@ public class LocalRecoveryTest extends AdaptiveSchedulerTestBase {
         // Transition job and all subtasks to RUNNING state.
         waitForJobStatusRunning(scheduler);
         runInMainThread(() -> setAllExecutionsToRunning(scheduler));
+        // Wait for all task executions to actually reach RUNNING state before 
triggering
+        // checkpoint.
+        // In slower CI environments, state transitions may not complete 
immediately, causing
+        // checkpoint triggers to be rejected if tasks are still in 
DEPLOYING/INITIALIZING state.
+        final ExecutionGraph executionGraph =
+                scheduler
+                        .getState()
+                        .as(StateWithExecutionGraph.class)
+                        .map(StateWithExecutionGraph::getExecutionGraph)
+                        .orElseThrow(
+                                () -> new 
IllegalStateException("ExecutionGraph not available"));
+        waitForAllTasksRunning(executionGraph);
 
         // Trigger a checkpoint
         CompletableFuture<CompletedCheckpoint> completedCheckpointFuture =

Reply via email to