This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ca5867298f [SPARK-47654][INFRA] Structured logging framework: support log concatenation
a5ca5867298f is described below

commit a5ca5867298f8ad6d40f3132ad74cbf078cc62b3
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Mon Apr 1 00:15:24 2024 -0700

    [SPARK-47654][INFRA] Structured logging framework: support log concatenation
    
    ### What changes were proposed in this pull request?
    
    Support log concatenation in the structured logging framework. For example:
    ```
    log"${MDC(CONFIG, SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key)} " +
      log"(${MDC(MIN_SIZE, minSizeForBroadcast.toString)} bytes) " +
      log"must be <= spark.rpc.message.maxSize (${MDC(MAX_SIZE, 
maxRpcMessageSize.toString)} " +
      log"bytes) to prevent sending an rpc message that is too large."
    ```
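    
    Each `log"..."` interpolation returns a `MessageWithContext`, and the new `+` operator concatenates the message text while merging the MDC key/value maps. As an illustration (timestamp, level, and logger name are made up), a concatenated entry such as `log"Min Size: ${MDC(MIN_SIZE, "2")}, " + log"Max Size: ${MDC(MAX_SIZE, "4")}. " + log"Please double check."` renders with structured logging enabled as a single JSON line along the lines of:
    ```
    {"ts":"2024-04-01T00:15:24.000Z","level":"INFO","msg":"Min Size: 2, Max Size: 4. Please double check.","context":{"min_size":"2","max_size":"4"},"logger":"MyApp"}
    ```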
    
    ### Why are the changes needed?
    
    Although most Spark logs are short, this syntax makes it convenient to build long log messages with multiple variables.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, GitHub Copilot
    
    Closes #45779 from gengliangwang/logConcat.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  2 +-
 .../scala/org/apache/spark/internal/Logging.scala  | 62 +++++++++++++---------
 .../apache/spark/util/PatternLoggingSuite.scala    |  6 +++
 .../apache/spark/util/StructuredLoggingSuite.scala | 33 +++++++++++-
 4 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 6ab6ac0eb58a..760077af6d3e 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -21,5 +21,5 @@ package org.apache.spark.internal
  * All structured logging keys should be defined here for standardization.
  */
 object LogKey extends Enumeration {
-  val EXECUTOR_ID = Value
+  val EXECUTOR_ID, MIN_SIZE, MAX_SIZE = Value
 }
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 0aa93d6289d1..5765a6eed542 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -22,7 +22,6 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
-import org.apache.logging.log4j.CloseableThreadContext.Instance
 import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext}
 import org.apache.logging.log4j.core.appender.ConsoleAppender
 import org.apache.logging.log4j.core.config.DefaultConfiguration
@@ -43,7 +42,13 @@ case class MDC(key: LogKey.Value, value: String)
  * Wrapper class for log messages that include a logging context.
  * This is used as the return type of the string interpolator `LogStringContext`.
  */
-case class MessageWithContext(message: String, context: Option[Instance])
+case class MessageWithContext(message: String, context: java.util.HashMap[String, String]) {
+  def +(mdc: MessageWithContext): MessageWithContext = {
+    val resultMap = new java.util.HashMap(context)
+    resultMap.putAll(mdc.context)
+    MessageWithContext(message + mdc.message, resultMap)
+  }
+}
 
 /**
  * Companion class for lazy evaluation of the MessageWithContext instance.
@@ -51,7 +56,7 @@ case class MessageWithContext(message: String, context: Option[Instance])
 class LogEntry(messageWithContext: => MessageWithContext) {
   def message: String = messageWithContext.message
 
-  def context: Option[Instance] = messageWithContext.context
+  def context: java.util.HashMap[String, String] = messageWithContext.context
 }
 
 /**
@@ -94,12 +99,12 @@ trait Logging {
     def log(args: MDC*): MessageWithContext = {
       val processedParts = sc.parts.iterator
       val sb = new StringBuilder(processedParts.next())
-      lazy val map = new java.util.HashMap[String, String]()
+      val context = new java.util.HashMap[String, String]()
 
       args.foreach { mdc =>
         sb.append(mdc.value)
         if (Logging.isStructuredLoggingEnabled) {
-          map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
+          context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
         }
 
         if (processedParts.hasNext) {
@@ -107,13 +112,16 @@ trait Logging {
         }
       }
 
-      // Create a CloseableThreadContext and apply the context map
-      val closeableContext = if (Logging.isStructuredLoggingEnabled) {
-        Some(CloseableThreadContext.putAll(map))
-      } else {
-        None
-      }
-      MessageWithContext(sb.toString(), closeableContext)
+      MessageWithContext(sb.toString(), context)
+    }
+  }
+
+  private def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = {
+    val threadContext = CloseableThreadContext.putAll(context)
+    try {
+      body
+    } finally {
+      threadContext.close()
     }
   }
 
@@ -124,15 +132,17 @@ trait Logging {
 
   protected def logInfo(entry: LogEntry): Unit = {
     if (log.isInfoEnabled) {
-      log.info(entry.message)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.info(entry.message)
+      }
     }
   }
 
   protected def logInfo(entry: LogEntry, throwable: Throwable): Unit = {
     if (log.isInfoEnabled) {
-      log.info(entry.message, throwable)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.info(entry.message, throwable)
+      }
     }
   }
 
@@ -150,15 +160,17 @@ trait Logging {
 
   protected def logWarning(entry: LogEntry): Unit = {
     if (log.isWarnEnabled) {
-      log.warn(entry.message)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.warn(entry.message)
+      }
     }
   }
 
   protected def logWarning(entry: LogEntry, throwable: Throwable): Unit = {
     if (log.isWarnEnabled) {
-      log.warn(entry.message, throwable)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.warn(entry.message, throwable)
+      }
     }
   }
 
@@ -168,15 +180,17 @@ trait Logging {
 
   protected def logError(entry: LogEntry): Unit = {
     if (log.isErrorEnabled) {
-      log.error(entry.message)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.error(entry.message)
+      }
     }
   }
 
   protected def logError(entry: LogEntry, throwable: Throwable): Unit = {
     if (log.isErrorEnabled) {
-      log.error(entry.message, throwable)
-      entry.context.map(_.close())
+      withLogContext(entry.context) {
+        log.error(entry.message, throwable)
+      }
     }
   }
 
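As an aside for reviewers, below is a minimal, self-contained sketch of the concatenation semantics added above. The `MessageWithContext` body mirrors the patch; `ConcatDemo` and its `entry` helper are illustrative only and not part of the change:

```
import java.util.{HashMap => JHashMap}

case class MessageWithContext(message: String, context: JHashMap[String, String]) {
  // Concatenation: append the message text and merge the MDC maps.
  // On duplicate keys the right-hand operand wins (java.util.Map.putAll semantics).
  def +(mdc: MessageWithContext): MessageWithContext = {
    val resultMap = new JHashMap[String, String](context)
    resultMap.putAll(mdc.context)
    MessageWithContext(message + mdc.message, resultMap)
  }
}

object ConcatDemo {
  // Illustrative helper: build a MessageWithContext holding one MDC pair.
  private def entry(msg: String, key: String, value: String): MessageWithContext = {
    val m = new JHashMap[String, String]()
    m.put(key, value)
    MessageWithContext(msg, m)
  }

  def main(args: Array[String]): Unit = {
    val combined = entry("Min Size: 2, ", "min_size", "2") +
      entry("Max Size: 4. ", "max_size", "4") +
      MessageWithContext("Please double check.", new JHashMap[String, String]())

    println(combined.message) // Min Size: 2, Max Size: 4. Please double check.
    println(combined.context) // {min_size=2, max_size=4} (HashMap order unspecified)
  }
}
```

The `withLogContext` helper in the patch then installs the merged map into log4j's `CloseableThreadContext` for the duration of the logging call and closes it in a `finally` block, which is how the combined `context` ends up on exactly that log line.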
diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala
index ef0aa7050b07..7e4318306c82 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala
@@ -36,4 +36,10 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll {
 
   override def expectedPatternForMsgWithMDCAndException(level: String): String =
     s""".*$level PatternLoggingSuite: Error in executor 1.\njava.lang.RuntimeException: OOM\n.*"""
+
+  override def verifyMsgWithConcat(level: String, logOutput: String): Unit = {
+    val pattern =
+      s""".*$level PatternLoggingSuite: Min Size: 2, Max Size: 4. Please 
double check.\n"""
+    assert(pattern.r.matches(logOutput))
+  }
 }
diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
index 5dfd3bb46021..8165c5f5b751 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -22,7 +22,7 @@ import java.nio.file.Files
 import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 
 import org.apache.spark.internal.{LogEntry, Logging, MDC}
-import org.apache.spark.internal.LogKey.EXECUTOR_ID
+import org.apache.spark.internal.LogKey.{EXECUTOR_ID, MAX_SIZE, MIN_SIZE}
 
 abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuite
   with Logging {
@@ -52,12 +52,19 @@ abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuit
 
   def msgWithMDCAndException: LogEntry = log"Error in executor ${MDC(EXECUTOR_ID, "1")}."
 
+  def msgWithConcat: LogEntry = log"Min Size: ${MDC(MIN_SIZE, "2")}, " +
+    log"Max Size: ${MDC(MAX_SIZE, "4")}. " +
+    log"Please double check."
+
+
   def expectedPatternForBasicMsg(level: String): String
 
   def expectedPatternForMsgWithMDC(level: String): String
 
   def expectedPatternForMsgWithMDCAndException(level: String): String
 
+  def verifyMsgWithConcat(level: String, logOutput: String): Unit
+
   test("Basic logging") {
     val msg = "This is a log message"
     Seq(
@@ -91,6 +98,17 @@ abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuit
           assert(expectedPatternForMsgWithMDCAndException(level).r.findFirstIn(logOutput).isDefined)
       }
   }
+
+  test("Logging with concat") {
+    Seq(
+      ("ERROR", () => logError(msgWithConcat)),
+      ("WARN", () => logWarning(msgWithConcat)),
+      ("INFO", () => logInfo(msgWithConcat))).foreach {
+        case (level, logFunc) =>
+          val logOutput = captureLogOutput(logFunc)
+          verifyMsgWithConcat(level, logOutput)
+      }
+  }
 }
 
 class StructuredLoggingSuite extends LoggingSuiteBase {
@@ -109,4 +127,17 @@ class StructuredLoggingSuite extends LoggingSuiteBase {
     // scalastyle:off line.size.limit
     s"""\\{"ts":"[^"]+","level":"$level","msg":"Error in executor 
1.","context":\\{"executor_id":"1"},"exception":\\{"class":"java.lang.RuntimeException","msg":"OOM","stacktrace":.*},"logger":"$className"}\n"""
     // scalastyle:on
+
+  override def verifyMsgWithConcat(level: String, logOutput: String): Unit = {
+    // scalastyle:off line.size.limit
+    val pattern1 =
+      s"""\\{"ts":"[^"]+","level":"$level","msg":"Min Size: 2, Max Size: 4. 
Please double check.","context":\\{"min_size":"2","max_size": 
"4"},"logger":"$className"}\n"""
+
+    val pattern2 =
+      s"""\\{"ts":"[^"]+","level":"$level","msg":"Min Size: 2, Max Size: 4. 
Please double 
check.","context":\\{"max_size":"4","min_size":"2"},"logger":"$className"}\n"""
+
+    assert(pattern1.r.matches(logOutput) || pattern2.r.matches(logOutput))
+    // scalastyle:on
+  }
+
 }
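
A closing note on the test above: two alternative patterns are asserted because `java.util.HashMap` makes no guarantee about iteration order, so `min_size` and `max_size` may serialize in either order. An order-insensitive check could look like the following sketch (`LogAsserts` and `assertContextContains` are hypothetical, not part of the patch):

```
object LogAsserts {
  // Hypothetical order-insensitive assertion: check that each expected
  // key/value pair appears in the JSON output, ignoring context-map order.
  def assertContextContains(logOutput: String, pairs: (String, String)*): Unit =
    pairs.foreach { case (k, v) =>
      assert(logOutput.contains("\"" + k + "\":\"" + v + "\""), s"missing $k=$v")
    }
}

// Usage in a test body:
// LogAsserts.assertContextContains(logOutput, "min_size" -> "2", "max_size" -> "4")
```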

