This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4453b480f98 [SPARK-48123][CORE] Provide a constant table schema for 
querying structured logs
e4453b480f98 is described below

commit e4453b480f988bf6683930ae14b7043a2cecffc4
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Sat May 4 00:18:18 2024 -0700

    [SPARK-48123][CORE] Provide a constant table schema for querying structured 
logs
    
    ### What changes were proposed in this pull request?
    
    Providing a table schema LOG_SCHEMA, so that users can load structured logs 
with the following code:
    
    ```
    import org.apache.spark.util.LogUtils.LOG_SCHEMA
    
    val logDf = spark.read.schema(LOG_SCHEMA).json("path/to/logs")
    ```
    
    ### Why are the changes needed?
    
    Provide a convenient way to query Spark logs using Spark SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46375 from gengliangwang/logSchema.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/util/LogUtils.scala     | 50 +++++++++++++
 docs/configuration.md                              | 10 ++-
 sql/core/src/test/resources/log4j2.properties      | 12 ++++
 .../scala/org/apache/spark/sql/LogQuerySuite.scala | 83 ++++++++++++++++++++++
 4 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/LogUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/LogUtils.scala
new file mode 100644
index 000000000000..5a798ffad3a9
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/LogUtils.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: : DeveloperApi ::
+ * Utils for querying Spark logs with Spark SQL.
+ *
+ * @since 4.0.0
+ */
+@DeveloperApi
+object LogUtils {
+  /**
+   * Schema for structured Spark logs.
+   * Example usage:
+   *   val logDf = spark.read.schema(LOG_SCHEMA).json("path/to/logs")
+   */
+  val LOG_SCHEMA: String = """
+    |ts TIMESTAMP,
+    |level STRING,
+    |msg STRING,
+    |context map<STRING, STRING>,
+    |exception STRUCT<
+    |  class STRING,
+    |  msg STRING,
+    |  stacktrace ARRAY<STRUCT<
+    |    class STRING,
+    |    method STRING,
+    |    file STRING,
+    |    line STRING
+    |  >>
+    |>,
+    |logger STRING""".stripMargin
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index a3b4e731f057..7966aceccdea 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3675,7 +3675,15 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for 
logging. You can config
 ## Structured Logging
 Starting from version 4.0.0, Spark has adopted the [JSON Template 
Layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) 
for logging, which outputs logs in JSON format. This format facilitates 
querying logs using Spark SQL with the JSON data source. Additionally, the logs 
include all Mapped Diagnostic Context (MDC) information for search and 
debugging purposes.
 
-To implement structured logging, start with the `log4j2.properties.template` 
file.
+To configure the layout of structured logging, start with the 
`log4j2.properties.template` file.
+
+To query Spark logs using Spark SQL, you can use the following Scala code 
snippet:
+
+```scala
+import org.apache.spark.util.LogUtils.LOG_SCHEMA
+
+val logDf = spark.read.schema(LOG_SCHEMA).json("path/to/logs")
+```
 
 ## Plain Text Logging
 If you prefer plain text logging, you can use the 
`log4j2.properties.pattern-layout-template` file as a starting point. This is 
the default configuration used by Spark before the 4.0.0 release. This 
configuration uses the 
[PatternLayout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout)
 to log all the logs in plain text. MDC information is not included by default. 
In order to print it in the logs, you can update the patternLayout in the file. 
For example, you can ad [...]
diff --git a/sql/core/src/test/resources/log4j2.properties 
b/sql/core/src/test/resources/log4j2.properties
index 7ab47c16d4f9..fdbc35e70ab4 100644
--- a/sql/core/src/test/resources/log4j2.properties
+++ b/sql/core/src/test/resources/log4j2.properties
@@ -29,6 +29,12 @@ appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c: 
%maxLen{%m}{512}%n%ex{
 appender.console.filter.threshold.type = ThresholdFilter
 appender.console.filter.threshold.level = warn
 
+appender.structured.type = File
+appender.structured.name = structured
+appender.structured.fileName = target/LogQuerySuite.log
+appender.structured.layout.type = JsonTemplateLayout
+appender.structured.layout.eventTemplateUri = 
classpath:org/apache/spark/SparkLayout.json
+
 #File Appender
 appender.file.type = File
 appender.file.name = File
@@ -72,3 +78,9 @@ logger.parquet1.level = error
 
 logger.parquet2.name = parquet.CorruptStatistics
 logger.parquet2.level = error
+
+# Custom loggers
+logger.structured.name = org.apache.spark.sql.LogQuerySuite
+logger.structured.level = trace
+logger.structured.appenderRefs = structured
+logger.structured.appenderRef.structured.ref = structured
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/LogQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/LogQuerySuite.scala
new file mode 100644
index 000000000000..df0fbf15a98e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/LogQuerySuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.LogUtils.LOG_SCHEMA
+
+/**
+ * Test suite for querying Spark logs using SQL.
+ */
+class LogQuerySuite extends QueryTest with SharedSparkSession with Logging {
+
+  val logFile: File = {
+    val pwd = new File(".").getCanonicalPath
+    new File(pwd + "/target/LogQuerySuite.log")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    // Clear the log file
+    if (logFile.exists()) {
+      logFile.delete()
+    }
+  }
+
+  private def createTempView(viewName: String): Unit = {
+    
spark.read.schema(LOG_SCHEMA).json(logFile.getCanonicalPath).createOrReplaceTempView(viewName)
+  }
+
+  test("Query Spark logs using SQL") {
+    val msg = log"Lost executor ${MDC(LogKeys.EXECUTOR_ID, "1")}."
+    logError(msg)
+
+    withTempView("logs") {
+      createTempView("logs")
+      checkAnswer(
+        spark.sql(s"SELECT level, msg, context, exception FROM logs WHERE msg 
= '${msg.message}'"),
+        Row("ERROR", msg.message, Map(LogKeys.EXECUTOR_ID.name -> "1"), null) 
:: Nil)
+    }
+  }
+
+  test("Query Spark logs with exception using SQL") {
+    val msg = log"Task ${MDC(LogKeys.TASK_ID, "2")} failed."
+    val exception = new RuntimeException("OOM")
+    logError(msg, exception)
+
+    withTempView("logs") {
+      createTempView("logs")
+      val expectedMDC = Map(LogKeys.TASK_ID.name -> "2")
+      checkAnswer(
+        spark.sql("SELECT level, msg, context, exception.class, exception.msg 
FROM logs " +
+          s"WHERE msg = '${msg.message}'"),
+        Row("ERROR", msg.message, expectedMDC, "java.lang.RuntimeException", 
"OOM") :: Nil)
+
+      val stacktrace =
+        spark.sql(s"SELECT exception.stacktrace FROM logs WHERE msg = 
'${msg.message}'").collect()
+      assert(stacktrace.length == 1)
+      val topStacktraceArray = stacktrace.head.getSeq[Row](0).head
+      assert(topStacktraceArray.getString(0) == this.getClass.getName)
+      assert(topStacktraceArray.getString(1) != "")
+      assert(topStacktraceArray.getString(2) == this.getClass.getSimpleName + 
".scala")
+      assert(topStacktraceArray.getString(3) != "")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to