This is an automated email from the ASF dual-hosted git repository. attilapiros pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5fc90fbd4e3 [SPARK-44198][CORE] Support propagation of the log level to the executors

5fc90fbd4e3 is described below

commit 5fc90fbd4e3235fbcf038f4725037321b8234d94
Author: Vinod KC <vinod.kc...@gmail.com>
AuthorDate: Thu Jul 27 16:39:33 2023 -0700

[SPARK-44198][CORE] Support propagation of the log level to the executors

### What changes were proposed in this pull request?

Currently, the `sc.setLogLevel()` method sets the log level only on the Spark driver; the desired log level is not applied to the executors. With `--conf spark.log.level` or `sc.setLogLevel()`, Spark allows tuning the log level of the driver process, but that level is not reflected on the executors.

### Why are the changes needed?

This inconsistency can make debugging and monitoring Spark applications difficult, because log messages from the executors may not match the log level set in the user code. This PR propagates log level changes to the executors when `sc.setLogLevel()` is called. It also sends the current log level to each new executor when that executor registers. Propagation happens only when the new configuration `spark.executor.syncLogLevel.enabled` is set to `true`; it defaults to `false`.

### Does this PR introduce _any_ user-facing change?

There is no behavior change by default. A new configuration, `spark.executor.syncLogLevel.enabled` (default `false`), is added. When it is enabled, the driver and the executors use the same log level.

### How was this patch tested?

Tested manually to verify that the driver and the executors use the same log level.

