This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6b5ae445724 [FLINK-34643][tests] Fix JobIDLoggingITCase 6b5ae445724 is described below commit 6b5ae445724b68db05a3f9687cff6dd68e2129d7 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Mon Mar 11 16:22:42 2024 +0100 [FLINK-34643][tests] Fix JobIDLoggingITCase --- .../apache/flink/test/misc/JobIDLoggingITCase.java | 134 +++++++++++++++------ 1 file changed, 98 insertions(+), 36 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java index 3380698feb7..e13bfce16e3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java @@ -37,9 +37,9 @@ import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.testutils.logging.LoggerAuditingExtension; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.MdcUtils; import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.util.ReadOnlyStringMap; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -52,17 +52,14 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import static org.apache.flink.util.Preconditions.checkState; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static org.apache.flink.util.MdcUtils.JOB_ID; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.slf4j.event.Level.DEBUG; -/** - * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in 
the most important cases. - */ -public class JobIDLoggingITCase { +class JobIDLoggingITCase { private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class); @RegisterExtension @@ -104,8 +101,7 @@ public class JobIDLoggingITCase { .build()); @Test - public void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) - throws Exception { + void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) throws Exception { JobID jobID = runJob(clusterClient); clusterClient.cancel(jobID).get(); @@ -114,53 +110,113 @@ public class JobIDLoggingITCase { // - how many messages to expect // - which log patterns to ignore - assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging); - assertJobIDPresent(jobID, 6, streamTaskLogging); assertJobIDPresent( jobID, - 9, + checkpointCoordinatorLogging, + asList( + "No checkpoint found during restore.", + "Resetting the master hooks.", + "Triggering checkpoint .*", + "Received acknowledge message for checkpoint .*", + "Completed checkpoint .*", + "Checkpoint state: .*")); + + assertJobIDPresent( + jobID, + streamTaskLogging, + asList( + "State backend is set to .*", + "Initializing Source: .*", + "Invoking Source: .*", + "Starting checkpoint .*", + "Notify checkpoint \\d+ complete .*")); + + assertJobIDPresent( + jobID, taskExecutorLogging, + asList( + "Received task .*", + "Trigger checkpoint .*", + "Confirm completed checkpoint .*"), "Un-registering task.*", "Successful registration.*", "Establish JobManager connection.*", "Offer reserved slots.*", ".*ResourceManager.*", - "Operator event.*"); + "Operator event.*", + "Recovered slot allocation snapshots.*", + ".*heartbeat.*"); + + assertJobIDPresent( + jobID, + taskLogging, + asList( + "Source: .* switched from CREATED to DEPLOYING.", + "Source: .* switched from DEPLOYING to INITIALIZING.", + "Source: .* switched from INITIALIZING to RUNNING.")); + + assertJobIDPresent( + jobID, + executionGraphLogging, + asList( + "Created 
execution graph .*", + "Deploying Source.*", + "Job .* switched from state CREATED to RUNNING.", + "Source: .* switched from CREATED to SCHEDULED.", + "Source: .* switched from SCHEDULED to DEPLOYING.", + "Source: .* switched from DEPLOYING to INITIALIZING.", + "Source: .* switched from INITIALIZING to RUNNING.")); - assertJobIDPresent(jobID, 10, taskLogging); - assertJobIDPresent(jobID, 10, executionGraphLogging); assertJobIDPresent( jobID, - 15, jobMasterLogging, + asList( + "Checkpoint storage is set to .*", + "Initializing job .*", + "Running initialization on master for job .*", + "Starting execution of job .*", + "Starting scheduling.*", + "State backend is set to .*", + "Successfully created execution graph from job graph .*", + "Successfully ran initialization on master.*", + "Triggering a manual checkpoint for job .*.", + "Using failover strategy .*", + "Using restart back off time strategy .*"), "Registration at ResourceManager.*", "Registration with ResourceManager.*", "Resolved ResourceManager address.*"); - assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging); + + assertJobIDPresent( + jobID, + asyncCheckpointRunnableLogging, + asList( + ".* started executing asynchronous part of checkpoint .*", + ".* finished asynchronous part of checkpoint .*")); } private static void assertJobIDPresent( JobID jobID, - int expectedLogMessages, LoggerAuditingExtension ext, + List<String> expPatterns, String... 
ignPatterns) { - String loggerName = ext.getLoggerName(); - checkState( - ext.getEvents().size() >= expectedLogMessages, - "Too few log events recorded for %s (%s) - this must be a bug in the test code", - loggerName, - ext.getEvents().size()); final List<LogEvent> eventsWithMissingJobId = new ArrayList<>(); final List<LogEvent> eventsWithWrongJobId = new ArrayList<>(); final List<LogEvent> ignoredEvents = new ArrayList<>(); + final List<Pattern> expectedPatterns = + expPatterns.stream().map(Pattern::compile).collect(toList()); final List<Pattern> ignorePatterns = - Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList()); + Arrays.stream(ignPatterns).map(Pattern::compile).collect(toList()); for (LogEvent e : ext.getEvents()) { - if (e.getContextData().containsKey(MdcUtils.JOB_ID)) { - if (!Objects.equals( - e.getContextData().getValue(MdcUtils.JOB_ID), jobID.toHexString())) { + ReadOnlyStringMap context = e.getContextData(); + if (context.containsKey(JOB_ID)) { + if (Objects.equals(context.getValue(JOB_ID), jobID.toHexString())) { + expectedPatterns.removeIf( + pattern -> + pattern.matcher(e.getMessage().getFormattedMessage()) + .matches()); + } else { eventsWithWrongJobId.add(e); } } else if (matchesAny(ignorePatterns, e.getMessage().getFormattedMessage())) { @@ -169,20 +225,23 @@ public class JobIDLoggingITCase { eventsWithMissingJobId.add(e); } } + logger.debug( "checked events for {}:\n {};\n ignored: {},\n wrong job id: {},\n missing job id: {}", - loggerName, + ext.getLoggerName(), ext.getEvents(), ignoredEvents, eventsWithWrongJobId, eventsWithMissingJobId); assertThat(eventsWithWrongJobId).as("events with a wrong Job ID").isEmpty(); - assertTrue( - eventsWithMissingJobId.isEmpty(), - "too many events without Job ID recorded for " - + loggerName - + ": " - + eventsWithMissingJobId); + assertThat(expectedPatterns) + .as( + "not all expected events logged by %s, logged:\n%s", + ext.getLoggerName(), ext.getEvents()) + .isEmpty(); + 
assertThat(eventsWithMissingJobId) + .as("too many events without Job ID logged by %s", ext.getLoggerName()) + .isEmpty(); } private static boolean matchesAny(List<Pattern> patternStream, String message) { @@ -205,6 +264,9 @@ public class JobIDLoggingITCase { // wait for all tasks ready and then checkpoint while (true) { try { + clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get(); + // to check the log message about checkpoint completion notification we need to + // either wait or trigger another checkpoint clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get(); return jobId; } catch (ExecutionException e) {