This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bbf6b1eff2c [SPARK-48126][CORE] Make 
`spark.log.structuredLogging.enabled` effective
6bbf6b1eff2c is described below

commit 6bbf6b1eff2cffe8d116ebba0194fac233b42348
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue May 7 19:10:27 2024 -0700

    [SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective
    
    ### What changes were proposed in this pull request?
    
    Currently, the spark conf `spark.log.structuredLogging.enabled` is not 
taking effect. The current code base checks this config in the method 
`prepareSubmitEnvironment`. However, Log4j is already initialized before that.
    
    This PR is to fix it by checking the config 
`spark.log.structuredLogging.enabled` before the initialization of Log4j.
    Also, this PR enhances the doc for this configuration.
    
    ### Why are the changes needed?
    
    Bug fix. After the fix, the Spark conf 
`spark.log.structuredLogging.enabled` takes effect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manual test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: GPT-4
    I used GPT-4 to improve the documents.
    
    Closes #46452 from gengliangwang/makeConfEffective.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 33 ++++++++++++----------
 .../org/apache/spark/internal/config/package.scala |  9 +++---
 docs/configuration.md                              |  6 +++-
 docs/core-migration-guide.md                       |  4 ++-
 4 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 076aa8387dc5..5a7e5542cbd0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -69,10 +69,20 @@ private[spark] class SparkSubmit extends Logging {
 
   def doSubmit(args: Array[String]): Unit = {
     val appArgs = parseArguments(args)
+    val sparkConf = appArgs.toSparkConf()
+
     // For interpreters, structured logging is disabled by default to avoid 
generating mixed
     // plain text and structured logs on the same console.
     if (isShell(appArgs.primaryResource) || isSqlShell(appArgs.mainClass)) {
       Logging.disableStructuredLogging()
+    } else {
+      // For non-shell applications, enable structured logging if it's not 
explicitly disabled
+      // via the configuration `spark.log.structuredLogging.enabled`.
+      if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = 
true)) {
+        Logging.enableStructuredLogging()
+      } else {
+        Logging.disableStructuredLogging()
+      }
     }
     // Initialize logging if it hasn't been done yet. Keep track of whether 
logging needs to
     // be reset before the application starts.
@@ -82,9 +92,9 @@ private[spark] class SparkSubmit extends Logging {
       logInfo(appArgs.toString)
     }
     appArgs.action match {
-      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
-      case SparkSubmitAction.KILL => kill(appArgs)
-      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf)
+      case SparkSubmitAction.KILL => kill(appArgs, sparkConf)
+      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, 
sparkConf)
       case SparkSubmitAction.PRINT_VERSION => printVersion()
     }
   }
