This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new d4c1a0a1ba4 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
d4c1a0a1ba4 is described below

commit d4c1a0a1ba4a1f1919f5ecccd3baa3d2cd44cef6
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Mar 25 11:12:43 2024 +0100

    [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
---
 .../apache/flink/test/misc/JobIDLoggingITCase.java | 35 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 162d57db6cd..fb4e27604fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -22,11 +22,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -55,6 +58,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toList;
+import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER;
 import static org.apache.flink.util.MdcUtils.JOB_ID;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.slf4j.event.Level.DEBUG;
@@ -86,6 +90,10 @@ class JobIDLoggingITCase {
     public final LoggerAuditingExtension jobMasterLogging =
             new LoggerAuditingExtension(JobMaster.class, DEBUG);
 
+    @RegisterExtension
+    public final LoggerAuditingExtension adaptiveSchedulerLogging =
+            new LoggerAuditingExtension(AdaptiveScheduler.class, DEBUG);
+
     @RegisterExtension
     public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
             // this class is private
@@ -96,10 +104,17 @@ class JobIDLoggingITCase {
     public static MiniClusterExtension miniClusterResource =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(1)
                             .build());
 
+    private static Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        return configuration;
+    }
+
     @Test
     void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) throws Exception {
         JobID jobID = runJob(clusterClient);
@@ -170,18 +185,22 @@ class JobIDLoggingITCase {
 
         assertJobIDPresent(
                 jobID,
-                jobMasterLogging,
+                adaptiveSchedulerLogging,
                 asList(
                         "Checkpoint storage is set to .*",
-                        "Initializing job .*",
                         "Running initialization on master for job .*",
-                        "Starting execution of job .*",
-                        "Starting scheduling.*",
-                        "State backend is set to .*",
                         "Successfully created execution graph from job graph .*",
-                        "Successfully ran initialization on master.*",
-                        "Triggering a manual checkpoint for job .*.",
-                        "Using failover strategy .*",
+                        "Successfully ran initialization on master.*"),
+                "Registration at ResourceManager.*",
+                "Registration with ResourceManager.*",
+                "Resolved ResourceManager address.*");
+
+        assertJobIDPresent(
+                jobID,
+                jobMasterLogging,
+                asList(
+                        "Initializing job .*",
+                        "Starting execution of job .*",
                         "Using restart back off time strategy .*"),
                 "Registration at ResourceManager.*",
                 "Registration with ResourceManager.*",