This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new df4f8c48d60 [FLINK-38534][runtime/tests] Fix flaky LocalRecoveryTest
by waiting for tasks to reach RUNNING state (#27767)
df4f8c48d60 is described below
commit df4f8c48d6042f4e1a2ccf5282ba0ad8cf541fbd
Author: mukul-8 <[email protected]>
AuthorDate: Wed Mar 25 11:44:20 2026 +0530
[FLINK-38534][runtime/tests] Fix flaky LocalRecoveryTest by waiting for
tasks to reach RUNNING state (#27767)
---
.../runtime/scheduler/SchedulerTestingUtils.java | 22 ++++++++++++++++++++++
.../scheduler/adaptive/LocalRecoveryTest.java | 13 +++++++++++++
2 files changed, 35 insertions(+)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 0b53800e159..49a81d6fd2d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -340,6 +340,28 @@ public class SchedulerTestingUtils {
RETRY_ATTEMPTS);
}
+    /**
+     * Waits until the current execution attempt of every vertex in the given ExecutionGraph is
+     * RUNNING.
+     *
+     * <p>The check is delegated to {@code waitUntilCondition} together with {@code
+     * RETRY_INTERVAL_MILLIS} and {@code RETRY_ATTEMPTS}, the same retry settings used by the
+     * other wait helpers in this class. A graph without vertices trivially satisfies the check.
+     *
+     * @param executionGraph the ExecutionGraph whose vertices' current attempts are checked
+     * @throws Exception whatever {@code waitUntilCondition} throws, presumably when not all tasks
+     *     reach RUNNING before the retries are exhausted
+     */
+    public static void waitForAllTasksRunning(final ExecutionGraph executionGraph)
+            throws Exception {
+        // NOTE(review): there is no main-thread dispatch here, so the graph is read from the
+        // polling thread; confirm that is safe for the scheduler under test.
+        waitUntilCondition(
+                () -> {
+                    // Only the current (latest) attempt of each vertex is inspected.
+                    for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+                        if (vertex.getCurrentExecutionAttempt().getState()
+                                != ExecutionState.RUNNING) {
+                            return false;
+                        }
+                    }
+                    return true;
+                },
+                RETRY_INTERVAL_MILLIS,
+                RETRY_ATTEMPTS);
+    }
+
private static ExecutionJobVertex getJobVertex(
DefaultScheduler scheduler, JobVertexID jobVertexId) {
final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
index 8a39234c357..bdaf69c3f31 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
+import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksRunning;
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning;
@@ -103,6 +104,18 @@ public class LocalRecoveryTest extends
AdaptiveSchedulerTestBase {
// Transition job and all subtasks to RUNNING state.
waitForJobStatusRunning(scheduler);
runInMainThread(() -> setAllExecutionsToRunning(scheduler));
+ // Wait for all task executions to actually reach RUNNING state before
triggering
+ // checkpoint.
+ // In slower CI environments, state transitions may not complete
immediately, causing
+ // checkpoint triggers to be rejected if tasks are still in
DEPLOYING/INITIALIZING state.
+ final ExecutionGraph executionGraph =
+ scheduler
+ .getState()
+ .as(StateWithExecutionGraph.class)
+ .map(StateWithExecutionGraph::getExecutionGraph)
+ .orElseThrow(
+ () -> new
IllegalStateException("ExecutionGraph not available"));
+ waitForAllTasksRunning(executionGraph);
// Trigger a checkpoint
CompletableFuture<CompletedCheckpoint> completedCheckpointFuture =