This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b7acd9fe93f [FLINK-38404][test] Fix JobMasterTest by making it
scheduler-type aware (#27091)
b7acd9fe93f is described below
commit b7acd9fe93fb20160327698a4c8c0fa2fd3484a3
Author: Mingliang Liu <[email protected]>
AuthorDate: Mon Nov 3 06:03:32 2025 -0800
[FLINK-38404][test] Fix JobMasterTest by making it scheduler-type aware
(#27091)
---
.../flink/runtime/jobmaster/JobMasterTest.java | 112 +++++++++++----------
1 file changed, 60 insertions(+), 52 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 64e2bc6c696..ebacc4847d9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -184,6 +184,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import static
org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.FIXED_DELAY;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -1691,40 +1692,17 @@ class JobMasterTest {
@Test
void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
final List<Event> jobEvents = new ArrayList<>();
- runJobFailureWhenTaskExecutorTerminatesTest(
- heartbeatServices,
- (localTaskManagerLocation, jobMasterGateway) ->
- jobMasterGateway.disconnectTaskManager(
- localTaskManagerLocation.getResourceID(),
- new FlinkException("Test disconnectTaskManager
exception.")),
- jobEvents);
- assertThat(
- jobEvents.stream()
- .filter(
- event ->
-
Events.JobStatusChangeEvent.name()
-
.equals(event.getName()))
- .map(Event::getAttributes)
- .map(x -> x.get("newJobStatus")))
- .containsExactly(
- JobStatus.RUNNING.toString(),
- JobStatus.FAILING.toString(),
- JobStatus.FAILED.toString());
+ final SchedulerType schedulerType =
+ runJobFailureWhenTaskExecutorTerminatesTest(
+ heartbeatServices,
+ (localTaskManagerLocation, jobMasterGateway) ->
+ jobMasterGateway.disconnectTaskManager(
+
localTaskManagerLocation.getResourceID(),
+ new FlinkException(
+ "Test disconnectTaskManager
exception.")),
+ jobEvents);
- assertThat(
- jobEvents.stream()
- .filter(
- event ->
-
Events.AllSubtasksStatusChangeEvent.name()
-
.equals(event.getName()))
- .map(Event::getAttributes)
- .map(
- x ->
- x.get(
-
AllSubTasksRunningOrFinishedStateTimeMetrics
-
.STATUS_ATTRIBUTE)))
- .containsExactly(
- ALL_RUNNING_OR_FINISHED.toString(),
NOT_ALL_RUNNING_OR_FINISHED.toString());
+ assertJobStatusTransitions(schedulerType, jobEvents);
}
@Test
@@ -1733,24 +1711,47 @@ class JobMasterTest {
final TestingHeartbeatServices testingHeartbeatService =
new TestingHeartbeatServices(heartbeatInterval,
heartbeatTimeout);
- runJobFailureWhenTaskExecutorTerminatesTest(
- testingHeartbeatService,
- (localTaskManagerLocation, jobMasterGateway) ->
- testingHeartbeatService.triggerHeartbeatTimeout(
- jmResourceId,
localTaskManagerLocation.getResourceID()),
- jobEvents);
- assertThat(
- jobEvents.stream()
- .filter(
- event ->
-
Events.JobStatusChangeEvent.name()
-
.equals(event.getName()))
- .map(Event::getAttributes)
- .map(x -> x.get("newJobStatus")))
- .containsExactly(
- JobStatus.RUNNING.toString(),
- JobStatus.FAILING.toString(),
- JobStatus.FAILED.toString());
+ final SchedulerType schedulerType =
+ runJobFailureWhenTaskExecutorTerminatesTest(
+ testingHeartbeatService,
+ (localTaskManagerLocation, jobMasterGateway) ->
+
testingHeartbeatService.triggerHeartbeatTimeout(
+ jmResourceId,
localTaskManagerLocation.getResourceID()),
+ jobEvents);
+
+ assertJobStatusTransitions(schedulerType, jobEvents);
+ }
+
+ /**
+ * Asserts that job status transitions are as expected based on the
scheduler type.
+ * Only the Adaptive scheduler type is expected to emit the CREATED state;
+ * every other scheduler type is expected to start at RUNNING.
+ */
+ private static void assertJobStatusTransitions(
+ SchedulerType schedulerType, List<Event> jobEvents) {
+ final List<String> jobStatusTransitions =
+ jobEvents.stream()
+ .filter(event ->
Events.JobStatusChangeEvent.name().equals(event.getName()))
+ .map(Event::getAttributes)
+ .map(x -> (String) x.get("newJobStatus"))
+ .collect(Collectors.toList());
+
+ if (schedulerType == SchedulerType.Adaptive) {
+ // The Adaptive scheduler emits CREATED: CREATED → RUNNING → FAILING → FAILED
+ assertThat(jobStatusTransitions)
+ .containsExactly(
+ JobStatus.CREATED.toString(),
+ JobStatus.RUNNING.toString(),
+ JobStatus.FAILING.toString(),
+ JobStatus.FAILED.toString());
+ } else {
+ // Default scheduler does not emit CREATED: RUNNING → FAILING →
FAILED
+ assertThat(jobStatusTransitions)
+ .containsExactly(
+ JobStatus.RUNNING.toString(),
+ JobStatus.FAILING.toString(),
+ JobStatus.FAILED.toString());
+ }
assertThat(
jobEvents.stream()
@@ -2555,7 +2556,7 @@ class JobMasterTest {
return resourceManagerGateway;
}
- private void runJobFailureWhenTaskExecutorTerminatesTest(
+ private SchedulerType runJobFailureWhenTaskExecutorTerminatesTest(
HeartbeatServices heartbeatServices,
BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway>
jobReachedRunningState,
List<Event> jobEventsOut)
@@ -2564,12 +2565,17 @@ class JobMasterTest {
final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
new JobMasterBuilder.TestingOnCompletionActions();
+ final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
+ DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
+ configuration, jobGraph.getJobType(),
jobGraph.isDynamic());
+
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withResourceId(jmResourceId)
.withHighAvailabilityServices(haServices)
.withHeartbeatServices(heartbeatServices)
.withOnCompletionActions(onCompletionActions)
+
.withSlotPoolServiceSchedulerFactory(slotPoolServiceSchedulerFactory)
.withMetricsGroupFactory(
new JobManagerJobMetricGroupFactory() {
@Override
@@ -2636,6 +2642,8 @@ class JobMasterTest {
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
}
+
+ return slotPoolServiceSchedulerFactory.getSchedulerType();
}
private Collection<SlotOffer> registerSlotsAtJobMaster(