This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5ae445724 [FLINK-34643][tests] Fix JobIDLoggingITCase
6b5ae445724 is described below

commit 6b5ae445724b68db05a3f9687cff6dd68e2129d7
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Mar 11 16:22:42 2024 +0100

    [FLINK-34643][tests] Fix JobIDLoggingITCase
---
 .../apache/flink/test/misc/JobIDLoggingITCase.java | 134 +++++++++++++++------
 1 file changed, 98 insertions(+), 36 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 3380698feb7..e13bfce16e3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -37,9 +37,9 @@ import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.MdcUtils;
 
 import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.util.ReadOnlyStringMap;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -52,17 +52,14 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkState;
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.flink.util.MdcUtils.JOB_ID;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.slf4j.event.Level.DEBUG;
 
-/**
- * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the 
most important cases.
- */
-public class JobIDLoggingITCase {
+class JobIDLoggingITCase {
     private static final Logger logger = 
LoggerFactory.getLogger(JobIDLoggingITCase.class);
 
     @RegisterExtension
@@ -104,8 +101,7 @@ public class JobIDLoggingITCase {
                             .build());
 
     @Test
-    public void testJobIDLogging(@InjectClusterClient ClusterClient<?> 
clusterClient)
-            throws Exception {
+    void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) 
throws Exception {
         JobID jobID = runJob(clusterClient);
         clusterClient.cancel(jobID).get();
 
@@ -114,53 +110,113 @@ public class JobIDLoggingITCase {
         // - how many messages to expect
         // - which log patterns to ignore
 
-        assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging);
-        assertJobIDPresent(jobID, 6, streamTaskLogging);
         assertJobIDPresent(
                 jobID,
-                9,
+                checkpointCoordinatorLogging,
+                asList(
+                        "No checkpoint found during restore.",
+                        "Resetting the master hooks.",
+                        "Triggering checkpoint .*",
+                        "Received acknowledge message for checkpoint .*",
+                        "Completed checkpoint .*",
+                        "Checkpoint state: .*"));
+
+        assertJobIDPresent(
+                jobID,
+                streamTaskLogging,
+                asList(
+                        "State backend is set to .*",
+                        "Initializing Source: .*",
+                        "Invoking Source: .*",
+                        "Starting checkpoint .*",
+                        "Notify checkpoint \\d+ complete .*"));
+
+        assertJobIDPresent(
+                jobID,
                 taskExecutorLogging,
+                asList(
+                        "Received task .*",
+                        "Trigger checkpoint .*",
+                        "Confirm completed checkpoint .*"),
                 "Un-registering task.*",
                 "Successful registration.*",
                 "Establish JobManager connection.*",
                 "Offer reserved slots.*",
                 ".*ResourceManager.*",
-                "Operator event.*");
+                "Operator event.*",
+                "Recovered slot allocation snapshots.*",
+                ".*heartbeat.*");
+
+        assertJobIDPresent(
+                jobID,
+                taskLogging,
+                asList(
+                        "Source: .* switched from CREATED to DEPLOYING.",
+                        "Source: .* switched from DEPLOYING to INITIALIZING.",
+                        "Source: .* switched from INITIALIZING to RUNNING."));
+
+        assertJobIDPresent(
+                jobID,
+                executionGraphLogging,
+                asList(
+                        "Created execution graph .*",
+                        "Deploying Source.*",
+                        "Job .* switched from state CREATED to RUNNING.",
+                        "Source: .* switched from CREATED to SCHEDULED.",
+                        "Source: .* switched from SCHEDULED to DEPLOYING.",
+                        "Source: .* switched from DEPLOYING to INITIALIZING.",
+                        "Source: .* switched from INITIALIZING to RUNNING."));
 
-        assertJobIDPresent(jobID, 10, taskLogging);
-        assertJobIDPresent(jobID, 10, executionGraphLogging);
         assertJobIDPresent(
                 jobID,
-                15,
                 jobMasterLogging,
+                asList(
+                        "Checkpoint storage is set to .*",
+                        "Initializing job .*",
+                        "Running initialization on master for job .*",
+                        "Starting execution of job .*",
+                        "Starting scheduling.*",
+                        "State backend is set to .*",
+                        "Successfully created execution graph from job graph 
.*",
+                        "Successfully ran initialization on master.*",
+                        "Triggering a manual checkpoint for job .*.",
+                        "Using failover strategy .*",
+                        "Using restart back off time strategy .*"),
                 "Registration at ResourceManager.*",
                 "Registration with ResourceManager.*",
                 "Resolved ResourceManager address.*");
-        assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging);
+
+        assertJobIDPresent(
+                jobID,
+                asyncCheckpointRunnableLogging,
+                asList(
+                        ".* started executing asynchronous part of checkpoint 
.*",
+                        ".* finished asynchronous part of checkpoint .*"));
     }
 
     private static void assertJobIDPresent(
             JobID jobID,
-            int expectedLogMessages,
             LoggerAuditingExtension ext,
+            List<String> expPatterns,
             String... ignPatterns) {
-        String loggerName = ext.getLoggerName();
-        checkState(
-                ext.getEvents().size() >= expectedLogMessages,
-                "Too few log events recorded for %s (%s) - this must be a bug 
in the test code",
-                loggerName,
-                ext.getEvents().size());
 
         final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
         final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
         final List<LogEvent> ignoredEvents = new ArrayList<>();
+        final List<Pattern> expectedPatterns =
+                expPatterns.stream().map(Pattern::compile).collect(toList());
         final List<Pattern> ignorePatterns =
-                
Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
+                
Arrays.stream(ignPatterns).map(Pattern::compile).collect(toList());
 
         for (LogEvent e : ext.getEvents()) {
-            if (e.getContextData().containsKey(MdcUtils.JOB_ID)) {
-                if (!Objects.equals(
-                        e.getContextData().getValue(MdcUtils.JOB_ID), 
jobID.toHexString())) {
+            ReadOnlyStringMap context = e.getContextData();
+            if (context.containsKey(JOB_ID)) {
+                if (Objects.equals(context.getValue(JOB_ID), 
jobID.toHexString())) {
+                    expectedPatterns.removeIf(
+                            pattern ->
+                                    
pattern.matcher(e.getMessage().getFormattedMessage())
+                                            .matches());
+                } else {
                     eventsWithWrongJobId.add(e);
                 }
             } else if (matchesAny(ignorePatterns, 
e.getMessage().getFormattedMessage())) {
@@ -169,20 +225,23 @@ public class JobIDLoggingITCase {
                 eventsWithMissingJobId.add(e);
             }
         }
+
         logger.debug(
                 "checked events for {}:\n  {};\n  ignored: {},\n  wrong job 
id: {},\n  missing job id: {}",
-                loggerName,
+                ext.getLoggerName(),
                 ext.getEvents(),
                 ignoredEvents,
                 eventsWithWrongJobId,
                 eventsWithMissingJobId);
         assertThat(eventsWithWrongJobId).as("events with a wrong Job 
ID").isEmpty();
-        assertTrue(
-                eventsWithMissingJobId.isEmpty(),
-                "too many events without Job ID recorded for "
-                        + loggerName
-                        + ": "
-                        + eventsWithMissingJobId);
+        assertThat(expectedPatterns)
+                .as(
+                        "not all expected events logged by %s, logged:\n%s",
+                        ext.getLoggerName(), ext.getEvents())
+                .isEmpty();
+        assertThat(eventsWithMissingJobId)
+                .as("too many events without Job ID logged by %s", 
ext.getLoggerName())
+                .isEmpty();
     }
 
     private static boolean matchesAny(List<Pattern> patternStream, String 
message) {
@@ -205,6 +264,9 @@ public class JobIDLoggingITCase {
         // wait for all tasks ready and then checkpoint
         while (true) {
             try {
+                clusterClient.triggerCheckpoint(jobId, 
CheckpointType.DEFAULT).get();
+                // to check the log message about checkpoint completion 
notification we need to
+                // either wait or trigger another checkpoint
                 clusterClient.triggerCheckpoint(jobId, 
CheckpointType.DEFAULT).get();
                 return jobId;
             } catch (ExecutionException e) {

Reply via email to