This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d4c1a0a1ba4 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
d4c1a0a1ba4 is described below

commit d4c1a0a1ba4a1f1919f5ecccd3baa3d2cd44cef6
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Mar 25 11:12:43 2024 +0100

    [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
---
 .../apache/flink/test/misc/JobIDLoggingITCase.java | 35 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 162d57db6cd..fb4e27604fe 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -22,11 +22,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -55,6 +58,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toList;
+import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER;
 import static org.apache.flink.util.MdcUtils.JOB_ID;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.slf4j.event.Level.DEBUG;
@@ -86,6 +90,10 @@ class JobIDLoggingITCase {
     public final LoggerAuditingExtension jobMasterLogging =
             new LoggerAuditingExtension(JobMaster.class, DEBUG);
 
+    @RegisterExtension
+    public final LoggerAuditingExtension adaptiveSchedulerLogging =
+            new LoggerAuditingExtension(AdaptiveScheduler.class, DEBUG);
+
     @RegisterExtension
     public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
             // this class is private
@@ -96,10 +104,17 @@ class JobIDLoggingITCase {
     public static MiniClusterExtension miniClusterResource =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(1)
                             .build());
 
+    private static Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        return configuration;
+    }
+
     @Test
     void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) 
throws Exception {
         JobID jobID = runJob(clusterClient);
@@ -170,18 +185,22 @@ class JobIDLoggingITCase {
 
         assertJobIDPresent(
                 jobID,
-                jobMasterLogging,
+                adaptiveSchedulerLogging,
                 asList(
                         "Checkpoint storage is set to .*",
-                        "Initializing job .*",
                         "Running initialization on master for job .*",
-                        "Starting execution of job .*",
-                        "Starting scheduling.*",
-                        "State backend is set to .*",
                         "Successfully created execution graph from job graph 
.*",
-                        "Successfully ran initialization on master.*",
-                        "Triggering a manual checkpoint for job .*.",
-                        "Using failover strategy .*",
+                        "Successfully ran initialization on master.*"),
+                "Registration at ResourceManager.*",
+                "Registration with ResourceManager.*",
+                "Resolved ResourceManager address.*");
+
+        assertJobIDPresent(
+                jobID,
+                jobMasterLogging,
+                asList(
+                        "Initializing job .*",
+                        "Starting execution of job .*",
                         "Using restart back off time strategy .*"),
                 "Registration at ResourceManager.*",
                 "Registration with ResourceManager.*",

Reply via email to