Closes #41746 from vinodkc/br_support_setloglevel_executors. 
Authored-by: Vinod KC <vinod.kc...@gmail.com> Signed-off-by: attilapiros <piros.attila.zs...@gmail.com> --- .../main/scala/org/apache/spark/SparkContext.scala | 11 ++++++++-- .../executor/CoarseGrainedExecutorBackend.scala | 4 ++++ .../org/apache/spark/internal/config/package.scala | 8 +++++++ .../apache/spark/scheduler/SchedulerBackend.scala | 1 + .../cluster/CoarseGrainedClusterMessage.scala | 7 +++++- .../cluster/CoarseGrainedSchedulerBackend.scala | 20 ++++++++++++++++- .../main/scala/org/apache/spark/util/Utils.scala | 25 +++++++++++++++++++--- 7 files changed, 69 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 26fdb86d299..f48cb32b319 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.logging.log4j.Level import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast @@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging { require(SparkContext.VALID_LOG_LEVELS.contains(upperCased), s"Supplied level $logLevel did not match one of:" + s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}") - Utils.setLogLevel(Level.toLevel(upperCased)) + Utils.setLogLevelIfNeeded(upperCased) + if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) { + _schedulerBackend.updateExecutorsLogLevel(upperCased) + } } try { @@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = new 
DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) + if (_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) { + _conf.get(SPARK_LOG_LEVEL) + .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel)) + } + val _executorMetricsSource = if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { Some(new ExecutorMetricsSource) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ab238626efe..da009f5addb 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend( case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } + case UpdateExecutorLogLevel(newLogLevel) => + Utils.setLogLevelIfNeeded(newLogLevel) case LaunchTask(data) => if (executor == null) { @@ -473,6 +475,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } driverConf.set(EXECUTOR_ID, arguments.executorId) + cfg.logLevel.foreach(logLevel => Utils.setLogLevelIfNeeded(logLevel)) + val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) // Set the application attemptId in the BlockStoreClient if available. 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 04eba8bddeb..83e64f6f8a8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2244,6 +2244,14 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val EXECUTOR_ALLOW_SYNC_LOG_LEVEL = + ConfigBuilder("spark.executor.syncLogLevel.enabled") + .doc("If set to true, log level applied through SparkContext.setLogLevel() method " + + "will be propagated to all executors.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val EXECUTOR_KILL_ON_FATAL_ERROR_DEPTH = ConfigBuilder("spark.executor.killOnFatalError.depth") .doc("The max depth of the exception chain in a failed task Spark will search for a fatal " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 56666dcaccf..31871f0dbf3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -31,6 +31,7 @@ private[spark] trait SchedulerBackend { def start(): Unit def stop(): Unit def stop(exitCode: Int): Unit = stop() + def updateExecutorsLogLevel(logLevel: String): Unit = {} /** * Update the current offers and schedule tasks */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 109c7373447..51b182d9478 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -35,7 +35,8 @@ private[spark] object 
CoarseGrainedClusterMessages { sparkProperties: Seq[(String, String)], ioEncryptionKey: Option[Array[Byte]], hadoopDelegationCreds: Option[Array[Byte]], - resourceProfile: ResourceProfile) + resourceProfile: ResourceProfile, + logLevel: Option[String]) extends CoarseGrainedClusterMessage case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage @@ -49,6 +50,10 @@ private[spark] object CoarseGrainedClusterMessages { case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage + case class UpdateExecutorsLogLevel(logLevel: String) extends CoarseGrainedClusterMessage + + case class UpdateExecutorLogLevel(logLevel: String) extends CoarseGrainedClusterMessage + case class DecommissionExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index aeac2616711..a6bb6b18059 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -123,6 +123,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // Current log level of driver to send to executor + @volatile private var currentLogLevel: Option[String] = None + // Current set of delegation tokens to send to executors. 
private val delegationTokens = new AtomicReference[Array[Byte]]() @@ -318,6 +321,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) stop() + case UpdateExecutorsLogLevel(logLevel) => + currentLogLevel = Some(logLevel) + logInfo(s"Asking each executor to refresh the log level to $logLevel") + for ((_, executorData) <- executorDataMap) { + executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel)) + } + context.reply(true) + case StopExecutors => logInfo("Asking each executor to shut down") for ((_, executorData) <- executorDataMap) { @@ -345,7 +356,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), Option(delegationTokens.get()), - rp) + rp, + currentLogLevel) context.reply(reply) case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId)) @@ -653,6 +665,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + override def updateExecutorsLogLevel(logLevel: String): Unit = { + if (driverEndpoint != null) { + driverEndpoint.ask[Boolean](UpdateExecutorsLogLevel(logLevel)) + } + } + /** * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only * be called in the yarn-client mode when AM re-registers after a failure. 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2d61b1b6305..5225a45cff3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -61,6 +61,7 @@ import org.apache.hadoop.util.{RunJar, StringUtils} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.logging.log4j.{Level, LogManager} import org.apache.logging.log4j.core.LoggerContext +import org.apache.logging.log4j.core.config.LoggerConfig import org.eclipse.jetty.util.MultiException import org.slf4j.Logger @@ -2441,9 +2442,7 @@ private[spark] object Utils extends Logging with SparkClassUtils { * configure a new log4j level */ def setLogLevel(l: Level): Unit = { - val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext] - val config = ctx.getConfiguration() - val loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME) + val (ctx, loggerConfig) = getLogContext loggerConfig.setLevel(l) ctx.updateLoggers() @@ -2451,6 +2450,26 @@ private[spark] object Utils extends Logging with SparkClassUtils { Logging.sparkShellThresholdLevel = null } + + def setLogLevelIfNeeded(newLogLevel: String): Unit = { + if (newLogLevel != Utils.getLogLevel) { + Utils.setLogLevel(Level.toLevel(newLogLevel)) + } + } + + private lazy val getLogContext: (LoggerContext, LoggerConfig) = { + val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext] + (ctx, ctx.getConfiguration().getLoggerConfig(LogManager.ROOT_LOGGER_NAME)) + } + + /** + * Get current log level + */ + def getLogLevel: String = { + val (_, loggerConfig) = getLogContext + loggerConfig.getLevel.name + } + /** * Return the current system LD_LIBRARY_PATH name */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org