This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fc90fbd4e3 [SPARK-44198][CORE] Support propagation of the log level to the executors
5fc90fbd4e3 is described below

commit 5fc90fbd4e3235fbcf038f4725037321b8234d94
Author: Vinod KC <vinod.kc...@gmail.com>
AuthorDate: Thu Jul 27 16:39:33 2023 -0700

    [SPARK-44198][CORE] Support propagation of the log level to the executors
    
    ### What changes were proposed in this pull request?
    
    Currently, the **sc.setLogLevel()** method only sets the log level on the
    Spark driver, and the desired log level is not reflected on the executors.
    With _--conf **spark.log.level**_ or **sc.setLogLevel()**, Spark allows
    tuning the log level in the driver process, but the change is not applied
    to the executors.
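    
    For illustration only (not part of this patch), a minimal sketch of the
    intended usage, assuming the new **spark.executor.syncLogLevel.enabled**
    flag introduced below; the master URL and application name are placeholders:
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Hypothetical application: opt in to propagating log level changes to executors.
    val conf = new SparkConf()
      .setAppName("log-level-propagation-demo")            // placeholder app name
      .setMaster("spark://example-master:7077")            // placeholder cluster URL
      .set("spark.executor.syncLogLevel.enabled", "true")  // new flag added by this PR
    
    val sc = new SparkContext(conf)
    
    // With the flag enabled, this call now also updates the log level on the
    // executors rather than only on the driver.
    sc.setLogLevel("DEBUG")
    ```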
    
    ### Why are the changes needed?
    This inconsistency can lead to difficulties in debugging and monitoring
    Spark applications, as log messages from the executors may not align with
    the log level expected by the user code.
    This PR propagates log level changes to the executors when sc.setLogLevel()
    is called, and sends the current log level to each new executor when it
    registers.
    
    ### Does this PR introduce _any_ user-facing change?
    No, but with this PR, both the driver and the executors will show the same log level.
    
    ### How was this patch tested?
    
    Tested manually to verify that the same log level is applied on both the driver and the executors.
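    
    A sketch of how such a manual check might look (hypothetical, not taken from
    this patch), assuming the new flag is enabled as above:
    
    ```scala
    // Raise the level, then run any action that schedules tasks on the executors;
    // with propagation enabled, the executor stderr logs should now also contain
    // DEBUG-level messages, matching the driver.
    sc.setLogLevel("DEBUG")
    sc.parallelize(1 to 100, 4).count()
    ```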
    
    Closes #41746 from vinodkc/br_support_setloglevel_executors.
    
    Authored-by: Vinod KC <vinod.kc...@gmail.com>
    Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 11 ++++++++--
 .../executor/CoarseGrainedExecutorBackend.scala    |  4 ++++
 .../org/apache/spark/internal/config/package.scala |  8 +++++++
 .../apache/spark/scheduler/SchedulerBackend.scala  |  1 +
 .../cluster/CoarseGrainedClusterMessage.scala      |  7 +++++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 20 ++++++++++++++++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 25 +++++++++++++++++++---
 7 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 26fdb86d299..f48cb32b319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.logging.log4j.Level
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging {
     require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
       s"Supplied level $logLevel did not match one of:" +
         s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-    Utils.setLogLevel(Level.toLevel(upperCased))
+    Utils.setLogLevelIfNeeded(upperCased)
+    if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
+      _schedulerBackend.updateExecutorsLogLevel(upperCased)
+    }
   }
 
   try {
@@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    if (_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+      _conf.get(SPARK_LOG_LEVEL)
+        .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel))
+    }
+
     val _executorMetricsSource =
       if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
         Some(new ExecutorMetricsSource)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ab238626efe..da009f5addb 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend(
         case NonFatal(e) =>
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, 
e)
       }
+    case UpdateExecutorLogLevel(newLogLevel) =>
+      Utils.setLogLevelIfNeeded(newLogLevel)
 
     case LaunchTask(data) =>
       if (executor == null) {
@@ -473,6 +475,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       driverConf.set(EXECUTOR_ID, arguments.executorId)
+      cfg.logLevel.foreach(logLevel => Utils.setLogLevelIfNeeded(logLevel))
+
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
       // Set the application attemptId in the BlockStoreClient if available.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 04eba8bddeb..83e64f6f8a8 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2244,6 +2244,14 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val EXECUTOR_ALLOW_SYNC_LOG_LEVEL =
+    ConfigBuilder("spark.executor.syncLogLevel.enabled")
+      .doc("If set to true, log level applied through 
SparkContext.setLogLevel() method " +
+        "will be propagated to all executors.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EXECUTOR_KILL_ON_FATAL_ERROR_DEPTH =
     ConfigBuilder("spark.executor.killOnFatalError.depth")
       .doc("The max depth of the exception chain in a failed task Spark will 
search for a fatal " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 56666dcaccf..31871f0dbf3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,6 +31,7 @@ private[spark] trait SchedulerBackend {
   def start(): Unit
   def stop(): Unit
   def stop(exitCode: Int): Unit = stop()
+  def updateExecutorsLogLevel(logLevel: String): Unit = {}
   /**
    * Update the current offers and schedule tasks
    */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 109c7373447..51b182d9478 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -35,7 +35,8 @@ private[spark] object CoarseGrainedClusterMessages {
       sparkProperties: Seq[(String, String)],
       ioEncryptionKey: Option[Array[Byte]],
       hadoopDelegationCreds: Option[Array[Byte]],
-      resourceProfile: ResourceProfile)
+      resourceProfile: ResourceProfile,
+      logLevel: Option[String])
     extends CoarseGrainedClusterMessage
 
   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
@@ -49,6 +50,10 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
+  case class UpdateExecutorsLogLevel(logLevel: String) extends CoarseGrainedClusterMessage
+
+  case class UpdateExecutorLogLevel(logLevel: String) extends CoarseGrainedClusterMessage
+
   case class DecommissionExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index aeac2616711..a6bb6b18059 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -123,6 +123,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // Current log level of driver to send to executor
+  @volatile private var currentLogLevel: Option[String] = None
+
   // Current set of delegation tokens to send to executors.
   private val delegationTokens = new AtomicReference[Array[Byte]]()
 
@@ -318,6 +321,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
         stop()
 
+      case UpdateExecutorsLogLevel(logLevel) =>
+        currentLogLevel = Some(logLevel)
+        logInfo(s"Asking each executor to refresh the log level to $logLevel")
+        for ((_, executorData) <- executorDataMap) {
+          executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel))
+        }
+        context.reply(true)
+
       case StopExecutors =>
         logInfo("Asking each executor to shut down")
         for ((_, executorData) <- executorDataMap) {
@@ -345,7 +356,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
           Option(delegationTokens.get()),
-          rp)
+          rp,
+          currentLogLevel)
         context.reply(reply)
 
       case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))
@@ -653,6 +665,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  override def updateExecutorsLogLevel(logLevel: String): Unit = {
+    if (driverEndpoint != null) {
+      driverEndpoint.ask[Boolean](UpdateExecutorsLogLevel(logLevel))
+    }
+  }
+
   /**
   * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2d61b1b6305..5225a45cff3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -61,6 +61,7 @@ import org.apache.hadoop.util.{RunJar, StringUtils}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.logging.log4j.{Level, LogManager}
 import org.apache.logging.log4j.core.LoggerContext
+import org.apache.logging.log4j.core.config.LoggerConfig
 import org.eclipse.jetty.util.MultiException
 import org.slf4j.Logger
 
@@ -2441,9 +2442,7 @@ private[spark] object Utils extends Logging with SparkClassUtils {
    * configure a new log4j level
    */
   def setLogLevel(l: Level): Unit = {
-    val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]
-    val config = ctx.getConfiguration()
-    val loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME)
+    val (ctx, loggerConfig) = getLogContext
     loggerConfig.setLevel(l)
     ctx.updateLoggers()
 
@@ -2451,6 +2450,26 @@ private[spark] object Utils extends Logging with SparkClassUtils {
     Logging.sparkShellThresholdLevel = null
   }
 
+
+  def setLogLevelIfNeeded(newLogLevel: String): Unit = {
+    if (newLogLevel != Utils.getLogLevel) {
+      Utils.setLogLevel(Level.toLevel(newLogLevel))
+    }
+  }
+
+  private lazy val getLogContext: (LoggerContext, LoggerConfig) = {
+    val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]
+    (ctx, ctx.getConfiguration().getLoggerConfig(LogManager.ROOT_LOGGER_NAME))
+  }
+
+  /**
+   * Get current log level
+   */
+  def getLogLevel: String = {
+    val (_, loggerConfig) = getLogContext
+    loggerConfig.getLevel.name
+  }
+
   /**
    * Return the current system LD_LIBRARY_PATH name
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
