This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 18072b5357a5 [SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework 18072b5357a5 is described below commit 18072b5357a5fd671829e312ca359fcf34d47c63 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Fri Apr 5 14:04:51 2024 -0700 [SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework ### What changes were proposed in this pull request? Migrate `logError` calls with variables in the core module to the structured logging framework. This is part 2, which migrates `logError` calls of the following API ``` def logError(msg: => String, throwable: Throwable): Unit ``` to ``` def logError(entry: LogEntry, throwable: Throwable): Unit ``` It also adds the new `LogKey` values these calls need and renames `LogKey.EXECUTOR_STATE_CHANGED` to `LogKey.EXECUTOR_STATE`. Migration part 1 was in https://github.com/apache/spark/pull/45834 ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? Yes, Spark core logs will contain additional MDC fields. ### How was this patch tested? Compilation and Scalastyle checks, as well as code review. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45890 from gengliangwang/coreError2. 
Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../scala/org/apache/spark/internal/LogKey.scala | 23 +++++++++++++++++++++- .../scala/org/apache/spark/ContextCleaner.scala | 19 +++++++++++------- .../scala/org/apache/spark/MapOutputTracker.scala | 2 +- .../main/scala/org/apache/spark/SparkContext.scala | 15 +++++++++----- .../scala/org/apache/spark/TaskContextImpl.scala | 5 +++-- .../org/apache/spark/api/r/RBackendHandler.scala | 7 ++++--- .../scala/org/apache/spark/deploy/Client.scala | 4 ++-- .../spark/deploy/StandaloneResourceUtils.scala | 8 +++++--- .../main/scala/org/apache/spark/deploy/Utils.scala | 6 ++++-- .../spark/deploy/history/FsHistoryProvider.scala | 7 ++++--- .../org/apache/spark/deploy/worker/Worker.scala | 18 ++++++++++------- .../apache/spark/deploy/worker/ui/LogPage.scala | 6 ++++-- .../scala/org/apache/spark/executor/Executor.scala | 11 ++++++----- .../spark/executor/ExecutorClassLoader.scala | 5 +++-- .../spark/internal/io/SparkHadoopWriter.scala | 4 ++-- .../spark/mapred/SparkHadoopMapRedUtil.scala | 7 +++++-- .../org/apache/spark/metrics/MetricsConfig.scala | 5 +++-- .../org/apache/spark/metrics/MetricsSystem.scala | 3 ++- .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 7 ++++--- .../scala/org/apache/spark/rpc/netty/Inbox.scala | 6 ++++-- .../org/apache/spark/rpc/netty/MessageLoop.scala | 5 +++-- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 ++++++---- .../apache/spark/scheduler/ReplayListenerBus.scala | 4 ++-- .../spark/scheduler/SchedulableBuilder.scala | 14 ++++++++----- .../apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../apache/spark/serializer/KryoSerializer.scala | 5 +++-- .../org/apache/spark/status/AppStatusStore.scala | 5 +++-- .../org/apache/spark/storage/BlockManager.scala | 7 ++++--- .../spark/storage/BlockManagerDecommissioner.scala | 10 ++++++---- .../spark/storage/BlockManagerMasterEndpoint.scala | 5 +++-- 
.../storage/BlockManagerStorageEndpoint.scala | 21 +++++++++++--------- .../apache/spark/storage/DiskBlockManager.scala | 11 +++++++---- .../spark/storage/DiskBlockObjectWriter.scala | 3 ++- .../spark/storage/PushBasedFetchHelper.scala | 15 ++++++++------ .../storage/ShuffleBlockFetcherIterator.scala | 5 +++-- .../scala/org/apache/spark/ui/DriverLogPage.scala | 6 ++++-- .../main/scala/org/apache/spark/ui/SparkUI.scala | 5 +++-- .../src/main/scala/org/apache/spark/ui/WebUI.scala | 5 +++-- .../scala/org/apache/spark/util/EventLoop.scala | 7 ++++--- .../scala/org/apache/spark/util/ListenerBus.scala | 8 +++++--- .../apache/spark/util/ShutdownHookManager.scala | 6 ++++-- .../spark/util/SparkUncaughtExceptionHandler.scala | 20 +++++++++++++------ .../main/scala/org/apache/spark/util/Utils.scala | 22 +++++++++++++-------- .../apache/spark/util/logging/FileAppender.scala | 5 +++-- .../spark/util/logging/RollingFileAppender.scala | 8 +++++--- 45 files changed, 244 insertions(+), 140 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 66f3b803c0d4..1d8161282c5b 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -21,6 +21,7 @@ package org.apache.spark.internal * All structured logging keys should be defined here for standardization. 
*/ object LogKey extends Enumeration { + val ACCUMULATOR_ID = Value val APP_DESC = Value val APP_ID = Value val APP_STATE = Value @@ -33,40 +34,56 @@ object LogKey extends Enumeration { val CLASS_NAME = Value val COMMAND = Value val COMMAND_OUTPUT = Value + val COMPONENT = Value val CONFIG = Value val CONFIG2 = Value val CONTAINER_ID = Value val COUNT = Value val DRIVER_ID = Value + val END_POINT = Value val ERROR = Value + val EVENT_LOOP = Value val EVENT_QUEUE = Value val EXECUTOR_ID = Value - val EXECUTOR_STATE_CHANGED = Value + val EXECUTOR_STATE = Value val EXIT_CODE = Value + val FAILURES = Value val HOST = Value val JOB_ID = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value + val LISTENER = Value + val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value + val MERGE_DIR_NAME = Value + val METHOD_NAME = Value val MIN_SIZE = Value val NUM_ITERATIONS = Value + val OBJECT_ID = Value val OLD_BLOCK_MANAGER_ID = Value val OPTIMIZER_CLASS_NAME = Value val PARTITION_ID = Value val PATH = Value + val PATHS = Value val POD_ID = Value + val PORT = Value val RANGE = Value + val RDD_ID = Value val REASON = Value + val REDUCE_ID = Value val REMOTE_ADDRESS = Value val RETRY_COUNT = Value val RPC_ADDRESS = Value + val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value + val SHUFFLE_MERGE_ID = Value val SIZE = Value + val SLEEP_TIME_SECONDS = Value val STAGE_ID = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value @@ -75,8 +92,12 @@ object LogKey extends Enumeration { val TASK_NAME = Value val TASK_SET_NAME = Value val TASK_STATE = Value + val THREAD = Value + val THREAD_NAME = Value val TID = Value val TIMEOUT = Value + val URI = Value + val USER_NAME = Value val WORKER_URL = Value type LogKey = Value diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 
a1871cb231cf..c16a84c13187 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.jdk.CollectionConverters._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, LISTENER, RDD_ID, SHUFFLE_ID} import org.apache.spark.internal.config._ import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} import org.apache.spark.scheduler.SparkListener @@ -226,7 +227,7 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.rddCleaned(rddId)) logDebug("Cleaned RDD " + rddId) } catch { - case e: Exception => logError("Error cleaning RDD " + rddId, e) + case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e) } } @@ -245,7 +246,7 @@ private[spark] class ContextCleaner( logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)") } } catch { - case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) + case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e) } } @@ -257,7 +258,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.broadcastCleaned(broadcastId)) logDebug(s"Cleaned broadcast $broadcastId") } catch { - case e: Exception => logError("Error cleaning broadcast " + broadcastId, e) + case e: Exception => + logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, broadcastId)}", e) } } @@ -269,7 +271,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.accumCleaned(accId)) logDebug("Cleaned accumulator " + accId) } catch { - case e: Exception => logError("Error cleaning accumulator " + accId, e) + case e: Exception => + logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, accId)}", e) } } 
@@ -285,7 +288,8 @@ private[spark] class ContextCleaner( logDebug("Cleaned rdd checkpoint data " + rddId) } catch { - case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e) + case e: Exception => + logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, rddId)}", e) } } @@ -295,7 +299,8 @@ private[spark] class ContextCleaner( sc.listenerBus.removeListener(listener) logDebug(s"Cleaned Spark listener $listener") } catch { - case e: Exception => logError(s"Error cleaning Spark listener $listener", e) + case e: Exception => + logError(log"Error cleaning Spark listener ${MDC(LISTENER, listener)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index faa8504df365..48569eb71379 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster( .getOrElse(Seq.empty[BlockManagerId])) } } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7595488cecee..9d908cd8713c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey import org.apache.spark.internal.config._ 
import org.apache.spark.internal.config.Tests._ import org.apache.spark.internal.config.UI._ @@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting thread dump from executor $executorId", e) + logError( + log"Exception getting thread dump from executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", + e) None } } @@ -778,7 +781,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting heap histogram from executor $executorId", e) + logError( + log"Exception getting heap histogram from " + + log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e) None } } @@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(env.rpcEnv.fileServer.addJar(file)) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } @@ -2161,7 +2166,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(path) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } else { diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index a3c36de15155..e433cc10ae73 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import 
org.apache.spark.metrics.source.Source @@ -246,7 +247,7 @@ private[spark] class TaskContextImpl( } } listenerExceptions += e - logError(s"Error in $name", e) + logError(log"Error in ${MDC(LISTENER, name)}", e) } } if (listenerExceptions.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 3f7a3ea70a7e..1a05c8f35b7f 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.api.r.SerDe._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID} import org.apache.spark.internal.config.R._ import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend) writeObject(dos, null, server.jvmObjectTracker) } catch { case e: Exception => - logError(s"Removing $objId failed", e) + logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) writeString(dos, s"Removing $objId failed: ${e.getMessage}") } @@ -192,7 +193,7 @@ private[r] class RBackendHandler(server: RBackend) } } catch { case e: Exception => - logError(s"$methodName on $objId failed", e) + logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) // Writing the error message of the cause for the exception. This will be returned // to user in the R process. 
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 1eec3e82f1b7..6cf240f12a1c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -61,7 +61,7 @@ private class ClientEndpoint( t => t match { case ie: InterruptedException => // Exit normally case e: Throwable => - logError(e.getMessage, e) + logError(log"${MDC(ERROR, e.getMessage)}", e) System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) }) diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index 509049550ad4..d317d6449f29 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.COMPONENT import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils 
@@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends Logging { writeResourceAllocationJson(allocations, tmpFile) } catch { case NonFatal(e) => - val errMsg = s"Exception threw while preparing resource file for $compShortName" + val errMsg = + log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}" logError(errMsg, e) - throw new SparkException(errMsg, e) + throw new SparkException(errMsg.message, e) } val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir) tmpFile.renameTo(resourcesFile) diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala b/core/src/main/scala/org/apache/spark/deploy/Utils.scala index 9eb5a0042e51..4d2546cb808c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala @@ -22,7 +22,8 @@ import java.io.File import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.JettyUtils.createServletHandler import org.apache.spark.ui.WebUI import org.apache.spark.util.Utils.{getFileLength, offsetBytes} @@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging { (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e0128e35b761..98cbd7b3eba8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ @@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: AccessControlException => logWarning(s"Insufficient permission while compacting log for $rootPath", e) case e: Exception => - logError(s"Exception while compacting log for $rootPath", e) + logError(log"Exception while compacting log for ${MDC(PATH, rootPath)}", e) } finally { endProcessing(rootPath) } @@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case _: AccessControlException => logInfo(s"No permission to delete $log, ignoring.") case ioe: IOException => - logError(s"IOException in cleaning $log", ioe) + logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe) } } deleted diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index e2ba221fb00c..0659c26fd15b 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -38,7 +38,7 @@ import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, EXECUTOR_STATE_CHANGED, MASTER_URL, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey._ 
import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ @@ -549,7 +549,7 @@ private[deploy] class Worker( }(cleanupThreadExecutor) cleanupFuture.failed.foreach(e => - logError("App dir cleanup failed: " + e.getMessage, e) + logError(log"App dir cleanup failed: ${MDC(ERROR, e.getMessage)}", e) )(cleanupThreadExecutor) } catch { case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown => @@ -638,7 +638,9 @@ private[deploy] class Worker( addResourcesUsed(resources_) } catch { case e: Exception => - logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) + logError( + log"Failed to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)} " + + log"for ${MDC(APP_DESC, appDesc.name)}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId @@ -749,7 +751,7 @@ private[deploy] class Worker( Utils.deleteRecursively(new File(dir)) } }(cleanupThreadExecutor).failed.foreach(e => - logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) + logError(log"Clean up app dir ${MDC(PATHS, dirList)} failed", e) )(cleanupThreadExecutor) } } catch { @@ -794,8 +796,10 @@ private[deploy] class Worker( case Failure(t) => val failures = executorStateSyncFailureAttempts.getOrElse(fullId, 0) + 1 if (failures < executorStateSyncMaxAttempts) { - logError(s"Failed to send $newState to Master $masterRef, " + - s"will retry ($failures/$executorStateSyncMaxAttempts).", t) + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)}" + + log" to Master ${MDC(MASTER_URL, masterRef)}, will retry " + + log"(${MDC(FAILURES, failures)}/" + + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)}).", t) executorStateSyncFailureAttempts(fullId) = failures // If the failure is not caused by TimeoutException, wait for a while before retry in // case the connection is temporarily unavailable. 
@@ -808,7 +812,7 @@ private[deploy] class Worker( } self.send(newState) } else { - logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, newState)} " + + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)} " + log"to Master ${MDC(MASTER_URL, masterRef)} for " + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)} times. Giving up.") System.exit(1) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index c4a15095ec40..006a388e98b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -23,7 +23,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils import org.apache.spark.util.logging.RollingFileAppender @@ -174,7 +175,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 67d0c37c3edd..a7657cd78cd9 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -41,7 +41,7 @@ import org.slf4j.MDC import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, 
MDC => LogMDC} -import org.apache.spark.internal.LogKey.{ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} +import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} import org.apache.spark.internal.config._ import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} @@ -661,9 +661,10 @@ private[spark] class Executor( // uh-oh. it appears the user code has caught the fetch-failure without throwing any // other exceptions. Its *possible* this is what the user meant to do (though highly // unlikely). So we will log an error and keep going. - logError(s"$taskName completed successfully though internally it encountered " + - s"unrecoverable fetch failures! Most likely this means user code is incorrectly " + - s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure) + logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " + + log"it encountered unrecoverable fetch failures! Most likely this means user code " + + log"is incorrectly swallowing Spark's internal " + + log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", fetchFailure) } val taskFinishNs = System.nanoTime() val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { @@ -802,7 +803,7 @@ private[spark] class Executor( // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. - logError(s"Exception in $taskName", t) + logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t) // SPARK-20904: Do not report failure to driver if if happened during shut down. 
Because // libraries may set up shutdown hooks that race with running tasks during shutdown, diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala index b9f4486b66fa..c7047ddd278b 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala @@ -30,7 +30,7 @@ import org.apache.xbean.asm9.Opcodes._ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.util.ParentClassLoader /** @@ -183,7 +183,8 @@ class ExecutorClassLoader( None case e: Exception => // Something bad happened while checking if the class exists - logError(s"Failed to check existence of class $name on REPL class server at $uri", e) + logError(log"Failed to check existence of class ${MDC(LogKey.CLASS_NAME, name)} " + + log"on REPL class server at ${MDC(LogKey.URI, uri)}", e) if (userClassPathFirst) { // Allow to try to load from "parentLoader" None diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 20239980eee5..95ea814042d3 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID +import org.apache.spark.internal.LogKey.{JOB_ID, TASK_ATTEMPT_ID} import 
org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -104,7 +104,7 @@ object SparkHadoopWriter extends Logging { logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") } catch { case cause: Throwable => - logError(s"Aborting job ${jobContext.getJobID}.", cause) + logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) committer.abortJob(jobContext) throw new SparkException("Job aborted.", cause) } diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index b059e82df23b..c68999f34079 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -24,7 +24,8 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.executor.CommitDeniedException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID import org.apache.spark.util.Utils object SparkHadoopMapRedUtil extends Logging { @@ -52,7 +53,9 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(s"$mrTaskAttemptID: Committed. 
Elapsed time: $timeCost ms.") } catch { case cause: IOException => - logError(s"Error committing the output of task: $mrTaskAttemptID", cause) + logError( + log"Error committing the output of task: ${MDC(TASK_ATTEMPT_ID, mrTaskAttemptID)}", + cause) committer.abortTask(mrTaskContext) throw cause } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 195c5b0f47f5..12df40c3476a 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import scala.util.matching.Regex import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.METRICS_CONF import org.apache.spark.util.Utils @@ -140,7 +141,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } catch { case e: Exception => val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME) - logError(s"Error loading configuration file $file", e) + logError(log"Error loading configuration file ${MDC(PATH, file)}", e) } finally { if (is != null) { is.close() diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 05e0b6b9c4ef..555083bb65d2 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -188,7 +188,8 @@ private[spark] class MetricsSystem private ( val source = Utils.classForName[Source](classPath).getConstructor().newInstance() registerSource(source) } catch { - case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) + case e: Exception => + logError(log"Source class ${MDC(CLASS_NAME, 
classPath)} cannot be instantiated", e) } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 086df6231324..127bdf6d9181 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -34,7 +34,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.LogKey.{COMMAND, ERROR} +import org.apache.spark.internal.LogKey.{COMMAND, ERROR, PATH} import org.apache.spark.internal.MDC import org.apache.spark.util.Utils @@ -107,8 +107,9 @@ private[spark] class PipedRDD[T: ClassTag]( pb.directory(taskDirFile) workInTaskDirectory = true } catch { - case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + - " (" + taskDirectory + ")", e) + case e: Exception => + logError(log"Unable to setup task working directory: ${MDC(ERROR, e.getMessage)}" + + log" (${MDC(PATH, taskDirectory)})", e) } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index 472401b23fe8..b503c5a0f808 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -22,7 +22,8 @@ import javax.annotation.concurrent.GuardedBy import scala.util.control.NonFatal import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.END_POINT import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} @@ -206,7 +207,8 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint) // Should reduce the number of active threads before throw the error. 
numActiveThreads -= 1 } - logError(s"An error happened while processing message in the inbox for $endpointName", fatal) + logError(log"An error happened while processing message in the inbox for" + + log" ${MDC(END_POINT, endpointName)}", fatal) throw fatal } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala index df7cd0b44c90..2d94ed5d05e1 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala @@ -22,7 +22,8 @@ import java.util.concurrent._ import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.internal.config.EXECUTOR_ID import org.apache.spark.internal.config.Network._ import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcEndpoint} @@ -74,7 +75,7 @@ private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Loggin } inbox.process(dispatcher) } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7ee8dc7ec0c8..41cbd795b7e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -37,7 +37,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{JOB_ID, STAGE_ID} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, CLASS_NAME, JOB_ID, 
PARTITION_ID, STAGE_ID, TASK_ID} import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED} import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener} @@ -1749,8 +1749,8 @@ private[spark] class DAGScheduler( case None => "Unknown class" } logError( - s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}", - e) + log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} (${MDC(CLASS_NAME, accumClassName)}) " + + log"for task ${MDC(PARTITION_ID, task.partitionId)}", e) } } } @@ -1763,7 +1763,9 @@ private[spark] class DAGScheduler( } catch { case NonFatal(e) => val taskId = event.taskInfo.taskId - logError(s"Error when attempting to reconstruct metrics for task $taskId", e) + logError( + log"Error when attempting to reconstruct metrics for task ${MDC(TASK_ID, taskId)}", + e) null } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index a3b8f1206b9d..24c25d237794 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{LINE, LINE_NUM} +import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH} import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.util.JsonProtocol @@ -125,7 +125,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { case ioe: IOException => throw ioe case e: Exception => - logError(s"Exception parsing Spark event log: $sourceName", e) + logError(log"Exception parsing Spark 
event log: ${MDC(PATH, sourceName)}", e) logError(log"Malformed line #${MDC(LINE_NUM, lineNumber)}: ${MDC(LINE, currentLine)}\n") false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a30744da9ee9..7e61dad3c141 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,7 +26,8 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -99,10 +100,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) } } catch { case NonFatal(t) => - val defaultMessage = "Error while building the fair scheduler pools" - val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" } - .getOrElse(defaultMessage) - logError(message, t) + if (fileData.isDefined) { + val fileName = fileData.get._2 + logError(log"Error while building the fair scheduler pools from ${MDC(PATH, fileName)}", + t) + } else { + logError("Error while building the fair scheduler pools", t) + } throw t } finally { fileData.foreach { case (is, fileName) => is.close() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index dc0656778455..1418901e3442 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -534,9 +534,9 @@ private[spark] class TaskSetManager( // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." + val msg = log"Failed to serialize task ${MDC(TASK_ID, taskId)}, not attempting to retry it." logError(msg, e) - abort(s"$msg Exception during serialization: $e") + abort(s"${msg.message} Exception during serialization: $e") throw SparkCoreErrors.failToSerializeTaskError(e) } if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 && diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 613080813d8e..5a0b2ba3735c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -39,7 +39,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.Kryo._ import org.apache.spark.internal.io.FileCommitProtocol._ import org.apache.spark.network.util.ByteUnit @@ -739,7 +740,7 @@ private object JavaIterableWrapperSerializer extends Logging { private val underlyingMethodOpt = { try Some(wrapperClass.getDeclaredMethod("underlying")) catch { case e: Exception => - logError("Failed to find the underlying field in " + wrapperClass, e) + logError(log"Failed to find the underlying field in ${MDC(CLASS_NAME, wrapperClass)}", e) None } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index d50b8f935d56..109a9a2e3eb9 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.HashMap import scala.jdk.CollectionConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.AppStatusUtils.getQuantilesValue import org.apache.spark.status.api.v1 @@ -864,7 +865,7 @@ private[spark] object AppStatusStore extends Logging { Some(localDir) } catch { case e: IOException => - logError(s"Failed to create spark ui store path in $rootDir.", e) + logError(log"Failed to create spark ui store path in ${MDC(PATH, rootDir)}.", e) None } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e68239f260d9..9aa100d9ff36 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -42,7 +42,7 @@ import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.BLOCK_ID +import org.apache.spark.internal.LogKey.{BLOCK_ID, COUNT, SLEEP_TIME_SECONDS} import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source @@ -626,8 +626,9 @@ private[spark] class BlockManager( return } catch { case e: Exception if i < MAX_ATTEMPTS => - logError(s"Failed to 
connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}" - + s" more times after waiting $SLEEP_TIME_SECS seconds...", e) + logError(log"Failed to connect to external shuffle server, will retry " + + log"${MDC(COUNT, MAX_ATTEMPTS - i)} more times after waiting " + + log"${MDC(SLEEP_TIME_SECONDS, SLEEP_TIME_SECS)} seconds...", e) Thread.sleep(SLEEP_TIME_SECS * 1000L) case NonFatal(e) => throw SparkCoreErrors.unableToRegisterWithExternalShuffleServerError(e) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 686003e2c51d..5b4ecef233f8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -26,8 +26,8 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.SHUFFLE_BLOCK_INFO import org.apache.spark.shuffle.ShuffleBlockInfo import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.util.{ThreadUtils, Utils} @@ -152,11 +152,13 @@ private[storage] class BlockManagerDecommissioner( isTargetDecommissioned = true keepRunning = false } else { - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } case e: Exception => - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ac453d0f743c..5bb4e096c029 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, OLD_BLOCK_MANAGER_ID} +import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, EXECUTOR_ID, OLD_BLOCK_MANAGER_ID} import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} @@ -328,7 +328,8 @@ class BlockManagerMasterEndpoint( // care about the return result of removing blocks. That way we avoid breaking // down the whole application. 
case NonFatal(e) => - logError(s"Cannot determine whether executor $executorId is alive or not.", e) + logError(log"Cannot determine whether executor " + + log"${MDC(EXECUTOR_ID, executorId)} is alive or not.", e) false } if (!isAlive) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 5cc08714d41c..1fccbd16ced5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -20,7 +20,8 @@ package org.apache.spark.storage import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import org.apache.spark.{MapOutputTracker, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{BLOCK_ID, BROADCAST_ID, RDD_ID, SHUFFLE_ID} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv} import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{ThreadUtils, Utils} @@ -44,18 +45,18 @@ class BlockManagerStorageEndpoint( // Operations that involve removing blocks may be slow and should be done asynchronously override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => - doAsync[Boolean]("removing block " + blockId, context) { + doAsync[Boolean](log"removing block ${MDC(BLOCK_ID, blockId)}", context) { blockManager.removeBlock(blockId) true } case RemoveRdd(rddId) => - doAsync[Int]("removing RDD " + rddId, context) { + doAsync[Int](log"removing RDD ${MDC(RDD_ID, rddId)}", context) { blockManager.removeRdd(rddId) } case RemoveShuffle(shuffleId) => - doAsync[Boolean]("removing shuffle " + shuffleId, context) { + doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) { if 
(mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } @@ -66,7 +67,7 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.decommissionSelf()) case RemoveBroadcast(broadcastId, _) => - doAsync[Int]("removing broadcast " + broadcastId, context) { + doAsync[Int](log"removing broadcast ${MDC(BROADCAST_ID, broadcastId)}", context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) } @@ -96,18 +97,20 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId)) } - private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T): Unit = { + private def doAsync[T]( + actionMessage: MessageWithContext, + context: RpcCallContext)(body: => T): Unit = { val future = Future { - logDebug(actionMessage) + logDebug(actionMessage.message) body } future.foreach { response => - logDebug(s"Done $actionMessage, response is $response") + logDebug(s"Done ${actionMessage.message}, response is $response") context.reply(response) logDebug(s"Sent response: $response to ${context.senderAddress}") } future.failed.foreach { t => - logError(s"Error in $actionMessage", t) + logError(log"Error in " + actionMessage, t) context.sendFailure(t) } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 7446a55fc7c3..4c0b5f4a14f6 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -30,7 +30,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.{MERGE_DIR_NAME, PATH} import 
org.apache.spark.network.shuffle.ExecutorDiskUtils import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY @@ -255,7 +256,8 @@ private[spark] class DiskBlockManager( Some(localDir) } catch { case e: IOException => - logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) + logError( + log"Failed to create local dir in ${MDC(PATH, rootDir)}. Ignoring this directory.", e) None } } @@ -292,7 +294,8 @@ private[spark] class DiskBlockManager( } catch { case e: IOException => logError( - s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e) + log"Failed to create ${MDC(MERGE_DIR_NAME, mergeDirName)} dir in " + + log"${MDC(PATH, rootDir)}. Ignoring this directory.", e) } } } @@ -370,7 +373,7 @@ private[spark] class DiskBlockManager( } } catch { case e: Exception => - logError(s"Exception while deleting local spark dir: $localDir", e) + logError(log"Exception while deleting local spark dir: ${MDC(PATH, localDir)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 80e268081fa7..0b6e33ff5fb3 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -271,7 +271,8 @@ private[spark] class DiskBlockObjectWriter( logError(log"Exception occurred while reverting partial writes to file " + log"${MDC(PATH, file)}, ${MDC(ERROR, ce.getMessage)}") case e: Exception => - logError("Uncaught exception while reverting partial writes to file " + file, e) + logError( + log"Uncaught exception while reverting partial writes to file ${MDC(PATH, file)}", e) } finally { if (truncateStream != null) { truncateStream.close() diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala 
b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala index 9b6048e90c9a..31958af84e54 100644 --- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala +++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala @@ -28,7 +28,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark.MapOutputTracker import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT, REDUCE_ID, SHUFFLE_ID, SHUFFLE_MERGE_ID} import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} import org.apache.spark.shuffle.ShuffleReadMetricsReporter import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER @@ -170,9 +171,10 @@ private class PushBasedFetchHelper( reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address)) } catch { case exception: Exception => - logError(s"Failed to parse the meta of push-merged block for ($shuffleId, " + - s"$shuffleMergeId, $reduceId) from" + - s" ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to parse the meta of push-merged block for (" + + log"${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " + + log"${MDC(REDUCE_ID, reduceId)}) from ${MDC(HOST, req.address.host)}" + + log":${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) @@ -181,8 +183,9 @@ private class PushBasedFetchHelper( override def onFailure(shuffleId: Int, shuffleMergeId: Int, reduceId: Int, exception: Throwable): Unit = { - logError(s"Failed to get the meta of push-merged block for ($shuffleId, $reduceId) " + - s"from ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to get the meta of push-merged block for " + + log"(${MDC(SHUFFLE_ID, 
shuffleId)}, ${MDC(REDUCE_ID, reduceId)}) " + + log"from ${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 916cb83d379e..d22ce3dbed77 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -37,7 +37,7 @@ import org.apache.spark.{MapOutputTracker, SparkException, TaskContext} import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, HOST, MAX_ATTEMPTS, PORT} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper} @@ -314,7 +314,8 @@ final class ShuffleBlockFetcherIterator( override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { ShuffleBlockFetcherIterator.this.synchronized { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) + logError(log"Failed to get block(s) from " + + log"${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", e) e match { // SPARK-27991: Catch the Netty OOM and set the flag `isNettyOOMOnShuffle` (shared among // tasks) to true as early as possible. 
The pending fetch requests won't be sent diff --git a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala index 83a8fd628cd7..310211515999 100644 --- a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala @@ -21,7 +21,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.util.Utils import org.apache.spark.util.logging.DriverLogger.DRIVER_LOG_FILE @@ -136,7 +137,8 @@ private[ui] class DriverLogPage( (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from directory " + + log"${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 099e47abf408..ddf451c16f3a 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -23,7 +23,8 @@ import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletRespons import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ @@ -155,7 +156,7 @@ 
private[spark] class SparkUI private ( serverInfo = Some(server) } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2c937e71f64b..baeed322e8ad 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -29,7 +29,8 @@ import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping, ServletContextHan import org.json4s.JsonAST.{JNothing, JValue} import org.apache.spark.{SecurityManager, SparkConf, SSLOptions} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config._ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -158,7 +159,7 @@ private[spark] abstract class WebUI( logInfo(s"Bound $className to $hostName, and started at $webUrl") } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index 5125adc9f7ca..eaa9ef517294 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EVENT_LOOP /** * An event loop to receive events from the caller and process all events in the event thread. 
It @@ -52,13 +53,13 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { try { onError(e) } catch { - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index f1daa76f3116..814201d8c959 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import com.codahale.metrics.Timer import org.apache.spark.SparkEnv -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.scheduler.EventLoggingListener import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate @@ -122,10 +123,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { } } catch { case ie: InterruptedException => - logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie) + logError(log"Interrupted while posting to " + + log"${MDC(LISTENER, listenerName)}. 
Removing that listener.", ie) removeListenerOnError(listener) case NonFatal(e) if !isIgnorableException(e) => - logError(s"Listener ${listenerName} threw an exception", e) + logError(log"Listener ${MDC(LISTENER, listenerName)} threw an exception", e) } finally { if (maybeTimerContext != null) { val elapsed = maybeTimerContext.stop() diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index c6cad9440168..b9dece19f265 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -26,7 +26,8 @@ import scala.util.Try import org.apache.hadoop.fs.FileSystem import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS @@ -68,7 +69,8 @@ private[spark] object ShutdownHookManager extends Logging { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) } catch { - case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) + case e: Exception => + logError(log"Exception while deleting Spark temp dir: ${MDC(PATH, dirPath)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index b24129eb3697..74f1474f9cf7 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -17,7 +17,8 @@ package org.apache.spark.util -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.THREAD /** * The default uncaught exception handler for Spark daemons. 
It terminates the whole process for @@ -36,11 +37,14 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: override def uncaughtException(thread: Thread, exception: Throwable): Unit = { try { + val mdc = MDC(THREAD, thread) // Make it explicit that uncaught exceptions are thrown when container is shutting down. // It will help users when they analyze the executor logs - val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else "" - val errMsg = "Uncaught exception in thread " - logError(inShutdownMsg + errMsg + thread, exception) + if (ShutdownHookManager.inShutdown()) { + logError(log"[Container in shutdown] Uncaught exception in thread $mdc", exception) + } else { + logError(log"Uncaught exception in thread $mdc", exception) + } // We may have been called from a shutdown hook. If so, we must not call System.exit(). // (If we do, we will deadlock.) @@ -61,7 +65,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: } catch { case oom: OutOfMemoryError => try { - logError(s"Uncaught OutOfMemoryError in thread $thread, process halted.", oom) + logError( + log"Uncaught OutOfMemoryError in thread ${MDC(THREAD, thread)}, process halted.", + oom) } catch { // absorb any exception/error since we're halting the process case _: Throwable => @@ -69,7 +75,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Runtime.getRuntime.halt(SparkExitCode.OOM) case t: Throwable => try { - logError(s"Another uncaught exception in thread $thread, process halted.", t) + logError( + log"Another uncaught exception in thread ${MDC(THREAD, thread)}, process halted.", + t) } catch { case _: Throwable => } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d7e174f5497c..7022506e5508 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ 
b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -69,7 +69,7 @@ import org.slf4j.Logger import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{COMMAND, COMMAND_OUTPUT, EXIT_CODE, PATH} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Streaming._ import org.apache.spark.internal.config.Tests.IS_TESTING @@ -1276,11 +1276,13 @@ private[spark] object Utils case t: Throwable => val currentThreadName = Thread.currentThread().getName if (sc != null) { - logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t) + logError(log"uncaught error in thread ${MDC(THREAD_NAME, currentThreadName)}, " + + log"stopping SparkContext", t) sc.stopInNewThread() } if (!NonFatal(t)) { - logError(s"throw uncaught fatal error in thread $currentThreadName", t) + logError( + log"throw uncaught fatal error in thread ${MDC(THREAD_NAME, currentThreadName)}", t) throw t } } @@ -1292,7 +1294,8 @@ private[spark] object Utils block } catch { case NonFatal(t) => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) } } @@ -1469,7 +1472,7 @@ private[spark] object Utils fileSize } catch { case e: Throwable => - logError(s"Cannot get file length of ${file}", e) + logError(log"Cannot get file length of ${MDC(PATH, file)}", e) throw e } finally { if (gzInputStream != null) { @@ -1847,7 +1850,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) throw t } } @@ -1861,7 +1865,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct 
case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) scala.util.Failure(t) } } @@ -2348,7 +2353,8 @@ private[spark] object Utils val currentUserGroups = groupMappingServiceProvider.getGroups(username) return currentUserGroups } catch { - case e: Exception => logError(s"Error getting groups for user=$username", e) + case e: Exception => + logError(log"Error getting groups for user=${MDC(USER_NAME, username)}", e) } } EMPTY_USER_GROUPS diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala index 2243239dce6f..1dadf15da40f 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -20,7 +20,8 @@ package org.apache.spark.util.logging import java.io.{File, FileOutputStream, InputStream, IOException} import org.apache.spark.SparkConf -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.{IntParam, Utils} /** @@ -90,7 +91,7 @@ private[spark] class FileAppender( } } catch { case e: Exception => - logError(s"Error writing stream to file $file", e) + logError(log"Error writing stream to file ${MDC(PATH, file)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala index e374c41b9140..f8f144f6e388 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala @@ -24,7 +24,8 @@ import com.google.common.io.Files import org.apache.commons.io.IOUtils import 
org.apache.spark.SparkConf -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.ArrayImplicits._ /** @@ -77,7 +78,7 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError(s"Error rolling over $activeFile", e) + logError(log"Error rolling over ${MDC(PATH, activeFile)}", e) } } @@ -156,7 +157,8 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e) + val path = activeFile.getParentFile.getAbsolutePath + logError(log"Error cleaning logs in directory ${MDC(PATH, path)}", e) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org