@@ -96,12 +106,11 @@ private[spark] class SparkSubmit extends Logging {
   /**
    * Kill an existing submission.
    */
-  private def kill(args: SparkSubmitArguments): Unit = {
+  private def kill(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = {
     if (RestSubmissionClient.supportsRestClient(args.master)) {
       new RestSubmissionClient(args.master)
         .killSubmission(args.submissionToKill)
     } else {
-      val sparkConf = args.toSparkConf()
       sparkConf.set("spark.master", args.master)
       SparkSubmitUtils
         .getSubmitOperations(args.master)
@@ -112,12 +121,11 @@ private[spark] class SparkSubmit extends Logging {
   /**
    * Request the status of an existing submission.
    */
-  private def requestStatus(args: SparkSubmitArguments): Unit = {
+  private def requestStatus(args: SparkSubmitArguments, sparkConf: SparkConf): 
Unit = {
     if (RestSubmissionClient.supportsRestClient(args.master)) {
       new RestSubmissionClient(args.master)
         .requestSubmissionStatus(args.submissionToRequestStatusFor)
     } else {
-      val sparkConf = args.toSparkConf()
       sparkConf.set("spark.master", args.master)
       SparkSubmitUtils
         .getSubmitOperations(args.master)
@@ -148,7 +156,7 @@ private[spark] class SparkSubmit extends Logging {
    * in a doAs when --proxy-user is specified.
    */
   @tailrec
-  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
+  private def submit(args: SparkSubmitArguments, uninitLog: Boolean, 
sparkConf: SparkConf): Unit = {
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
@@ -157,7 +165,7 @@ private[spark] class SparkSubmit extends Logging {
         // is done in client mode.
         val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
           "client".equals(args.deployMode) &&
-          args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", 
false)
+          sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
         if (isKubernetesClusterModeDriver) {
           logInfo("Running driver with proxy user. Cluster manager: 
Kubernetes")
           SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
@@ -204,7 +212,7 @@ private[spark] class SparkSubmit extends Logging {
           logWarning(log"Master endpoint ${MDC(LogKeys.MASTER_URL, 
args.master)} " +
             log"was not a REST server. Falling back to legacy submission 
gateway instead.")
           args.useRest = false
-          submit(args, false)
+          submit(args, false, sparkConf)
       }
     // In all other modes, just run the main class as prepared
     } else {
@@ -234,11 +242,6 @@ private[spark] class SparkSubmit extends Logging {
     val childClasspath = new ArrayBuffer[String]()
     val sparkConf = args.toSparkConf()
     if (sparkConf.contains("spark.local.connect")) 
sparkConf.remove("spark.remote")
-    if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = 
true)) {
-      Logging.enableStructuredLogging()
-    } else {
-      Logging.disableStructuredLogging()
-    }
     var childMainClass = ""
 
     // Set the cluster manager
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2e207422ae06..a5be6084de36 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -143,10 +143,11 @@ package object config {
 
   private[spark] val STRUCTURED_LOGGING_ENABLED =
     ConfigBuilder("spark.log.structuredLogging.enabled")
-      .doc("When true, the default log4j output format is structured JSON 
lines, and there will " +
-        "be Mapped Diagnostic Context (MDC) from Spark added to the logs. This 
is useful for log " +
-        "aggregation and analysis tools. When false, the default log4j output 
will be plain " +
-        "text and no MDC from Spark will be set.")
+      .doc("When true, Spark logs are output as structured JSON lines with 
added Spark " +
+        "Mapped Diagnostic Context (MDC), facilitating easier integration with 
log aggregation " +
+        "and analysis tools. When false, logs are plain text without MDC. This 
configuration " +
+        "does not apply to interactive environments such as spark-shell, 
spark-sql, and " +
+        "PySpark shell.")
       .version("4.0.0")
       .booleanConf
       .createWithDefault(true)
diff --git a/docs/configuration.md b/docs/configuration.md
index fb14af6d55b8..c018b9f1fb7c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3693,7 +3693,11 @@ val logDf = 
spark.read.schema(LOG_SCHEMA).json("path/to/logs")
 ```
 
 ## Plain Text Logging
-If you prefer plain text logging, you can use the 
`log4j2.properties.pattern-layout-template` file as a starting point. This is 
the default configuration used by Spark before the 4.0.0 release. This 
configuration uses the 
[PatternLayout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout)
 to log all the logs in plain text. MDC information is not included by default. 
In order to print it in the logs, you can update the patternLayout in the file. 
For example, you can ad [...]
+If you prefer plain text logging, you have two options:
+- Disable structured JSON logging by setting the Spark configuration 
`spark.log.structuredLogging.enabled` to `false`.
+- Use a custom log4j configuration file. Rename 
`conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`. 
This reverts to the default configuration prior to Spark 4.0, which utilizes 
[PatternLayout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout)
 for logging all messages in plain text.
+
+MDC information is not included by default when with plain text logging. In 
order to print it in the logs, you can update the patternLayout in the file. 
For example, you can add `%X{task_name}` to print the task name in the logs.
 Moreover, you can use `spark.sparkContext.setLocalProperty(s"mdc.$name", 
"value")` to add user specific data into MDC.
 The key in MDC will be the string of `mdc.$name`.
 
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 597900630b3f..95c7929a6241 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -42,7 +42,9 @@ license: |
 
 - Since Spark 4.0, Spark uses the external shuffle service for deleting 
shuffle blocks for deallocated executors when the shuffle is no longer needed. 
To restore the legacy behavior, you can set 
`spark.shuffle.service.removeShuffle` to `false`.
 
-- Since Spark 4.0, the default log4j output of `spark-submit` has shifted from 
plain text to JSON lines to enhance analyzability. To revert to plain text 
output, you can rename the file 
`conf/log4j2.properties.pattern-layout-template` as `conf/log4j2.properties`, 
or use a custom log4j configuration file.
+- Starting with Spark 4.0, the default logging format for `spark-submit` has 
changed from plain text to JSON lines to improve log analysis. If you prefer 
plain text logs, you have two options:
+  - Set the Spark configuration `spark.log.structuredLogging.enabled` to 
`false`. 
+  - Use a custom log4j configuration file, such as renaming the template file 
`conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`.
 
 - Since Spark 4.0, Spark performs speculative executions less agressively with 
`spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To 
restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and 
`spark.speculation.quantile=0.75`.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to