This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b7acd9fe93f [FLINK-38404][test] Fix JobMasterTest by making it 
scheduler-type aware (#27091)
b7acd9fe93f is described below

commit b7acd9fe93fb20160327698a4c8c0fa2fd3484a3
Author: Mingliang Liu <[email protected]>
AuthorDate: Mon Nov 3 06:03:32 2025 -0800

    [FLINK-38404][test] Fix JobMasterTest by making it scheduler-type aware 
(#27091)
---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 112 +++++++++++----------
 1 file changed, 60 insertions(+), 52 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 64e2bc6c696..ebacc4847d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -184,6 +184,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import static 
org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.FIXED_DELAY;
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -1691,40 +1692,17 @@ class JobMasterTest {
     @Test
     void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
         final List<Event> jobEvents = new ArrayList<>();
-        runJobFailureWhenTaskExecutorTerminatesTest(
-                heartbeatServices,
-                (localTaskManagerLocation, jobMasterGateway) ->
-                        jobMasterGateway.disconnectTaskManager(
-                                localTaskManagerLocation.getResourceID(),
-                                new FlinkException("Test disconnectTaskManager 
exception.")),
-                jobEvents);
-        assertThat(
-                        jobEvents.stream()
-                                .filter(
-                                        event ->
-                                                
Events.JobStatusChangeEvent.name()
-                                                        
.equals(event.getName()))
-                                .map(Event::getAttributes)
-                                .map(x -> x.get("newJobStatus")))
-                .containsExactly(
-                        JobStatus.RUNNING.toString(),
-                        JobStatus.FAILING.toString(),
-                        JobStatus.FAILED.toString());
+        final SchedulerType schedulerType =
+                runJobFailureWhenTaskExecutorTerminatesTest(
+                        heartbeatServices,
+                        (localTaskManagerLocation, jobMasterGateway) ->
+                                jobMasterGateway.disconnectTaskManager(
+                                        
localTaskManagerLocation.getResourceID(),
+                                        new FlinkException(
+                                                "Test disconnectTaskManager 
exception.")),
+                        jobEvents);
 
-        assertThat(
-                        jobEvents.stream()
-                                .filter(
-                                        event ->
-                                                
Events.AllSubtasksStatusChangeEvent.name()
-                                                        
.equals(event.getName()))
-                                .map(Event::getAttributes)
-                                .map(
-                                        x ->
-                                                x.get(
-                                                        
AllSubTasksRunningOrFinishedStateTimeMetrics
-                                                                
.STATUS_ATTRIBUTE)))
-                .containsExactly(
-                        ALL_RUNNING_OR_FINISHED.toString(), 
NOT_ALL_RUNNING_OR_FINISHED.toString());
+        assertJobStatusTransitions(schedulerType, jobEvents);
     }
 
     @Test
@@ -1733,24 +1711,47 @@ class JobMasterTest {
         final TestingHeartbeatServices testingHeartbeatService =
                 new TestingHeartbeatServices(heartbeatInterval, 
heartbeatTimeout);
 
-        runJobFailureWhenTaskExecutorTerminatesTest(
-                testingHeartbeatService,
-                (localTaskManagerLocation, jobMasterGateway) ->
-                        testingHeartbeatService.triggerHeartbeatTimeout(
-                                jmResourceId, 
localTaskManagerLocation.getResourceID()),
-                jobEvents);
-        assertThat(
-                        jobEvents.stream()
-                                .filter(
-                                        event ->
-                                                
Events.JobStatusChangeEvent.name()
-                                                        
.equals(event.getName()))
-                                .map(Event::getAttributes)
-                                .map(x -> x.get("newJobStatus")))
-                .containsExactly(
-                        JobStatus.RUNNING.toString(),
-                        JobStatus.FAILING.toString(),
-                        JobStatus.FAILED.toString());
+        final SchedulerType schedulerType =
+                runJobFailureWhenTaskExecutorTerminatesTest(
+                        testingHeartbeatService,
+                        (localTaskManagerLocation, jobMasterGateway) ->
+                                
testingHeartbeatService.triggerHeartbeatTimeout(
+                                        jmResourceId, 
localTaskManagerLocation.getResourceID()),
+                        jobEvents);
+
+        assertJobStatusTransitions(schedulerType, jobEvents);
+    }
+
+    /**
+     * Asserts that job status transitions are as expected based on the 
scheduler type.
+     * DefaultScheduler does not emit CREATED state, while AdaptiveScheduler 
and
+     * AdaptiveBatchScheduler do.
+     */
+    private static void assertJobStatusTransitions(
+            SchedulerType schedulerType, List<Event> jobEvents) {
+        final List<String> jobStatusTransitions =
+                jobEvents.stream()
+                        .filter(event -> 
Events.JobStatusChangeEvent.name().equals(event.getName()))
+                        .map(Event::getAttributes)
+                        .map(x -> (String) x.get("newJobStatus"))
+                        .collect(Collectors.toList());
+
+        if (schedulerType == SchedulerType.Adaptive) {
+            // Adaptive schedulers emit CREATED: CREATED → RUNNING → FAILING → 
FAILED
+            assertThat(jobStatusTransitions)
+                    .containsExactly(
+                            JobStatus.CREATED.toString(),
+                            JobStatus.RUNNING.toString(),
+                            JobStatus.FAILING.toString(),
+                            JobStatus.FAILED.toString());
+        } else {
+            // Default scheduler does not emit CREATED: RUNNING → FAILING → 
FAILED
+            assertThat(jobStatusTransitions)
+                    .containsExactly(
+                            JobStatus.RUNNING.toString(),
+                            JobStatus.FAILING.toString(),
+                            JobStatus.FAILED.toString());
+        }
 
         assertThat(
                         jobEvents.stream()
@@ -2555,7 +2556,7 @@ class JobMasterTest {
         return resourceManagerGateway;
     }
 
-    private void runJobFailureWhenTaskExecutorTerminatesTest(
+    private SchedulerType runJobFailureWhenTaskExecutorTerminatesTest(
             HeartbeatServices heartbeatServices,
             BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> 
jobReachedRunningState,
             List<Event> jobEventsOut)
@@ -2564,12 +2565,17 @@ class JobMasterTest {
         final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
                 new JobMasterBuilder.TestingOnCompletionActions();
 
+        final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
+                DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
+                        configuration, jobGraph.getJobType(), 
jobGraph.isDynamic());
+
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withResourceId(jmResourceId)
                         .withHighAvailabilityServices(haServices)
                         .withHeartbeatServices(heartbeatServices)
                         .withOnCompletionActions(onCompletionActions)
+                        
.withSlotPoolServiceSchedulerFactory(slotPoolServiceSchedulerFactory)
                         .withMetricsGroupFactory(
                                 new JobManagerJobMetricGroupFactory() {
                                     @Override
@@ -2636,6 +2642,8 @@ class JobMasterTest {
 
             
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
         }
+
+        return slotPoolServiceSchedulerFactory.getSchedulerType();
     }
 
     private Collection<SlotOffer> registerSlotsAtJobMaster(

Reply via email to