This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a5ca5867298f [SPARK-47654][INFRA] Structured logging framework: support log concatenation a5ca5867298f is described below commit a5ca5867298f8ad6d40f3132ad74cbf078cc62b3 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Mon Apr 1 00:15:24 2024 -0700 [SPARK-47654][INFRA] Structured logging framework: support log concatenation ### What changes were proposed in this pull request? Support the log concatenation in the structured logging framework. For example ``` log"${MDC(CONFIG, SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key)} " + log"(${MDC(MIN_SIZE, minSizeForBroadcast.toString)} bytes) " + log"must be <= spark.rpc.message.maxSize (${MDC(MAX_SIZE, maxRpcMessageSize.toString)} " + log"bytes) to prevent sending an rpc message that is too large." ``` ### Why are the changes needed? Although most of the Spark logs are short, we need this convenient syntax when handling long logs with multiple variables. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT ### Was this patch authored or co-authored using generative AI tooling? Yes, GitHub copilot Closes #45779 from gengliangwang/logConcat. 
Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../scala/org/apache/spark/internal/LogKey.scala | 2 +- .../scala/org/apache/spark/internal/Logging.scala | 62 +++++++++++++--------- .../apache/spark/util/PatternLoggingSuite.scala | 6 +++ .../apache/spark/util/StructuredLoggingSuite.scala | 33 +++++++++++- 4 files changed, 77 insertions(+), 26 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 6ab6ac0eb58a..760077af6d3e 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -21,5 +21,5 @@ package org.apache.spark.internal * All structured logging keys should be defined here for standardization. */ object LogKey extends Enumeration { - val EXECUTOR_ID = Value + val EXECUTOR_ID, MIN_SIZE, MAX_SIZE = Value } diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 0aa93d6289d1..5765a6eed542 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -22,7 +22,6 @@ import java.util.Locale import scala.jdk.CollectionConverters._ import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager} -import org.apache.logging.log4j.CloseableThreadContext.Instance import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext} import org.apache.logging.log4j.core.appender.ConsoleAppender import org.apache.logging.log4j.core.config.DefaultConfiguration @@ -43,7 +42,13 @@ case class MDC(key: LogKey.Value, value: String) * Wrapper class for log messages that include a logging context. * This is used as the return type of the string interpolator `LogStringContext`. 
*/ -case class MessageWithContext(message: String, context: Option[Instance]) +case class MessageWithContext(message: String, context: java.util.HashMap[String, String]) { + def +(mdc: MessageWithContext): MessageWithContext = { + val resultMap = new java.util.HashMap(context) + resultMap.putAll(mdc.context) + MessageWithContext(message + mdc.message, resultMap) + } +} /** * Companion class for lazy evaluation of the MessageWithContext instance. @@ -51,7 +56,7 @@ case class MessageWithContext(message: String, context: Option[Instance]) class LogEntry(messageWithContext: => MessageWithContext) { def message: String = messageWithContext.message - def context: Option[Instance] = messageWithContext.context + def context: java.util.HashMap[String, String] = messageWithContext.context } /** @@ -94,12 +99,12 @@ trait Logging { def log(args: MDC*): MessageWithContext = { val processedParts = sc.parts.iterator val sb = new StringBuilder(processedParts.next()) - lazy val map = new java.util.HashMap[String, String]() + val context = new java.util.HashMap[String, String]() args.foreach { mdc => sb.append(mdc.value) if (Logging.isStructuredLoggingEnabled) { - map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value) + context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value) } if (processedParts.hasNext) { @@ -107,13 +112,16 @@ trait Logging { } } - // Create a CloseableThreadContext and apply the context map - val closeableContext = if (Logging.isStructuredLoggingEnabled) { - Some(CloseableThreadContext.putAll(map)) - } else { - None - } - MessageWithContext(sb.toString(), closeableContext) + MessageWithContext(sb.toString(), context) + } + } + + private def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = { + val threadContext = CloseableThreadContext.putAll(context) + try { + body + } finally { + threadContext.close() } } @@ -124,15 +132,17 @@ trait Logging { protected def logInfo(entry: LogEntry): Unit = { if (log.isInfoEnabled) 
{ - log.info(entry.message) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.info(entry.message) + } } } protected def logInfo(entry: LogEntry, throwable: Throwable): Unit = { if (log.isInfoEnabled) { - log.info(entry.message, throwable) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.info(entry.message, throwable) + } } } @@ -150,15 +160,17 @@ trait Logging { protected def logWarning(entry: LogEntry): Unit = { if (log.isWarnEnabled) { - log.warn(entry.message) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.warn(entry.message) + } } } protected def logWarning(entry: LogEntry, throwable: Throwable): Unit = { if (log.isWarnEnabled) { - log.warn(entry.message, throwable) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.warn(entry.message, throwable) + } } } @@ -168,15 +180,17 @@ trait Logging { protected def logError(entry: LogEntry): Unit = { if (log.isErrorEnabled) { - log.error(entry.message) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.error(entry.message) + } } } protected def logError(entry: LogEntry, throwable: Throwable): Unit = { if (log.isErrorEnabled) { - log.error(entry.message, throwable) - entry.context.map(_.close()) + withLogContext(entry.context) { + log.error(entry.message, throwable) + } } } diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala index ef0aa7050b07..7e4318306c82 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala @@ -36,4 +36,10 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll { override def expectedPatternForMsgWithMDCAndException(level: String): String = s""".*$level PatternLoggingSuite: Error in executor 1.\njava.lang.RuntimeException: OOM\n.*""" + + override 
def verifyMsgWithConcat(level: String, logOutput: String): Unit = { + val pattern = + s""".*$level PatternLoggingSuite: Min Size: 2, Max Size: 4. Please double check.\n""" + assert(pattern.r.matches(logOutput)) + } } diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index 5dfd3bb46021..8165c5f5b751 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -22,7 +22,7 @@ import java.nio.file.Files import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite import org.apache.spark.internal.{LogEntry, Logging, MDC} -import org.apache.spark.internal.LogKey.EXECUTOR_ID +import org.apache.spark.internal.LogKey.{EXECUTOR_ID, MAX_SIZE, MIN_SIZE} abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuite with Logging { @@ -52,12 +52,19 @@ abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuit def msgWithMDCAndException: LogEntry = log"Error in executor ${MDC(EXECUTOR_ID, "1")}." + def msgWithConcat: LogEntry = log"Min Size: ${MDC(MIN_SIZE, "2")}, " + + log"Max Size: ${MDC(MAX_SIZE, "4")}. " + + log"Please double check." 
+ + def expectedPatternForBasicMsg(level: String): String def expectedPatternForMsgWithMDC(level: String): String def expectedPatternForMsgWithMDCAndException(level: String): String + def verifyMsgWithConcat(level: String, logOutput: String): Unit + test("Basic logging") { val msg = "This is a log message" Seq( @@ -91,6 +98,17 @@ abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuit assert(expectedPatternForMsgWithMDCAndException(level).r.findFirstIn(logOutput).isDefined) } } + + test("Logging with concat") { + Seq( + ("ERROR", () => logError(msgWithConcat)), + ("WARN", () => logWarning(msgWithConcat)), + ("INFO", () => logInfo(msgWithConcat))).foreach { + case (level, logFunc) => + val logOutput = captureLogOutput(logFunc) + verifyMsgWithConcat(level, logOutput) + } + } } class StructuredLoggingSuite extends LoggingSuiteBase { @@ -109,4 +127,17 @@ class StructuredLoggingSuite extends LoggingSuiteBase { // scalastyle:off line.size.limit s"""\\{"ts":"[^"]+","level":"$level","msg":"Error in executor 1.","context":\\{"executor_id":"1"},"exception":\\{"class":"java.lang.RuntimeException","msg":"OOM","stacktrace":.*},"logger":"$className"}\n""" // scalastyle:on + + override def verifyMsgWithConcat(level: String, logOutput: String): Unit = { + // scalastyle:off line.size.limit + val pattern1 = + s"""\\{"ts":"[^"]+","level":"$level","msg":"Min Size: 2, Max Size: 4. Please double check.","context":\\{"min_size":"2","max_size":"4"},"logger":"$className"}\n""" + + val pattern2 = + s"""\\{"ts":"[^"]+","level":"$level","msg":"Min Size: 2, Max Size: 4. Please double check.","context":\\{"max_size":"4","min_size":"2"},"logger":"$className"}\n""" + + assert(pattern1.r.matches(logOutput) || pattern2.r.matches(logOutput)) + // scalastyle:on + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org