This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 18072b5357a5 [SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework
18072b5357a5 is described below

commit 18072b5357a5fd671829e312ca359fcf34d47c63
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri Apr 5 14:04:51 2024 -0700

    [SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logError calls with variables in the core module to the structured logging framework. This is part 2, which transforms logError calls of the following API:
    ```
    def logError(msg: => String, throwable: Throwable): Unit
    ```
    to
    ```
    def logError(entry: LogEntry, throwable: Throwable): Unit
    ```
    
    Migration part 1 was done in https://github.com/apache/spark/pull/45834.
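
    For reference, a minimal sketch of the before/after pattern this PR applies. The surrounding class is hypothetical; the imports and the migrated call mirror the ContextCleaner change in the diff below:
    ```
    // Logging is Spark-internal, so the sketch sits under org.apache.spark.
    package org.apache.spark.example

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.RDD_ID

    class ExampleCleaner extends Logging {
      def doCleanupRDD(rddId: Int): Unit = {
        try {
          // ... actual cleanup work ...
        } catch {
          // Before: logError("Error cleaning RDD " + rddId, e)
          // After: the log"" interpolator plus MDC tags the entry with RDD_ID.
          case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e)
        }
      }
    }
    ```
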
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Spark core logs will contain additional MDC (Mapped Diagnostic Context) entries.
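
    Concretely, the rendered message text is unchanged, but each MDC value is also attached to the entry under its LogKey, which a structured layout can expose as separate fields. A minimal, hypothetical illustration (the class is made up; the call mirrors the deploy/Utils.scala change below):
    ```
    package org.apache.spark.example

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}

    class ExampleLogReader extends Logging {
      def reportFailure(logType: String, logDirectory: String, e: Exception): Unit = {
        // Same rendered text as before, plus LOG_TYPE and PATH as MDC on the entry.
        logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " +
          log"directory ${MDC(PATH, logDirectory)}", e)
      }
    }
    ```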
    
    ### How was this patch tested?
    
    Compilation and Scala style checks, as well as code review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45890 from gengliangwang/coreError2.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 23 +++++++++++++++++++++-
 .../scala/org/apache/spark/ContextCleaner.scala    | 19 +++++++++++-------
 .../scala/org/apache/spark/MapOutputTracker.scala  |  2 +-
 .../main/scala/org/apache/spark/SparkContext.scala | 15 +++++++++-----
 .../scala/org/apache/spark/TaskContextImpl.scala   |  5 +++--
 .../org/apache/spark/api/r/RBackendHandler.scala   |  7 ++++---
 .../scala/org/apache/spark/deploy/Client.scala     |  4 ++--
 .../spark/deploy/StandaloneResourceUtils.scala     |  8 +++++---
 .../main/scala/org/apache/spark/deploy/Utils.scala |  6 ++++--
 .../spark/deploy/history/FsHistoryProvider.scala   |  7 ++++---
 .../org/apache/spark/deploy/worker/Worker.scala    | 18 ++++++++++-------
 .../apache/spark/deploy/worker/ui/LogPage.scala    |  6 ++++--
 .../scala/org/apache/spark/executor/Executor.scala | 11 ++++++-----
 .../spark/executor/ExecutorClassLoader.scala       |  5 +++--
 .../spark/internal/io/SparkHadoopWriter.scala      |  4 ++--
 .../spark/mapred/SparkHadoopMapRedUtil.scala       |  7 +++++--
 .../org/apache/spark/metrics/MetricsConfig.scala   |  5 +++--
 .../org/apache/spark/metrics/MetricsSystem.scala   |  3 ++-
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala |  7 ++++---
 .../scala/org/apache/spark/rpc/netty/Inbox.scala   |  6 ++++--
 .../org/apache/spark/rpc/netty/MessageLoop.scala   |  5 +++--
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 10 ++++++----
 .../apache/spark/scheduler/ReplayListenerBus.scala |  4 ++--
 .../spark/scheduler/SchedulableBuilder.scala       | 14 ++++++++-----
 .../apache/spark/scheduler/TaskSetManager.scala    |  4 ++--
 .../apache/spark/serializer/KryoSerializer.scala   |  5 +++--
 .../org/apache/spark/status/AppStatusStore.scala   |  5 +++--
 .../org/apache/spark/storage/BlockManager.scala    |  7 ++++---
 .../spark/storage/BlockManagerDecommissioner.scala | 10 ++++++----
 .../spark/storage/BlockManagerMasterEndpoint.scala |  5 +++--
 .../storage/BlockManagerStorageEndpoint.scala      | 21 +++++++++++---------
 .../apache/spark/storage/DiskBlockManager.scala    | 11 +++++++----
 .../spark/storage/DiskBlockObjectWriter.scala      |  3 ++-
 .../spark/storage/PushBasedFetchHelper.scala       | 15 ++++++++------
 .../storage/ShuffleBlockFetcherIterator.scala      |  5 +++--
 .../scala/org/apache/spark/ui/DriverLogPage.scala  |  6 ++++--
 .../main/scala/org/apache/spark/ui/SparkUI.scala   |  5 +++--
 .../src/main/scala/org/apache/spark/ui/WebUI.scala |  5 +++--
 .../scala/org/apache/spark/util/EventLoop.scala    |  7 ++++---
 .../scala/org/apache/spark/util/ListenerBus.scala  |  8 +++++---
 .../apache/spark/util/ShutdownHookManager.scala    |  6 ++++--
 .../spark/util/SparkUncaughtExceptionHandler.scala | 20 +++++++++++++------
 .../main/scala/org/apache/spark/util/Utils.scala   | 22 +++++++++++++--------
 .../apache/spark/util/logging/FileAppender.scala   |  5 +++--
 .../spark/util/logging/RollingFileAppender.scala   |  8 +++++---
 45 files changed, 244 insertions(+), 140 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 66f3b803c0d4..1d8161282c5b 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -21,6 +21,7 @@ package org.apache.spark.internal
  * All structured logging keys should be defined here for standardization.
  */
 object LogKey extends Enumeration {
+  val ACCUMULATOR_ID = Value
   val APP_DESC = Value
   val APP_ID = Value
   val APP_STATE = Value
@@ -33,40 +34,56 @@ object LogKey extends Enumeration {
   val CLASS_NAME = Value
   val COMMAND = Value
   val COMMAND_OUTPUT = Value
+  val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
   val CONTAINER_ID = Value
   val COUNT = Value
   val DRIVER_ID = Value
+  val END_POINT = Value
   val ERROR = Value
+  val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
   val EXECUTOR_ID = Value
-  val EXECUTOR_STATE_CHANGED = Value
+  val EXECUTOR_STATE = Value
   val EXIT_CODE = Value
+  val FAILURES = Value
   val HOST = Value
   val JOB_ID = Value
   val LEARNING_RATE = Value
   val LINE = Value
   val LINE_NUM = Value
+  val LISTENER = Value
+  val LOG_TYPE = Value
   val MASTER_URL = Value
   val MAX_ATTEMPTS = Value
   val MAX_CATEGORIES = Value
   val MAX_EXECUTOR_FAILURES = Value
   val MAX_SIZE = Value
+  val MERGE_DIR_NAME = Value
+  val METHOD_NAME = Value
   val MIN_SIZE = Value
   val NUM_ITERATIONS = Value
+  val OBJECT_ID = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OPTIMIZER_CLASS_NAME = Value
   val PARTITION_ID = Value
   val PATH = Value
+  val PATHS = Value
   val POD_ID = Value
+  val PORT = Value
   val RANGE = Value
+  val RDD_ID = Value
   val REASON = Value
+  val REDUCE_ID = Value
   val REMOTE_ADDRESS = Value
   val RETRY_COUNT = Value
   val RPC_ADDRESS = Value
+  val SHUFFLE_BLOCK_INFO = Value
   val SHUFFLE_ID = Value
+  val SHUFFLE_MERGE_ID = Value
   val SIZE = Value
+  val SLEEP_TIME_SECONDS = Value
   val STAGE_ID = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
@@ -75,8 +92,12 @@ object LogKey extends Enumeration {
   val TASK_NAME = Value
   val TASK_SET_NAME = Value
   val TASK_STATE = Value
+  val THREAD = Value
+  val THREAD_NAME = Value
   val TID = Value
   val TIMEOUT = Value
+  val URI = Value
+  val USER_NAME = Value
   val WORKER_URL = Value
 
   type LogKey = Value
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index a1871cb231cf..c16a84c13187 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, 
ConcurrentLinkedQueue, Scheduled
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, 
LISTENER, RDD_ID, SHUFFLE_ID}
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.scheduler.SparkListener
@@ -226,7 +227,7 @@ private[spark] class ContextCleaner(
       listeners.asScala.foreach(_.rddCleaned(rddId))
       logDebug("Cleaned RDD " + rddId)
     } catch {
-      case e: Exception => logError("Error cleaning RDD " + rddId, e)
+      case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, 
rddId)}", e)
     }
   }
 
@@ -245,7 +246,7 @@ private[spark] class ContextCleaner(
         logDebug("Asked to cleanup non-existent shuffle (maybe it was already 
removed)")
       }
     } catch {
-      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
+      case e: Exception => logError(log"Error cleaning shuffle 
${MDC(SHUFFLE_ID, shuffleId)}", e)
     }
   }
 
@@ -257,7 +258,8 @@ private[spark] class ContextCleaner(
       listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
       logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
-      case e: Exception => logError("Error cleaning broadcast " + broadcastId, 
e)
+      case e: Exception =>
+        logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, 
broadcastId)}", e)
     }
   }
 
@@ -269,7 +271,8 @@ private[spark] class ContextCleaner(
       listeners.asScala.foreach(_.accumCleaned(accId))
       logDebug("Cleaned accumulator " + accId)
     } catch {
-      case e: Exception => logError("Error cleaning accumulator " + accId, e)
+      case e: Exception =>
+        logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, 
accId)}", e)
     }
   }
 
@@ -285,7 +288,8 @@ private[spark] class ContextCleaner(
       logDebug("Cleaned rdd checkpoint data " + rddId)
     }
     catch {
-      case e: Exception => logError("Error cleaning rdd checkpoint data " + 
rddId, e)
+      case e: Exception =>
+        logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, 
rddId)}", e)
     }
   }
 
@@ -295,7 +299,8 @@ private[spark] class ContextCleaner(
       sc.listenerBus.removeListener(listener)
       logDebug(s"Cleaned Spark listener $listener")
     } catch {
-      case e: Exception => logError(s"Error cleaning Spark listener 
$listener", e)
+      case e: Exception =>
+        logError(log"Error cleaning Spark listener ${MDC(LISTENER, 
listener)}", e)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index faa8504df365..48569eb71379 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster(
                   .getOrElse(Seq.empty[BlockManagerId]))
             }
           } catch {
-            case NonFatal(e) => logError(e.getMessage, e)
+            case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e)
           }
         }
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7595488cecee..9d908cd8713c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, 
SparkHadoopUtil}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{Executor, ExecutorMetrics, 
ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, 
PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
@@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     } catch {
       case e: Exception =>
-        logError(s"Exception getting thread dump from executor $executorId", e)
+        logError(
+          log"Exception getting thread dump from executor 
${MDC(LogKey.EXECUTOR_ID, executorId)}",
+          e)
         None
     }
   }
@@ -778,7 +781,9 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     } catch {
       case e: Exception =>
-        logError(s"Exception getting heap histogram from executor 
$executorId", e)
+        logError(
+          log"Exception getting heap histogram from " +
+            log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e)
         None
     }
   }
@@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging {
         Seq(env.rpcEnv.fileServer.addJar(file))
       } catch {
         case NonFatal(e) =>
-          logError(s"Failed to add $path to Spark environment", e)
+          logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark 
environment", e)
           Nil
       }
     }
@@ -2161,7 +2166,7 @@ class SparkContext(config: SparkConf) extends Logging {
           Seq(path)
         } catch {
           case NonFatal(e) =>
-            logError(s"Failed to add $path to Spark environment", e)
+            logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark 
environment", e)
             Nil
         }
       } else {
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index a3c36de15155..e433cc10ae73 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.LISTENER
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
@@ -246,7 +247,7 @@ private[spark] class TaskContextImpl(
             }
           }
           listenerExceptions += e
-          logError(s"Error in $name", e)
+          logError(log"Error in ${MDC(LISTENER, name)}", e)
       }
     }
     if (listenerExceptions.nonEmpty) {
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala 
b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 3f7a3ea70a7e..1a05c8f35b7f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.api.r.SerDe._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID}
 import org.apache.spark.internal.config.R._
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
@@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
             writeObject(dos, null, server.jvmObjectTracker)
           } catch {
             case e: Exception =>
-              logError(s"Removing $objId failed", e)
+              logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e)
               writeInt(dos, -1)
               writeString(dos, s"Removing $objId failed: ${e.getMessage}")
           }
@@ -192,7 +193,7 @@ private[r] class RBackendHandler(server: RBackend)
       }
     } catch {
       case e: Exception =>
-        logError(s"$methodName on $objId failed", e)
+        logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, 
objId)} failed", e)
         writeInt(dos, -1)
         // Writing the error message of the cause for the exception. This will 
be returned
         // to user in the R process.
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 1eec3e82f1b7..6cf240f12a1c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS}
+import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS}
 import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
 import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
@@ -61,7 +61,7 @@ private class ClientEndpoint(
       t => t match {
         case ie: InterruptedException => // Exit normally
         case e: Throwable =>
-          logError(e.getMessage, e)
+          logError(log"${MDC(ERROR, e.getMessage)}", e)
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
       })
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
index 509049550ad4..d317d6449f29 100644
--- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
@@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats}
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.COMPONENT
 import org.apache.spark.resource.{ResourceAllocation, ResourceID, 
ResourceInformation, ResourceRequirement}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
@@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends 
Logging {
       writeResourceAllocationJson(allocations, tmpFile)
     } catch {
       case NonFatal(e) =>
-        val errMsg = s"Exception threw while preparing resource file for 
$compShortName"
+        val errMsg =
+          log"Exception threw while preparing resource file for 
${MDC(COMPONENT, compShortName)}"
         logError(errMsg, e)
-        throw new SparkException(errMsg, e)
+        throw new SparkException(errMsg.message, e)
     }
     val resourcesFile = File.createTempFile(s"resource-$compShortName-", 
".json", dir)
     tmpFile.renameTo(resourcesFile)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala 
b/core/src/main/scala/org/apache/spark/deploy/Utils.scala
index 9eb5a0042e51..4d2546cb808c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala
@@ -22,7 +22,8 @@ import java.io.File
 import jakarta.servlet.http.HttpServletRequest
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}
 import org.apache.spark.ui.JettyUtils.createServletHandler
 import org.apache.spark.ui.WebUI
 import org.apache.spark.util.Utils.{getFileLength, offsetBytes}
@@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging {
       (logText, startIndex, endIndex, totalLength)
     } catch {
       case e: Exception =>
-        logError(s"Error getting $logType logs from directory $logDirectory", 
e)
+        logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " +
+          log"directory ${MDC(PATH, logDirectory)}", e)
         ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index e0128e35b761..98cbd7b3eba8 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
@@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
       case e: AccessControlException =>
         logWarning(s"Insufficient permission while compacting log for 
$rootPath", e)
       case e: Exception =>
-        logError(s"Exception while compacting log for $rootPath", e)
+        logError(log"Exception while compacting log for ${MDC(PATH, 
rootPath)}", e)
     } finally {
       endProcessing(rootPath)
     }
@@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         case _: AccessControlException =>
           logInfo(s"No permission to delete $log, ignoring.")
         case ioe: IOException =>
-          logError(s"IOException in cleaning $log", ioe)
+          logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe)
       }
     }
     deleted
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e2ba221fb00c..0659c26fd15b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, 
EXECUTOR_STATE_CHANGED, MASTER_URL, MAX_ATTEMPTS}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
@@ -549,7 +549,7 @@ private[deploy] class Worker(
         }(cleanupThreadExecutor)
 
         cleanupFuture.failed.foreach(e =>
-          logError("App dir cleanup failed: " + e.getMessage, e)
+          logError(log"App dir cleanup failed: ${MDC(ERROR, e.getMessage)}", e)
         )(cleanupThreadExecutor)
       } catch {
         case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown 
=>
@@ -638,7 +638,9 @@ private[deploy] class Worker(
           addResourcesUsed(resources_)
         } catch {
           case e: Exception =>
-            logError(s"Failed to launch executor $appId/$execId for 
${appDesc.name}.", e)
+            logError(
+              log"Failed to launch executor ${MDC(APP_ID, 
appId)}/${MDC(EXECUTOR_ID, execId)} " +
+                log"for ${MDC(APP_DESC, appDesc.name)}.", e)
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
@@ -749,7 +751,7 @@ private[deploy] class Worker(
               Utils.deleteRecursively(new File(dir))
             }
           }(cleanupThreadExecutor).failed.foreach(e =>
-            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+            logError(log"Clean up app dir ${MDC(PATHS, dirList)} failed", e)
           )(cleanupThreadExecutor)
         }
       } catch {
@@ -794,8 +796,10 @@ private[deploy] class Worker(
           case Failure(t) =>
             val failures = executorStateSyncFailureAttempts.getOrElse(fullId, 
0) + 1
             if (failures < executorStateSyncMaxAttempts) {
-              logError(s"Failed to send $newState to Master $masterRef, " +
-                s"will retry ($failures/$executorStateSyncMaxAttempts).", t)
+              logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)}" +
+                log" to Master ${MDC(MASTER_URL, masterRef)}, will retry " +
+                log"(${MDC(FAILURES, failures)}/" +
+                log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)}).", t)
               executorStateSyncFailureAttempts(fullId) = failures
               // If the failure is not caused by TimeoutException, wait for a 
while before retry in
               // case the connection is temporarily unavailable.
@@ -808,7 +812,7 @@ private[deploy] class Worker(
               }
               self.send(newState)
             } else {
-              logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, 
newState)} " +
+              logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)} " +
                 log"to Master ${MDC(MASTER_URL, masterRef)} for " +
                 log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)} times. 
Giving up.")
               System.exit(1)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index c4a15095ec40..006a388e98b5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -23,7 +23,8 @@ import scala.xml.{Node, Unparsed}
 
 import jakarta.servlet.http.HttpServletRequest
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.logging.RollingFileAppender
@@ -174,7 +175,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends 
WebUIPage("logPage") with
       (logText, startIndex, endIndex, totalLength)
     } catch {
       case e: Exception =>
-        logError(s"Error getting $logType logs from directory $logDirectory", 
e)
+        logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " +
+          log"directory ${MDC(PATH, logDirectory)}", e)
         ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 67d0c37c3edd..a7657cd78cd9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -41,7 +41,7 @@ import org.slf4j.MDC
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC => LogMDC}
-import org.apache.spark.internal.LogKey.{ERROR, MAX_ATTEMPTS, TASK_ID, 
TASK_NAME, TIMEOUT}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, MAX_ATTEMPTS, 
TASK_ID, TASK_NAME, TIMEOUT}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.plugin.PluginContainer
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
@@ -661,9 +661,10 @@ private[spark] class Executor(
           // uh-oh.  it appears the user code has caught the fetch-failure 
without throwing any
           // other exceptions.  Its *possible* this is what the user meant to 
do (though highly
           // unlikely).  So we will log an error and keep going.
-          logError(s"$taskName completed successfully though internally it 
encountered " +
-            s"unrecoverable fetch failures!  Most likely this means user code 
is incorrectly " +
-            s"swallowing Spark's internal ${classOf[FetchFailedException]}", 
fetchFailure)
+          logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully 
though internally " +
+            log"it encountered unrecoverable fetch failures! Most likely this 
means user code " +
+            log"is incorrectly swallowing Spark's internal " +
+            log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", 
fetchFailure)
         }
         val taskFinishNs = System.nanoTime()
         val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
@@ -802,7 +803,7 @@ private[spark] class Executor(
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will 
delegate to
           // the default uncaught exception handler, which will terminate the 
Executor.
-          logError(s"Exception in $taskName", t)
+          logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t)
 
           // SPARK-20904: Do not report failure to driver if if happened 
during shut down. Because
           // libraries may set up shutdown hooks that race with running tasks 
during shutdown,
diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala
index b9f4486b66fa..c7047ddd278b 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala
@@ -30,7 +30,7 @@ import org.apache.xbean.asm9.Opcodes._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKey, MDC}
 import org.apache.spark.util.ParentClassLoader
 
 /**
@@ -183,7 +183,8 @@ class ExecutorClassLoader(
         None
       case e: Exception =>
         // Something bad happened while checking if the class exists
-        logError(s"Failed to check existence of class $name on REPL class 
server at $uri", e)
+        logError(log"Failed to check existence of class 
${MDC(LogKey.CLASS_NAME, name)} " +
+          log"on REPL class server at ${MDC(LogKey.URI, uri)}", e)
         if (userClassPathFirst) {
           // Allow to try to load from "parentLoader"
           None
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 20239980eee5..95ea814042d3 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -33,7 +33,7 @@ import 
org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp
 import org.apache.spark.{SerializableWritable, SparkConf, SparkException, 
TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID
+import org.apache.spark.internal.LogKey.{JOB_ID, TASK_ATTEMPT_ID}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, 
Utils}
@@ -104,7 +104,7 @@ object SparkHadoopWriter extends Logging {
       logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: 
$duration ms.")
     } catch {
       case cause: Throwable =>
-        logError(s"Aborting job ${jobContext.getJobID}.", cause)
+        logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause)
         committer.abortJob(jobContext)
         throw new SparkException("Job aborted.", cause)
     }
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index b059e82df23b..c68999f34079 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => 
MapReduceOutputCommitter}
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID
 import org.apache.spark.util.Utils
 
 object SparkHadoopMapRedUtil extends Logging {
@@ -52,7 +53,9 @@ object SparkHadoopMapRedUtil extends Logging {
         logInfo(s"$mrTaskAttemptID: Committed. Elapsed time: $timeCost ms.")
       } catch {
         case cause: IOException =>
-          logError(s"Error committing the output of task: $mrTaskAttemptID", 
cause)
+          logError(
+            log"Error committing the output of task: ${MDC(TASK_ATTEMPT_ID, 
mrTaskAttemptID)}",
+            cause)
           committer.abortTask(mrTaskContext)
           throw cause
       }
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 195c5b0f47f5..12df40c3476a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import scala.util.matching.Regex
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.config.METRICS_CONF
 import org.apache.spark.util.Utils
 
@@ -140,7 +141,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends 
Logging {
     } catch {
       case e: Exception =>
         val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
-        logError(s"Error loading configuration file $file", e)
+        logError(log"Error loading configuration file ${MDC(PATH, file)}", e)
     } finally {
       if (is != null) {
         is.close()
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 05e0b6b9c4ef..555083bb65d2 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -188,7 +188,8 @@ private[spark] class MetricsSystem private (
         val source = 
Utils.classForName[Source](classPath).getConstructor().newInstance()
         registerSource(source)
       } catch {
-        case e: Exception => logError("Source class " + classPath + " cannot 
be instantiated", e)
+        case e: Exception =>
+          logError(log"Source class ${MDC(CLASS_NAME, classPath)} cannot be 
instantiated", e)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 086df6231324..127bdf6d9181 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -34,7 +34,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.LogKey.{COMMAND, ERROR}
+import org.apache.spark.internal.LogKey.{COMMAND, ERROR, PATH}
 import org.apache.spark.internal.MDC
 import org.apache.spark.util.Utils
 
@@ -107,8 +107,9 @@ private[spark] class PipedRDD[T: ClassTag](
         pb.directory(taskDirFile)
         workInTaskDirectory = true
       } catch {
-        case e: Exception => logError("Unable to setup task working directory: 
" + e.getMessage +
-          " (" + taskDirectory + ")", e)
+        case e: Exception =>
+          logError(log"Unable to setup task working directory: ${MDC(ERROR, 
e.getMessage)}" +
+          log" (${MDC(PATH, taskDirectory)})", e)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index 472401b23fe8..b503c5a0f808 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -22,7 +22,8 @@ import javax.annotation.concurrent.GuardedBy
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.END_POINT
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint}
 
 
@@ -206,7 +207,8 @@ private[netty] class Inbox(val endpointName: String, val 
endpoint: RpcEndpoint)
         // Should reduce the number of active threads before throw the error.
         numActiveThreads -= 1
       }
-      logError(s"An error happened while processing message in the inbox for 
$endpointName", fatal)
+      logError(log"An error happened while processing message in the inbox 
for" +
+        log" ${MDC(END_POINT, endpointName)}", fatal)
       throw fatal
     }
 
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
index df7cd0b44c90..2d94ed5d05e1 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
@@ -22,7 +22,8 @@ import java.util.concurrent._
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ERROR
 import org.apache.spark.internal.config.EXECUTOR_ID
 import org.apache.spark.internal.config.Network._
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcEndpoint}
@@ -74,7 +75,7 @@ private sealed abstract class MessageLoop(dispatcher: 
Dispatcher) extends Loggin
           }
           inbox.process(dispatcher)
         } catch {
-          case NonFatal(e) => logError(e.getMessage, e)
+          case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e)
         }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ee8dc7ec0c8..41cbd795b7e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -37,7 +37,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{JOB_ID, STAGE_ID}
+import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, CLASS_NAME, JOB_ID, 
PARTITION_ID, STAGE_ID, TASK_ID}
 import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, 
RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.shuffle.{BlockStoreClient, 
MergeFinalizerListener}
@@ -1749,8 +1749,8 @@ private[spark] class DAGScheduler(
             case None => "Unknown class"
           }
           logError(
-            s"Failed to update accumulator $id ($accumClassName) for task 
${task.partitionId}",
-            e)
+            log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} 
(${MDC(CLASS_NAME, accumClassName)}) " +
+              log"for task ${MDC(PARTITION_ID, task.partitionId)}", e)
       }
     }
   }
@@ -1763,7 +1763,9 @@ private[spark] class DAGScheduler(
         } catch {
           case NonFatal(e) =>
             val taskId = event.taskInfo.taskId
-            logError(s"Error when attempting to reconstruct metrics for task 
$taskId", e)
+            logError(
+              log"Error when attempting to reconstruct metrics for task 
${MDC(TASK_ID, taskId)}",
+              e)
             null
         }
       } else {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index a3b8f1206b9d..24c25d237794 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{LINE, LINE_NUM}
+import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH}
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.util.JsonProtocol
 
@@ -125,7 +125,7 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
       case ioe: IOException =>
         throw ioe
       case e: Exception =>
-        logError(s"Exception parsing Spark event log: $sourceName", e)
+        logError(log"Exception parsing Spark event log: ${MDC(PATH, 
sourceName)}", e)
         logError(log"Malformed line #${MDC(LINE_NUM, lineNumber)}: ${MDC(LINE, 
currentLine)}\n")
         false
     }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index a30744da9ee9..7e61dad3c141 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -26,7 +26,8 @@ import scala.xml.{Node, XML}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, 
SCHEDULER_MODE}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
@@ -99,10 +100,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: 
Pool, sc: SparkContext
       fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, 
fileName) }
     } catch {
       case NonFatal(t) =>
-        val defaultMessage = "Error while building the fair scheduler pools"
-        val message = fileData.map { case (is, fileName) => s"$defaultMessage 
from $fileName" }
-          .getOrElse(defaultMessage)
-        logError(message, t)
+        if (fileData.isDefined) {
+          val fileName = fileData.get._2
+          logError(log"Error while building the fair scheduler pools from 
${MDC(PATH, fileName)}",
+            t)
+        } else {
+          logError("Error while building the fair scheduler pools", t)
+        }
         throw t
     } finally {
       fileData.foreach { case (is, fileName) => is.close() }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index dc0656778455..1418901e3442 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -534,9 +534,9 @@ private[spark] class TaskSetManager(
       // If the task cannot be serialized, then there's no point to re-attempt 
the task,
       // as it will always fail. So just abort the whole task-set.
       case NonFatal(e) =>
-        val msg = s"Failed to serialize task $taskId, not attempting to retry 
it."
+        val msg = log"Failed to serialize task ${MDC(TASK_ID, taskId)}, not 
attempting to retry it."
         logError(msg, e)
-        abort(s"$msg Exception during serialization: $e")
+        abort(s"${msg.message} Exception during serialization: $e")
         throw SparkCoreErrors.failToSerializeTaskError(e)
     }
     if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 &&
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 613080813d8e..5a0b2ba3735c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -39,7 +39,8 @@ import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.internal.io.FileCommitProtocol._
 import org.apache.spark.network.util.ByteUnit
@@ -739,7 +740,7 @@ private object JavaIterableWrapperSerializer extends 
Logging {
   private val underlyingMethodOpt = {
     try Some(wrapperClass.getDeclaredMethod("underlying")) catch {
       case e: Exception =>
-        logError("Failed to find the underlying field in " + wrapperClass, e)
+        logError(log"Failed to find the underlying field in ${MDC(CLASS_NAME, 
wrapperClass)}", e)
         None
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index d50b8f935d56..109a9a2e3eb9 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -25,7 +25,8 @@ import scala.collection.mutable.HashMap
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.AppStatusUtils.getQuantilesValue
 import org.apache.spark.status.api.v1
@@ -864,7 +865,7 @@ private[spark] object AppStatusStore extends Logging {
         Some(localDir)
       } catch {
         case e: IOException =>
-          logError(s"Failed to create spark ui store path in $rootDir.", e)
+          logError(log"Failed to create spark ui store path in ${MDC(PATH, 
rootDir)}.", e)
           None
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e68239f260d9..9aa100d9ff36 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -42,7 +42,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.BLOCK_ID
+import org.apache.spark.internal.LogKey.{BLOCK_ID, COUNT, SLEEP_TIME_SECONDS}
 import org.apache.spark.internal.config.{Network, 
RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
@@ -626,8 +626,9 @@ private[spark] class BlockManager(
         return
       } catch {
         case e: Exception if i < MAX_ATTEMPTS =>
-          logError(s"Failed to connect to external shuffle server, will retry 
${MAX_ATTEMPTS - i}"
-            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
+          logError(log"Failed to connect to external shuffle server, will 
retry " +
+            log"${MDC(COUNT, MAX_ATTEMPTS - i)} more times after waiting " +
+            log"${MDC(SLEEP_TIME_SECONDS, SLEEP_TIME_SECS)} seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000L)
         case NonFatal(e) => throw 
SparkCoreErrors.unableToRegisterWithExternalShuffleServerError(e)
       }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 686003e2c51d..5b4ecef233f8 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -26,8 +26,8 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.SHUFFLE_BLOCK_INFO
 import org.apache.spark.shuffle.ShuffleBlockInfo
 import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -152,11 +152,13 @@ private[storage] class BlockManagerDecommissioner(
                   isTargetDecommissioned = true
                   keepRunning = false
                 } else {
-                  logError(s"Error occurred during migrating 
$shuffleBlockInfo", e)
+                  logError(log"Error occurred during migrating " +
+                    log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
                   keepRunning = false
                 }
               case e: Exception =>
-                logError(s"Error occurred during migrating $shuffleBlockInfo", 
e)
+                logError(log"Error occurred during migrating " +
+                  log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
                 keepRunning = false
             }
           }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index ac453d0f743c..5bb4e096c029 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder
 import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, 
SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, 
OLD_BLOCK_MANAGER_ID}
+import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, EXECUTOR_ID, 
OLD_BLOCK_MANAGER_ID}
 import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
 import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, 
RemoteBlockPushResolver}
 import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, 
RpcEndpointRef, RpcEnv}
@@ -328,7 +328,8 @@ class BlockManagerMasterEndpoint(
         // care about the return result of removing blocks. That way we avoid 
breaking
         // down the whole application.
         case NonFatal(e) =>
-          logError(s"Cannot determine whether executor $executorId is alive or 
not.", e)
+          logError(log"Cannot determine whether executor " +
+            log"${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)
           false
       }
       if (!isAlive) {
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 5cc08714d41c..1fccbd16ced5 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -20,7 +20,8 @@ package org.apache.spark.storage
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, 
Future}
 
 import org.apache.spark.{MapOutputTracker, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{BLOCK_ID, BROADCAST_ID, RDD_ID, 
SHUFFLE_ID}
 import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, 
RpcEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -44,18 +45,18 @@ class BlockManagerStorageEndpoint(
   // Operations that involve removing blocks may be slow and should be done 
asynchronously
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
     case RemoveBlock(blockId) =>
-      doAsync[Boolean]("removing block " + blockId, context) {
+      doAsync[Boolean](log"removing block ${MDC(BLOCK_ID, blockId)}", context) 
{
         blockManager.removeBlock(blockId)
         true
       }
 
     case RemoveRdd(rddId) =>
-      doAsync[Int]("removing RDD " + rddId, context) {
+      doAsync[Int](log"removing RDD ${MDC(RDD_ID, rddId)}", context) {
         blockManager.removeRdd(rddId)
       }
 
     case RemoveShuffle(shuffleId) =>
-      doAsync[Boolean]("removing shuffle " + shuffleId, context) {
+      doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", 
context) {
         if (mapOutputTracker != null) {
           mapOutputTracker.unregisterShuffle(shuffleId)
         }
@@ -66,7 +67,7 @@ class BlockManagerStorageEndpoint(
       context.reply(blockManager.decommissionSelf())
 
     case RemoveBroadcast(broadcastId, _) =>
-      doAsync[Int]("removing broadcast " + broadcastId, context) {
+      doAsync[Int](log"removing broadcast ${MDC(BROADCAST_ID, broadcastId)}", 
context) {
         blockManager.removeBroadcast(broadcastId, tellMaster = true)
       }
 
@@ -96,18 +97,20 @@ class BlockManagerStorageEndpoint(
       
context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId))
   }
 
-  private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: 
=> T): Unit = {
+  private def doAsync[T](
+      actionMessage: MessageWithContext,
+      context: RpcCallContext)(body: => T): Unit = {
     val future = Future {
-      logDebug(actionMessage)
+      logDebug(actionMessage.message)
       body
     }
     future.foreach { response =>
-      logDebug(s"Done $actionMessage, response is $response")
+      logDebug(s"Done ${actionMessage.message}, response is $response")
       context.reply(response)
       logDebug(s"Sent response: $response to ${context.senderAddress}")
     }
     future.failed.foreach { t =>
-      logError(s"Error in $actionMessage", t)
+      logError(log"Error in " + actionMessage, t)
       context.sendFailure(t)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7446a55fc7c3..4c0b5f4a14f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -30,7 +30,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.{MERGE_DIR_NAME, PATH}
 import org.apache.spark.network.shuffle.ExecutorDiskUtils
 import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY
 import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY
@@ -255,7 +256,8 @@ private[spark] class DiskBlockManager(
         Some(localDir)
       } catch {
         case e: IOException =>
-          logError(s"Failed to create local dir in $rootDir. Ignoring this 
directory.", e)
+          logError(
+            log"Failed to create local dir in ${MDC(PATH, rootDir)}. Ignoring 
this directory.", e)
           None
       }
     }
@@ -292,7 +294,8 @@ private[spark] class DiskBlockManager(
         } catch {
           case e: IOException =>
             logError(
-              s"Failed to create $mergeDirName dir in $rootDir. Ignoring this 
directory.", e)
+              log"Failed to create ${MDC(MERGE_DIR_NAME, mergeDirName)} dir in 
" +
+                log"${MDC(PATH, rootDir)}. Ignoring this directory.", e)
         }
       }
     }
@@ -370,7 +373,7 @@ private[spark] class DiskBlockManager(
             }
           } catch {
             case e: Exception =>
-              logError(s"Exception while deleting local spark dir: $localDir", 
e)
+              logError(log"Exception while deleting local spark dir: 
${MDC(PATH, localDir)}", e)
           }
         }
       }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 80e268081fa7..0b6e33ff5fb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -271,7 +271,8 @@ private[spark] class DiskBlockObjectWriter(
           logError(log"Exception occurred while reverting partial writes to 
file "
             + log"${MDC(PATH, file)}, ${MDC(ERROR, ce.getMessage)}")
         case e: Exception =>
-          logError("Uncaught exception while reverting partial writes to file 
" + file, e)
+          logError(
+            log"Uncaught exception while reverting partial writes to file 
${MDC(PATH, file)}", e)
       } finally {
         if (truncateStream != null) {
           truncateStream.close()
diff --git 
a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala 
b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index 9b6048e90c9a..31958af84e54 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -28,7 +28,8 @@ import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.MapOutputTracker
 import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{HOST, PORT, REDUCE_ID, SHUFFLE_ID, 
SHUFFLE_MERGE_ID}
 import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, 
MergedBlocksMetaListener}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
@@ -170,9 +171,10 @@ private class PushBasedFetchHelper(
             reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), 
address))
         } catch {
           case exception: Exception =>
-            logError(s"Failed to parse the meta of push-merged block for 
($shuffleId, " +
-              s"$shuffleMergeId, $reduceId) from" +
-              s" ${req.address.host}:${req.address.port}", exception)
+            logError(log"Failed to parse the meta of push-merged block for (" +
+              log"${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(SHUFFLE_MERGE_ID, 
shuffleMergeId)}, " +
+              log"${MDC(REDUCE_ID, reduceId)}) from ${MDC(HOST, 
req.address.host)}" +
+              log":${MDC(PORT, req.address.port)}", exception)
             iterator.addToResultsQueue(
               PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, 
reduceId,
                 address))
@@ -181,8 +183,9 @@ private class PushBasedFetchHelper(
 
       override def onFailure(shuffleId: Int, shuffleMergeId: Int, reduceId: 
Int,
           exception: Throwable): Unit = {
-        logError(s"Failed to get the meta of push-merged block for 
($shuffleId, $reduceId) " +
-          s"from ${req.address.host}:${req.address.port}", exception)
+        logError(log"Failed to get the meta of push-merged block for " +
+          log"(${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(REDUCE_ID, reduceId)}) " +
+          log"from ${MDC(HOST, req.address.host)}:${MDC(PORT, 
req.address.port)}", exception)
         iterator.addToResultsQueue(
           PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, 
reduceId, address))
       }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 916cb83d379e..d22ce3dbed77 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{MapOutputTracker, SparkException, 
TaskContext}
 import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, MAX_ATTEMPTS}
+import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, HOST, MAX_ATTEMPTS, 
PORT}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
@@ -314,7 +314,8 @@ final class ShuffleBlockFetcherIterator(
 
       override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
         ShuffleBlockFetcherIterator.this.synchronized {
-          logError(s"Failed to get block(s) from 
${req.address.host}:${req.address.port}", e)
+          logError(log"Failed to get block(s) from " +
+            log"${MDC(HOST, req.address.host)}:${MDC(PORT, 
req.address.port)}", e)
           e match {
             // SPARK-27991: Catch the Netty OOM and set the flag 
`isNettyOOMOnShuffle` (shared among
             // tasks) to true as early as possible. The pending fetch requests 
won't be sent
diff --git a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala 
b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala
index 83a8fd628cd7..310211515999 100644
--- a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala
@@ -21,7 +21,8 @@ import scala.xml.{Node, Unparsed}
 import jakarta.servlet.http.HttpServletRequest
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}
 import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
 import org.apache.spark.util.Utils
 import org.apache.spark.util.logging.DriverLogger.DRIVER_LOG_FILE
@@ -136,7 +137,8 @@ private[ui] class DriverLogPage(
       (logText, startIndex, endIndex, totalLength)
     } catch {
       case e: Exception =>
-        logError(s"Error getting $logType logs from directory $logDirectory", 
e)
+        logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from 
directory " +
+          log"${MDC(PATH, logDirectory)}", e)
         ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 099e47abf408..ddf451c16f3a 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -23,7 +23,8 @@ import jakarta.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletRespons
 import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.scheduler._
@@ -155,7 +156,7 @@ private[spark] class SparkUI private (
       serverInfo = Some(server)
     } catch {
       case e: Exception =>
-        logError(s"Failed to bind $className", e)
+        logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e)
         System.exit(1)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 2c937e71f64b..baeed322e8ad 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -29,7 +29,8 @@ import org.eclipse.jetty.servlet.{FilterHolder, 
FilterMapping, ServletContextHan
 import org.json4s.JsonAST.{JNothing, JValue}
 
 import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config._
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
@@ -158,7 +159,7 @@ private[spark] abstract class WebUI(
       logInfo(s"Bound $className to $hostName, and started at $webUrl")
     } catch {
       case e: Exception =>
-        logError(s"Failed to bind $className", e)
+        logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e)
         System.exit(1)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala 
b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index 5125adc9f7ca..eaa9ef517294 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EVENT_LOOP
 
 /**
  * An event loop to receive events from the caller and process all events in 
the event thread. It
@@ -52,13 +53,13 @@ private[spark] abstract class EventLoop[E](name: String) 
extends Logging {
               try {
                 onError(e)
               } catch {
-                case NonFatal(e) => logError("Unexpected error in " + name, e)
+                case NonFatal(e) => logError(log"Unexpected error in 
${MDC(EVENT_LOOP, name)}", e)
               }
           }
         }
       } catch {
         case ie: InterruptedException => // exit even if eventQueue is not 
empty
-        case NonFatal(e) => logError("Unexpected error in " + name, e)
+        case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, 
name)}", e)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala 
b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index f1daa76f3116..814201d8c959 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -26,7 +26,8 @@ import scala.util.control.NonFatal
 import com.codahale.metrics.Timer
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.LISTENER
 import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 
@@ -122,10 +123,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends 
Logging {
         }
       } catch {
         case ie: InterruptedException =>
-          logError(s"Interrupted while posting to ${listenerName}. Removing 
that listener.", ie)
+          logError(log"Interrupted while posting to " +
+            log"${MDC(LISTENER, listenerName)}. Removing that listener.", ie)
           removeListenerOnError(listener)
         case NonFatal(e) if !isIgnorableException(e) =>
-          logError(s"Listener ${listenerName} threw an exception", e)
+          logError(log"Listener ${MDC(LISTENER, listenerName)} threw an 
exception", e)
       } finally {
         if (maybeTimerContext != null) {
           val elapsed = maybeTimerContext.stop()
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index c6cad9440168..b9dece19f265 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -26,7 +26,8 @@ import scala.util.Try
 import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
 
 
@@ -68,7 +69,8 @@ private[spark] object ShutdownHookManager extends Logging {
         logInfo("Deleting directory " + dirPath)
         Utils.deleteRecursively(new File(dirPath))
       } catch {
-        case e: Exception => logError(s"Exception while deleting Spark temp 
dir: $dirPath", e)
+        case e: Exception =>
+          logError(log"Exception while deleting Spark temp dir: ${MDC(PATH, 
dirPath)}", e)
       }
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index b24129eb3697..74f1474f9cf7 100644
--- 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.THREAD
 
 /**
  * The default uncaught exception handler for Spark daemons. It terminates the 
whole process for
@@ -36,11 +37,14 @@ private[spark] class SparkUncaughtExceptionHandler(val 
exitOnUncaughtException:
 
   override def uncaughtException(thread: Thread, exception: Throwable): Unit = 
{
     try {
+      val mdc = MDC(THREAD, thread)
       // Make it explicit that uncaught exceptions are thrown when container 
is shutting down.
       // It will help users when they analyze the executor logs
-      val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in 
shutdown] " else ""
-      val errMsg = "Uncaught exception in thread "
-      logError(inShutdownMsg + errMsg + thread, exception)
+      if (ShutdownHookManager.inShutdown()) {
+        logError(log"[Container in shutdown] Uncaught exception in thread 
$mdc", exception)
+      } else {
+        logError(log"Uncaught exception in thread $mdc", exception)
+      }
 
       // We may have been called from a shutdown hook. If so, we must not call 
System.exit().
       // (If we do, we will deadlock.)
@@ -61,7 +65,9 @@ private[spark] class SparkUncaughtExceptionHandler(val 
exitOnUncaughtException:
     } catch {
       case oom: OutOfMemoryError =>
         try {
-          logError(s"Uncaught OutOfMemoryError in thread $thread, process 
halted.", oom)
+          logError(
+            log"Uncaught OutOfMemoryError in thread ${MDC(THREAD, thread)}, 
process halted.",
+            oom)
         } catch {
           // absorb any exception/error since we're halting the process
           case _: Throwable =>
@@ -69,7 +75,9 @@ private[spark] class SparkUncaughtExceptionHandler(val 
exitOnUncaughtException:
         Runtime.getRuntime.halt(SparkExitCode.OOM)
       case t: Throwable =>
         try {
-          logError(s"Another uncaught exception in thread $thread, process 
halted.", t)
+          logError(
+            log"Another uncaught exception in thread ${MDC(THREAD, thread)}, 
process halted.",
+            t)
         } catch {
           case _: Throwable =>
         }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d7e174f5497c..7022506e5508 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -69,7 +69,7 @@ import org.slf4j.Logger
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{COMMAND, COMMAND_OUTPUT, EXIT_CODE, 
PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -1276,11 +1276,13 @@ private[spark] object Utils
       case t: Throwable =>
         val currentThreadName = Thread.currentThread().getName
         if (sc != null) {
-          logError(s"uncaught error in thread $currentThreadName, stopping 
SparkContext", t)
+          logError(log"uncaught error in thread ${MDC(THREAD_NAME, 
currentThreadName)}, " +
+              log"stopping SparkContext", t)
           sc.stopInNewThread()
         }
         if (!NonFatal(t)) {
-          logError(s"throw uncaught fatal error in thread $currentThreadName", 
t)
+          logError(
+            log"throw uncaught fatal error in thread ${MDC(THREAD_NAME, 
currentThreadName)}", t)
           throw t
         }
     }
@@ -1292,7 +1294,8 @@ private[spark] object Utils
       block
     } catch {
       case NonFatal(t) =>
-        logError(s"Uncaught exception in thread 
${Thread.currentThread().getName}", t)
+        logError(
+          log"Uncaught exception in thread ${MDC(THREAD_NAME, 
Thread.currentThread().getName)}", t)
     }
   }
 
@@ -1469,7 +1472,7 @@ private[spark] object Utils
       fileSize
     } catch {
       case e: Throwable =>
-        logError(s"Cannot get file length of ${file}", e)
+        logError(log"Cannot get file length of ${MDC(PATH, file)}", e)
         throw e
     } finally {
       if (gzInputStream != null) {
@@ -1847,7 +1850,8 @@ private[spark] object Utils
       case ct: ControlThrowable =>
         throw ct
       case t: Throwable =>
-        logError(s"Uncaught exception in thread 
${Thread.currentThread().getName}", t)
+        logError(
+          log"Uncaught exception in thread ${MDC(THREAD_NAME, 
Thread.currentThread().getName)}", t)
         throw t
     }
   }
@@ -1861,7 +1865,8 @@ private[spark] object Utils
       case ct: ControlThrowable =>
         throw ct
       case t: Throwable =>
-        logError(s"Uncaught exception in thread 
${Thread.currentThread().getName}", t)
+        logError(
+          log"Uncaught exception in thread ${MDC(THREAD_NAME, 
Thread.currentThread().getName)}", t)
         scala.util.Failure(t)
     }
   }
@@ -2348,7 +2353,8 @@ private[spark] object Utils
         val currentUserGroups = groupMappingServiceProvider.getGroups(username)
         return currentUserGroups
       } catch {
-        case e: Exception => logError(s"Error getting groups for 
user=$username", e)
+        case e: Exception =>
+          logError(log"Error getting groups for user=${MDC(USER_NAME, 
username)}", e)
       }
     }
     EMPTY_USER_GROUPS
diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala 
b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 2243239dce6f..1dadf15da40f 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util.logging
 import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.util.{IntParam, Utils}
 
 /**
@@ -90,7 +91,7 @@ private[spark] class FileAppender(
       }
     } catch {
       case e: Exception =>
-        logError(s"Error writing stream to file $file", e)
+        logError(log"Error writing stream to file ${MDC(PATH, file)}", e)
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index e374c41b9140..f8f144f6e388 100644
--- 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -24,7 +24,8 @@ import com.google.common.io.Files
 import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -77,7 +78,7 @@ private[spark] class RollingFileAppender(
       }
     } catch {
       case e: Exception =>
-        logError(s"Error rolling over $activeFile", e)
+        logError(log"Error rolling over ${MDC(PATH, activeFile)}", e)
     }
   }
 
@@ -156,7 +157,8 @@ private[spark] class RollingFileAppender(
       }
     } catch {
       case e: Exception =>
-        logError("Error cleaning logs in directory " + 
activeFile.getParentFile.getAbsolutePath, e)
+        val path = activeFile.getParentFile.getAbsolutePath
+        logError(log"Error cleaning logs in directory ${MDC(PATH, path)}", e)
     }
   }
 }
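
Two composition idioms recur in the hunks above: a long message is split across several `log"..."` literals joined with `+`, and an `MDC` that is interpolated more than once can be bound to a local `val` first, as the SparkUncaughtExceptionHandler hunk does with `val mdc = MDC(THREAD, thread)`. The snippet below reuses the hypothetical stand-ins from the earlier sketch (so it is not Spark's real API) just to show those two idioms in isolation.

```scala
// Reuses LogKey, MDC, log"...", and logError from the sketch earlier in this email.
import StructuredLoggingSketch._

object CompositionIdioms {
  def main(args: Array[String]): Unit = {
    // Bind the MDC once and interpolate it wherever it is needed.
    val mdcThread = MDC(LogKey.THREAD_NAME, Thread.currentThread().getName)

    // Split a long message across several log"..." literals joined with `+`.
    val entry =
      log"Uncaught exception in thread $mdcThread while cleaning " +
        log"${MDC(LogKey.PATH, "/tmp/spark-events")}"

    logError(entry, new RuntimeException("boom"))
  }
}
```
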

