This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c88fabfee41d [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework
c88fabfee41d is described below

commit c88fabfee41df1ca4729058450ec6f798641c936
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Tue Apr 23 11:00:44 2024 -0700

    [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR aims to migrate `logInfo` calls with variables in the `Resource managers` module to the `structured logging framework`.
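    
    For illustration, a minimal sketch of the before/after pattern this migration applies (simplified from the `Client.scala` hunk below; the `SubmissionLogger` class is hypothetical):
    
    ```scala
    import org.apache.spark.internal.{Logging, LogKey, MDC}
    
    // Hypothetical holder class; the real call sites are the YARN/K8s
    // resource-manager classes touched by this patch.
    class SubmissionLogger extends Logging {
      def logSubmission(appId: String): Unit = {
        // Before: plain string interpolation; the app ID is baked into free-form text.
        // logInfo(s"Submitting application $appId to ResourceManager")
    
        // After: the log"..." interpolator tags the variable with a LogKey via MDC,
        // so structured output (e.g. JSON) also carries it as a keyed field.
        logInfo(log"Submitting application ${MDC(LogKey.APP_ID, appId)} to ResourceManager")
      }
    }
    ```
    Each `MDC` pairs a `LogKey` entry with the interpolated value, so downstream log pipelines can index the field by key instead of parsing message text.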
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA (GitHub Actions).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46130 from panbingkun/SPARK-47604.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  45 +++++++-
 .../execution/ExecuteResponseObserver.scala        |   2 +-
 .../deploy/k8s/SparkKubernetesClientFactory.scala  |   9 +-
 .../k8s/submit/KubernetesClientApplication.scala   |   8 +-
 .../deploy/k8s/submit/KubernetesClientUtils.scala  |   4 +-
 .../k8s/submit/LoggingPodStatusWatcher.scala       |  20 ++--
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  25 ++--
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala |   8 +-
 .../scheduler/cluster/k8s/ExecutorRollPlugin.scala |   4 +-
 .../cluster/k8s/KubernetesClusterManager.scala     |   5 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  11 +-
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  21 ++--
 .../spark/deploy/yarn/ApplicationMaster.scala      |  40 ++++---
 .../org/apache/spark/deploy/yarn/Client.scala      |  59 ++++++----
 .../apache/spark/deploy/yarn/ClientArguments.scala |   5 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  29 ++---
 .../spark/deploy/yarn/SparkRackResolver.scala      |   7 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 126 ++++++++++++---------
 .../yarn/YarnAllocatorNodeHealthTracker.scala      |  12 +-
 .../cluster/YarnClientSchedulerBackend.scala       |   4 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  17 +--
 21 files changed, 283 insertions(+), 178 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 585373f1782b..b9b0e372a2b0 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -26,9 +26,12 @@ object LogKey extends Enumeration {
   val ACTUAL_PARTITION_COLUMN = Value
   val ALPHA = Value
   val ANALYSIS_ERROR = Value
+  val APP_ATTEMPT_ID = Value
   val APP_DESC = Value
   val APP_ID = Value
+  val APP_NAME = Value
   val APP_STATE = Value
+  val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
   val BLOCK_ID = Value
@@ -45,6 +48,7 @@ object LogKey extends Enumeration {
   val CATEGORICAL_FEATURES = Value
   val CHECKPOINT_FILE = Value
   val CHECKPOINT_TIME = Value
+  val CHECKSUM_FILE_NUM = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
   val CLUSTER_CENTROIDS = Value
@@ -70,6 +74,7 @@ object LogKey extends Enumeration {
   val CONSUMER = Value
   val CONTAINER = Value
   val CONTAINER_ID = Value
+  val CONTAINER_STATE = Value
   val COST = Value
   val COUNT = Value
   val CROSS_VALIDATION_METRIC = Value
@@ -85,6 +90,7 @@ object LogKey extends Enumeration {
   val DATABASE_NAME = Value
   val DATAFRAME_CACHE_ENTRY = Value
   val DATAFRAME_ID = Value
+  val DATA_FILE_NUM = Value
   val DATA_SOURCE = Value
   val DATA_SOURCES = Value
   val DATA_SOURCE_PROVIDER = Value
@@ -113,10 +119,16 @@ object LogKey extends Enumeration {
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
   val EXECUTION_PLAN_LEAVES = Value
+  val EXECUTOR_DESIRED_COUNT = Value
+  val EXECUTOR_ENVS = Value
   val EXECUTOR_ENV_REGEX = Value
   val EXECUTOR_ID = Value
   val EXECUTOR_IDS = Value
+  val EXECUTOR_LAUNCH_COMMANDS = Value
+  val EXECUTOR_LAUNCH_COUNT = Value
+  val EXECUTOR_RESOURCES = Value
   val EXECUTOR_STATE = Value
+  val EXECUTOR_TARGET_COUNT = Value
   val EXIT_CODE = Value
   val EXPECTED_NUM_FILES = Value
   val EXPECTED_PARTITION_COLUMN = Value
@@ -129,8 +141,10 @@ object LogKey extends Enumeration {
   val FEATURE_COLUMN = Value
   val FEATURE_DIMENSION = Value
   val FIELD_NAME = Value
+  val FILE_ABSOLUTE_PATH = Value
   val FILE_FORMAT = Value
   val FILE_FORMAT2 = Value
+  val FILE_NAME = Value
   val FILE_VERSION = Value
   val FINISH_TRIGGER_DURATION = Value
   val FROM_OFFSET = Value
@@ -140,6 +154,7 @@ object LogKey extends Enumeration {
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
   val HASH_JOIN_KEYS = Value
+  val HEARTBEAT_INTERVAL = Value
   val HISTORY_DIR = Value
   val HIVE_CLIENT_VERSION = Value
   val HIVE_METASTORE_VERSION = Value
@@ -149,18 +164,22 @@ object LogKey extends Enumeration {
   val HOST_PORT = Value
   val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
+  val INDEX_FILE_NUM = Value
   val INDEX_NAME = Value
   val INFERENCE_MODE = Value
   val INITIAL_CAPACITY = Value
+  val INITIAL_HEARTBEAT_INTERVAL = Value
   val INIT_MODE = Value
   val INTERVAL = Value
   val ISOLATION_LEVEL = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
   val JOIN_CONDITION_SUB_EXPR = Value
+  val K8S_CONTEXT = Value
   val KAFKA_PULLS_COUNT = Value
   val KAFKA_RECORDS_PULLED_COUNT = Value
   val KEY = Value
+  val KEYTAB = Value
   val LABEL_COLUMN = Value
   val LARGEST_CLUSTER_INDEX = Value
   val LAST_ACCESS_TIME = Value
@@ -187,9 +206,11 @@ object LogKey extends Enumeration {
   val MAX_CATEGORIES = Value
   val MAX_EXECUTOR_FAILURES = Value
   val MAX_FILE_VERSION = Value
+  val MAX_MEMORY_SIZE = Value
   val MAX_PARTITIONS_SIZE = Value
   val MAX_SIZE = Value
   val MAX_TABLE_PARTITION_METADATA_SIZE = Value
+  val MEMORY_SIZE = Value
   val MERGE_DIR_NAME = Value
   val MESSAGE = Value
   val METHOD_NAME = Value
@@ -202,6 +223,8 @@ object LogKey extends Enumeration {
   val NEW_LABEL_COLUMN_NAME = Value
   val NEW_PATH = Value
   val NEW_VALUE = Value
+  val NODES = Value
+  val NODE_LOCATION = Value
   val NORM = Value
   val NUM_BIN = Value
   val NUM_CLASSES = Value
@@ -226,6 +249,7 @@ object LogKey extends Enumeration {
   val OPTIONS = Value
   val OP_ID = Value
   val OP_TYPE = Value
+  val OVERHEAD_MEMORY_SIZE = Value
   val PARSE_MODE = Value
   val PARTITIONED_FILE_READER = Value
   val PARTITIONS_SIZE = Value
@@ -235,15 +259,21 @@ object LogKey extends Enumeration {
   val PATH = Value
   val PATHS = Value
   val PIPELINE_STAGE_UID = Value
+  val POD_COUNT = Value
   val POD_ID = Value
   val POD_NAME = Value
   val POD_NAMESPACE = Value
   val POD_PHASE = Value
+  val POD_SHARED_SLOT_COUNT = Value
+  val POD_STATE = Value
+  val POD_TARGET_COUNT = Value
   val POLICY = Value
   val PORT = Value
+  val PRINCIPAL = Value
   val PROCESSING_TIME = Value
   val PRODUCER_ID = Value
   val PROVIDER = Value
+  val PVC_METADATA_NAME = Value
   val QUERY_CACHE_VALUE = Value
   val QUERY_HINT = Value
   val QUERY_ID = Value
@@ -267,11 +297,16 @@ object LogKey extends Enumeration {
   val RELATION_NAME = Value
   val RELATIVE_TOLERANCE = Value
   val REMAINING_PARTITIONS = Value
+  val REPORT_DETAILS = Value
+  val RESOURCE = Value
   val RESOURCE_NAME = Value
+  val RESOURCE_PROFILE_ID = Value
+  val RESOURCE_PROFILE_IDS = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
   val RIGHT_EXPR = Value
   val RMSE = Value
+  val RPC_ENDPOINT_REF = Value
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
   val RULE_NUMBER_OF_RUNS = Value
@@ -286,6 +321,7 @@ object LogKey extends Enumeration {
   val SHUFFLE_BLOCK_INFO = Value
   val SHUFFLE_ID = Value
   val SHUFFLE_MERGE_ID = Value
+  val SHUFFLE_SERVICE_NAME = Value
   val SIZE = Value
   val SLEEP_TIME = Value
   val SLIDE_DURATION = Value
@@ -293,6 +329,7 @@ object LogKey extends Enumeration {
   val SPARK_DATA_STREAM = Value
   val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
+  val SRC_PATH = Value
   val STAGE_ID = Value
   val START_INDEX = Value
   val STATEMENT_ID = Value
@@ -309,6 +346,7 @@ object LogKey extends Enumeration {
   val SUB_QUERY = Value
   val TABLE_NAME = Value
   val TABLE_TYPES = Value
+  val TARGET_PATH = Value
   val TASK_ATTEMPT_ID = Value
   val TASK_ID = Value
   val TASK_NAME = Value
@@ -326,14 +364,15 @@ object LogKey extends Enumeration {
   val TIMER_LABEL = Value
   val TIME_UNITS = Value
   val TIP = Value
+  val TOKEN_REGEX = Value
   val TOPIC = Value
   val TOPIC_PARTITION = Value
   val TOPIC_PARTITIONS = Value
   val TOPIC_PARTITION_OFFSET = Value
   val TOPIC_PARTITION_OFFSET_RANGE = Value
+  val TOTAL = Value
   val TOTAL_EFFECTIVE_TIME = Value
   val TOTAL_RECORDS_READ = Value
-  val TOTAL_SIZE = Value
   val TOTAL_TIME = Value
   val TOTAL_TIME_READ = Value
   val TO_TIME = Value
@@ -343,6 +382,9 @@ object LogKey extends Enumeration {
   val TRAIN_WORD_COUNT = Value
   val TREE_NODE = Value
   val TRIGGER_INTERVAL = Value
+  val UI_FILTER = Value
+  val UI_FILTER_PARAMS = Value
+  val UI_PROXY_BASE = Value
   val UNSUPPORTED_EXPR = Value
   val UNSUPPORTED_HINT_REASON = Value
   val UNTIL_OFFSET = Value
@@ -351,6 +393,7 @@ object LogKey extends Enumeration {
   val USER_ID = Value
   val USER_NAME = Value
   val VALUE = Value
+  val VIRTUAL_CORES = Value
   val VOCAB_SIZE = Value
   val WAIT_RESULT_TIME = Value
   val WAIT_SEND_TIME = Value
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 30a899a2ac13..1a6ceffd41c3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -246,7 +246,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
     // scalastyle:off line.size.limit
     logInfo(
       log"Release all for opId=${MDC(LogKey.OP_ID, 
executeHolder.operationId)}. Execution stats: " +
-        log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " +
+        log"total=${MDC(LogKey.TOTAL, totalSize)} " +
         log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, 
autoRemovedSize)} " +
         
log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, 
cachedSizeUntilHighestConsumed)} " +
         log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, 
cachedSizeUntilLastProduced)} " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 3763aeadea0e..abb8a43e3732 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -32,7 +32,8 @@ import okhttp3.OkHttpClient
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.K8S_CONTEXT
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.ThreadUtils
 
@@ -84,9 +85,9 @@ object SparkKubernetesClientFactory extends Logging {
 
     // Allow for specifying a context used to auto-configure from the users 
K8S config file
     val kubeContext = sparkConf.get(KUBERNETES_CONTEXT).filter(_.nonEmpty)
-    logInfo("Auto-configuring K8S client using " +
-      kubeContext.map("context " + _).getOrElse("current context") +
-      " from users K8S config file")
+    logInfo(log"Auto-configuring K8S client using " +
+      log"${MDC(K8S_CONTEXT, kubeContext.map("context " + 
_).getOrElse("current context"))}" +
+      log" from users K8S config file")
 
     // if backoff limit is not set then set it to 3
     if 
(getSystemPropertyOrEnvVar(KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY)
 == null) {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 662f5ddbd4a7..c1cef2b84c71 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{APP_ID, APP_NAME, SUBMISSION_ID}
 import org.apache.spark.util.Utils
 
 /**
@@ -203,8 +204,9 @@ private[spark] class Client(
         }
       }
     } else {
-      logInfo(s"Deployed Spark application ${conf.appName} with application ID 
${conf.appId} " +
-        s"and submission ID $sId into Kubernetes")
+      logInfo(log"Deployed Spark application ${MDC(APP_NAME, conf.appName)} 
with " +
+        log"application ID ${MDC(APP_ID, conf.appId)} and " +
+        log"submission ID ${MDC(SUBMISSION_ID, sId)} into Kubernetes")
     }
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
index beb7ff6bfe22..51f6a2079bf7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
@@ -141,8 +141,8 @@ private[spark] object KubernetesClientUtils extends Logging {
         }
       }
       if (truncatedMap.nonEmpty) {
-        logInfo(s"Spark configuration files loaded from $confDir :" +
-          s" ${truncatedMap.keys.mkString(",")}")
+        logInfo(log"Spark configuration files loaded from ${MDC(PATH, 
confDir)} : " +
+          log"${MDC(PATHS, truncatedMap.keys.mkString(","))}")
       }
       if (skippedFiles.nonEmpty) {
         logWarning(log"Skipped conf file(s) ${MDC(PATHS, 
skippedFiles.mkString(","))}, due to " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 3227a72a8371..2c6fb8079a17 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -23,7 +23,8 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesDriverConf
 import org.apache.spark.deploy.k8s.KubernetesUtils._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{APP_ID, APP_NAME, POD_PHASE, 
POD_STATE, STATUS, SUBMISSION_ID}
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
   def watchOrStop(submissionId: String): Boolean
@@ -83,7 +84,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: 
KubernetesDriverConf)
   }
 
   private def logLongStatus(): Unit = {
-    logInfo("State changed, new state: " + 
pod.map(formatPodState).getOrElse("unknown"))
+    logInfo(log"State changed, new state: " +
+      log"${MDC(POD_STATE, pod.map(formatPodState).getOrElse("unknown"))}")
   }
 
   private def hasCompleted(): Boolean = {
@@ -96,22 +98,22 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: 
KubernetesDriverConf)
   }
 
   override def watchOrStop(sId: String): Boolean = {
-    logInfo(s"Waiting for application ${conf.appName} with application ID 
$appId " +
-      s"and submission ID $sId to finish...")
+    logInfo(log"Waiting for application ${MDC(APP_NAME, conf.appName)}} with 
application ID " +
+      log"${MDC(APP_ID, appId)} and submission ID ${MDC(SUBMISSION_ID, sId)} 
to finish...")
     val interval = conf.get(REPORT_INTERVAL)
     synchronized {
       while (!podCompleted && !resourceTooOldReceived) {
         wait(interval)
-        logInfo(s"Application status for $appId (phase: $phase)")
+        logInfo(log"Application status for ${MDC(APP_ID, appId)} (phase: 
${MDC(POD_PHASE, phase)})")
       }
     }
 
     if(podCompleted) {
       logInfo(
-        pod.map { p => s"Container final 
statuses:\n\n${containersDescription(p)}" }
-          .getOrElse("No containers were found in the driver pod."))
-      logInfo(s"Application ${conf.appName} with application ID $appId " +
-        s"and submission ID $sId finished")
+        pod.map { p => log"Container final statuses:\n\n${MDC(STATUS, 
containersDescription(p))}" }
+          .getOrElse(log"No containers were found in the driver pod."))
+      logInfo(log"Application ${MDC(APP_NAME, conf.appName)} with application 
ID " +
+        log"${MDC(APP_ID, appId)} and submission ID ${MDC(SUBMISSION_ID, sId)} 
finished")
     }
     podCompleted
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index a48e1fba9954..eb70a509d1e7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -343,7 +343,8 @@ class ExecutorPodsAllocator(
         val toDelete = newlyCreatedToDelete ++ pendingToDelete
 
         if (toDelete.nonEmpty) {
-          logInfo(s"Deleting ${toDelete.size} excess pod requests 
(${toDelete.mkString(",")}).")
+          logInfo(log"Deleting ${MDC(LogKey.COUNT, toDelete.size)} excess pod 
requests " +
+            log"(${MDC(LogKey.RESOURCE_PROFILE_IDS, 
toDelete.mkString(","))}).")
           _deletedExecutorIds = _deletedExecutorIds ++ toDelete
 
           Utils.tryLogNonFatalError {
@@ -397,9 +398,11 @@ class ExecutorPodsAllocator(
         val numMissingPodsForRpId = targetNum - podCountForRpId
         val numExecutorsToAllocate =
           math.min(math.min(numMissingPodsForRpId, podAllocationSize), 
sharedSlotFromPendingPods)
-        logInfo(s"Going to request $numExecutorsToAllocate executors from 
Kubernetes for " +
-          s"ResourceProfile Id: $rpId, target: $targetNum, known: 
$podCountForRpId, " +
-          s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
+        logInfo(log"Going to request ${MDC(LogKey.COUNT, 
numExecutorsToAllocate)} executors from " +
+          log"Kubernetes for ResourceProfile Id: 
${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}, " +
+          log"target: ${MDC(LogKey.POD_TARGET_COUNT, targetNum)}, " +
+          log"known: ${MDC(LogKey.POD_COUNT, podCountForRpId)}, 
sharedSlotFromPendingPods: " +
+          log"${MDC(LogKey.POD_SHARED_SLOT_COUNT, 
sharedSlotFromPendingPods)}.")
         requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, 
k8sKnownPVCNames)
       }
     }
@@ -428,7 +431,8 @@ class ExecutorPodsAllocator(
           .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
           .filter(pvc => now - 
Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
             > podAllocationDelay)
-        logInfo(s"Found ${reusablePVCs.size} reusable PVCs from 
${createdPVCs.size} PVCs")
+        logInfo(log"Found ${MDC(LogKey.COUNT, reusablePVCs.size)} reusable 
PVCs from " +
+          log"${MDC(LogKey.TOTAL, createdPVCs.size)} PVCs")
         reusablePVCs
       } catch {
         case _: KubernetesClientException =>
@@ -449,7 +453,8 @@ class ExecutorPodsAllocator(
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
       if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= 
PVC_COUNTER.get()) {
-        logInfo(s"Wait to reuse one of the existing ${PVC_COUNTER.get()} 
PVCs.")
+        logInfo(
+          log"Wait to reuse one of the existing ${MDC(LogKey.COUNT, 
PVC_COUNTER.get())} PVCs.")
         return
       }
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
@@ -480,8 +485,9 @@ class ExecutorPodsAllocator(
               addOwnerReference(driverPod.get, Seq(resource))
             }
             val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-            logInfo(s"Trying to create PersistentVolumeClaim 
${pvc.getMetadata.getName} with " +
-              s"StorageClass ${pvc.getSpec.getStorageClassName}")
+            logInfo(log"Trying to create PersistentVolumeClaim " +
+              log"${MDC(LogKey.PVC_METADATA_NAME, pvc.getMetadata.getName)} 
with " +
+              log"StorageClass ${MDC(LogKey.CLASS_NAME, 
pvc.getSpec.getStorageClassName)}")
             
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
             PVC_COUNTER.incrementAndGet()
           }
@@ -519,7 +525,8 @@ class ExecutorPodsAllocator(
           if (volume.nonEmpty) {
             val matchedPVC = reusablePVCs.remove(index)
             replacedResources.add(pvc)
-            logInfo(s"Reuse PersistentVolumeClaim 
${matchedPVC.getMetadata.getName}")
+            logInfo(log"Reuse PersistentVolumeClaim " +
+              log"${MDC(LogKey.PVC_METADATA_NAME, 
matchedPVC.getMetadata.getName)}")
             
volume.get.getPersistentVolumeClaim.setClaimName(matchedPVC.getMetadata.getName)
           }
         }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 5590311bf661..212659d48be8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -30,7 +30,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
 import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
 
@@ -99,8 +100,9 @@ private[spark] class ExecutorPodsLifecycleManager(
             if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, 
deleteFromK8s)) {
               execIdsRemovedInThisRound += execId
               if (schedulerBackend.isExecutorActive(execId.toString)) {
-                logInfo(s"Snapshot reported succeeded executor with id 
$execId, " +
-                  "even though the application has not requested for it to be 
removed.")
+                logInfo(log"Snapshot reported succeeded executor with id " +
+                  log"${MDC(EXECUTOR_ID, execId)}, even though the application 
has not " +
+                  log"requested for it to be removed.")
               } else {
                 logDebug(s"Snapshot reported succeeded executor with id 
$execId," +
                   s" pod name ${state.pod.getMetadata.getName}.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index 1c0de8e2afde..2a09aebbeb20 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -27,7 +27,7 @@ import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext,
 import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, 
EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, 
MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, INTERVAL}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, EXECUTOR_ID, 
INTERVAL}
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
@@ -82,7 +82,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with 
Logging {
               choose(executorSummaryList, policy) match {
                 case Some(id) =>
                   // Use decommission to be safe.
-                  logInfo(s"Ask to decommission executor $id")
+                  logInfo(log"Ask to decommission executor ${MDC(EXECUTOR_ID, 
id)}")
                   val now = System.currentTimeMillis()
                   scheduler.decommissionExecutor(
                     id,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 3235d922204b..26355b749d54 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.MASTER_URL
 import org.apache.spark.internal.config.TASK_MAX_FAILURES
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, 
TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -61,7 +62,7 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
           if (threads == "*") localCpuCount else threads.toInt
         case _ => 1
       }
-      logInfo(s"Running Spark with 
${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}")
+      logInfo(log"Running Spark with ${MDC(MASTER_URL, 
sc.conf.get(KUBERNETES_DRIVER_MASTER_URL))}")
       val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
       // KubernetesClusterSchedulerBackend respects `spark.app.id` while 
LocalSchedulerBackend
       // does not. Propagate `spark.app.id` via `spark.test.appId` to match 
the behavior.
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index daf8d5e3f58a..8a8278b9d076 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -32,6 +32,8 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.LogKey.{COUNT, HOST_PORT, TOTAL}
+import org.apache.spark.internal.MDC
 import 
org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
@@ -255,9 +257,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
           .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
 
-        if (!running.list().getItems().isEmpty()) {
-          logInfo(s"Forcefully deleting ${running.list().getItems().size()} 
pods " +
-            s"(out of ${executorIds.size}) that are still running after 
graceful shutdown period.")
+        if (!running.list().getItems.isEmpty) {
+          logInfo(log"Forcefully deleting ${MDC(COUNT, 
running.list().getItems.size())} pods " +
+            log"(out of ${MDC(TOTAL, executorIds.size)}) that are still 
running after graceful " +
+            log"shutdown period.")
           running.delete()
         }
       }
@@ -353,7 +356,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
               execIDRequester -= rpcAddress
               // Expected, executors re-establish a connection with an ID
             case _ =>
-              logInfo(s"No executor found for ${rpcAddress}")
+              logInfo(log"No executor found for ${MDC(HOST_PORT, rpcAddress)}")
           }
       }
     }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 376218df5770..8acd2b5f2aba 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -27,7 +27,7 @@ import org.apache.commons.io.FileExistsException
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKey, MDC}
 import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, 
SHUFFLE_CHECKSUM_ENABLED}
 import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, 
getChecksumFileName}
 import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, 
ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
@@ -54,7 +54,8 @@ class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: 
SparkConf)
         
KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, 
blockManager)
       }
     } else {
-      logInfo(s"Skip recovery because ${KUBERNETES_DRIVER_REUSE_PVC.key} is 
disabled.")
+      logInfo(log"Skip recovery because ${MDC(LogKey.CONFIG, 
KUBERNETES_DRIVER_REUSE_PVC.key)} " +
+        log"is disabled.")
     }
   }
 
@@ -94,20 +95,23 @@ object KubernetesLocalDiskShuffleExecutorComponents extends 
Logging {
       .partition(_.getName.contains(".checksum"))
     val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
 
-    logInfo(s"Found ${dataFiles.size} data files, ${indexFiles.size} index 
files, " +
-        s"and ${checksumFiles.size} checksum files.")
+    logInfo(log"Found ${MDC(LogKey.DATA_FILE_NUM, dataFiles.length)} data 
files, " +
+      log"${MDC(LogKey.INDEX_FILE_NUM, indexFiles.length)} index files, " +
+      log"and ${MDC(LogKey.CHECKSUM_FILE_NUM, checksumFiles.length)} checksum 
files.")
 
     // Build a hashmap with checksum file name as a key
     val checksumFileMap = new mutable.HashMap[String, File]()
     val algorithm = conf.get(SHUFFLE_CHECKSUM_ALGORITHM)
     checksumFiles.foreach { f =>
-      logInfo(s"${f.getName} -> ${f.getAbsolutePath}")
+      logInfo(log"${MDC(LogKey.FILE_NAME, f.getName)} -> " +
+        log"${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)}")
       checksumFileMap.put(f.getName, f)
     }
     // Build a hashmap with shuffle data file name as a key
     val indexFileMap = new mutable.HashMap[String, File]()
     indexFiles.foreach { f =>
-      logInfo(s"${f.getName.replace(".index", ".data")} -> 
${f.getAbsolutePath}")
+      logInfo(log"${MDC(LogKey.FILE_NAME, f.getName.replace(".index", 
".data"))} -> " +
+        log"${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)}")
       indexFileMap.put(f.getName.replace(".index", ".data"), f)
     }
 
@@ -116,7 +120,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends 
Logging {
     val level = StorageLevel.DISK_ONLY
     val checksumDisabled = !conf.get(SHUFFLE_CHECKSUM_ENABLED)
     (dataFiles ++ indexFiles).foreach { f =>
-      logInfo(s"Try to recover ${f.getAbsolutePath}")
+      logInfo(log"Try to recover ${MDC(LogKey.FILE_ABSOLUTE_PATH, 
f.getAbsolutePath)}")
       try {
         val id = BlockId(f.getName)
         // To make it sure to handle only shuffle blocks
@@ -129,7 +133,8 @@ object KubernetesLocalDiskShuffleExecutorComponents extends 
Logging {
             val decryptedSize = f.length()
             bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, 
decryptedSize).save()
           } else {
-            logInfo(s"Ignore ${f.getAbsolutePath} due to the verification 
failure.")
+            logInfo(log"Ignore ${MDC(LogKey.FILE_ABSOLUTE_PATH, 
f.getAbsolutePath)} " +
+              log"due to the verification failure.")
           }
         } else {
           logInfo("Ignore a non-shuffle block file.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index eb944244fc9d..19e918c4ff2a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,8 +43,7 @@ import org.apache.spark.deploy.{ExecutorFailureTracker, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_PORT}
+import org.apache.spark.internal.{Logging, LogKey, MDC}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
@@ -220,7 +219,7 @@ private[spark] class ApplicationMaster(
         "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
         Option(appAttemptId.getApplicationId.toString), 
attemptID).setCurrentContext()
 
-      logInfo("ApplicationAttemptId: " + appAttemptId)
+      logInfo(log"ApplicationAttemptId: ${MDC(LogKey.APP_ATTEMPT_ID, 
appAttemptId)}")
 
       // During shutdown, we may not be able to create an FileSystem object. 
So, pre-create here.
       val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
@@ -368,8 +367,9 @@ private[spark] class ApplicationMaster(
   final def unregister(status: FinalApplicationStatus, diagnostics: String = 
null): Unit = {
     synchronized {
       if (registered && !unregistered) {
-        logInfo(s"Unregistering ApplicationMaster with $status" +
-          Option(diagnostics).map(msg => s" (diag message: 
$msg)").getOrElse(""))
+        logInfo(log"Unregistering ApplicationMaster with 
${MDC(LogKey.APP_STATE, status)}" +
+          Option(diagnostics).map(
+            msg => log" (diag message: ${MDC(LogKey.MESSAGE, 
msg)})").getOrElse(log""))
         unregistered = true
         client.unregister(status, Option(diagnostics).getOrElse(""))
       }
@@ -387,8 +387,9 @@ private[spark] class ApplicationMaster(
           finalStatus = FinalApplicationStatus.FAILED
           exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
         }
-        logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
-          Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+        logInfo(log"Final app status: ${MDC(LogKey.APP_STATE, finalStatus)}, " 
+
+          log"exitCode: ${MDC(LogKey.EXIT_CODE, exitCode)}" +
+          Option(msg).map(msg => log", (reason: ${MDC(LogKey.REASON, 
msg)})").getOrElse(log""))
         finalMsg = ComStrUtils.abbreviate(msg, 
sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
         finished = true
         if (!inShutdown && Thread.currentThread() != reporterThread && 
reporterThread != null) {
@@ -481,8 +482,8 @@ private[spark] class ApplicationMaster(
     // the allocator is ready to service requests.
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
     if (_sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
-      logInfo("Initializing service data for shuffle service using name '" +
-        s"${_sparkConf.get(SHUFFLE_SERVICE_NAME)}'")
+      logInfo(log"Initializing service data for shuffle service using name '" +
+        log"${MDC(LogKey.SHUFFLE_SERVICE_NAME, 
_sparkConf.get(SHUFFLE_SERVICE_NAME))}'")
     }
     allocator.allocateResources()
     val ms = 
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER, 
sparkConf)
@@ -598,7 +599,7 @@ private[spark] class ApplicationMaster(
                 s"$failureCount time(s) from Reporter thread.")
           } else {
             logWarning(
-              log"Reporter thread fails ${MDC(FAILURES, failureCount)} time(s) 
in a row.", e)
+              log"Reporter thread fails ${MDC(LogKey.FAILURES, failureCount)} 
time(s) in a row.", e)
           }
       }
       try {
@@ -656,8 +657,9 @@ private[spark] class ApplicationMaster(
     t.setDaemon(true)
     t.setName("Reporter")
     t.start()
-    logInfo(s"Started progress reporter thread with (heartbeat : 
$heartbeatInterval, " +
-            s"initial allocation : $initialAllocationInterval) intervals")
+    logInfo(log"Started progress reporter thread with " +
+      log"(heartbeat: ${MDC(LogKey.HEARTBEAT_INTERVAL, heartbeatInterval)}, 
initial allocation: " +
+      log"${MDC(LogKey.INITIAL_HEARTBEAT_INTERVAL, 
initialAllocationInterval)}) intervals")
     t
   }
 
@@ -683,7 +685,7 @@ private[spark] class ApplicationMaster(
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
-        logInfo("Deleting staging directory " + stagingDirPath)
+        logInfo(log"Deleting staging directory ${MDC(LogKey.PATH, 
stagingDirPath)}")
         fs.delete(stagingDirPath, true)
       }
     } catch {
@@ -748,7 +750,7 @@ private[spark] class ApplicationMaster(
                 // Reporter thread can interrupt to stop user class
               case SparkUserAppException(exitCode) =>
                 val msg = log"User application exited with status " +
-                  log"${MDC(EXIT_CODE, exitCode)}"
+                  log"${MDC(LogKey.EXIT_CODE, exitCode)}"
                 logError(msg)
                 finish(FinalApplicationStatus.FAILED, exitCode, msg.message)
               case cause: Throwable =>
@@ -831,7 +833,8 @@ private[spark] class ApplicationMaster(
         }
 
       case KillExecutors(executorIds) =>
-        logInfo(s"Driver requested to kill executor(s) 
${executorIds.mkString(", ")}.")
+        logInfo(log"Driver requested to kill executor(s) " +
+          log"${MDC(LogKey.EXECUTOR_IDS, executorIds.mkString(", "))}.")
         Option(allocator) match {
           case Some(a) => executorIds.foreach(a.killExecutor)
           case None => logWarning("Container allocator is not ready to kill 
executors yet.")
@@ -854,11 +857,12 @@ private[spark] class ApplicationMaster(
       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
         if (shutdown || !clientModeTreatDisconnectAsFailed) {
           if (exitCode == 0) {
-            logInfo(s"Driver terminated or disconnected! Shutting down. 
$remoteAddress")
+            logInfo(log"Driver terminated or disconnected! Shutting down. " +
+              log"${MDC(LogKey.HOST_PORT, remoteAddress)}")
             finish(FinalApplicationStatus.SUCCEEDED, 
ApplicationMaster.EXIT_SUCCESS)
           } else {
-            logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, 
exitCode)}! " +
-              log"Shutting down. ${MDC(HOST_PORT, remoteAddress)}")
+            logError(log"Driver terminated with exit code 
${MDC(LogKey.EXIT_CODE, exitCode)}! " +
+              log"Shutting down. ${MDC(LogKey.HOST_PORT, remoteAddress)}")
             finish(FinalApplicationStatus.FAILED, exitCode)
           }
         } else {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bed7c859003a..214a79559755 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -56,7 +56,6 @@ import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{Logging, LogKey, MDC}
-import org.apache.spark.internal.LogKey.{APP_ID, CONFIG, CONFIG2, PATH}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, 
SparkAppHandle, YarnCommandBuilderUtils}
@@ -142,7 +141,8 @@ private[spark] class Client(
     val principal = sparkConf.get(PRINCIPAL).orNull
     require((principal == null) == (keytab == null),
       "Both principal and keytab must be defined, or neither.")
-    logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+    logInfo(log"Kerberos credentials: principal = ${MDC(LogKey.PRINCIPAL, 
principal)}, " +
+      log"keytab = ${MDC(LogKey.KEYTAB, keytab)}")
     // Generate a file name that can be used for the keytab file, that does 
not conflict
     // with any user file.
     Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
@@ -229,7 +229,7 @@ private[spark] class Client(
       val appContext = createApplicationSubmissionContext(newApp, 
containerContext)
 
       // Finally, submit and monitor the application
-      logInfo(s"Submitting application $appId to ResourceManager")
+      logInfo(log"Submitting application ${MDC(LogKey.APP_ID, appId)} to 
ResourceManager")
       yarnClient.submitApplication(appContext)
       launcherBackend.setAppId(appId.toString)
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
@@ -254,11 +254,11 @@ private[spark] class Client(
       try {
         val fs = stagingDirPath.getFileSystem(hadoopConf)
         if (fs.delete(stagingDirPath, true)) {
-          logInfo(s"Deleted staging directory $stagingDirPath")
+          logInfo(log"Deleted staging directory ${MDC(LogKey.PATH, 
stagingDirPath)}")
         }
       } catch {
         case ioe: IOException =>
-          logWarning(log"Failed to cleanup staging dir ${MDC(PATH, 
stagingDirPath)}", ioe)
+          logWarning(log"Failed to cleanup staging dir ${MDC(LogKey.PATH, 
stagingDirPath)}", ioe)
       }
     }
 
@@ -332,7 +332,7 @@ private[spark] class Client(
         appContext.setLogAggregationContext(logAggregationContext)
       } catch {
         case NonFatal(e) =>
-          logWarning(log"Ignoring ${MDC(CONFIG, 
ROLLED_LOG_INCLUDE_PATTERN.key)}} " +
+          logWarning(log"Ignoring ${MDC(LogKey.CONFIG, 
ROLLED_LOG_INCLUDE_PATTERN.key)}} " +
             log"because the version of YARN does not support it", e)
       }
     }
@@ -371,14 +371,16 @@ private[spark] class Client(
     // SPARK-37205: this regex is used to grep a list of configurations and 
send them to YARN RM
     // for fetching delegation tokens. See YARN-5910 for more details.
     sparkConf.get(config.AM_TOKEN_CONF_REGEX).foreach { regex =>
-      logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with 
regex $regex")
+      logInfo(log"Processing token conf (spark.yarn.am.tokenConfRegex) with " +
+        log"regex ${MDC(LogKey.TOKEN_REGEX, regex)}")
       val dob = new DataOutputBuffer()
       val copy = new Configuration(false)
       copy.clear()
       hadoopConf.asScala.foreach { entry =>
         if (entry.getKey.matches(regex)) {
           copy.set(entry.getKey, entry.getValue)
-          logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}")
+          logInfo(log"Captured key: ${MDC(LogKey.KEY, entry.getKey)} -> " +
+            log"value: ${MDC(LogKey.VALUE, entry.getValue)}")
         }
       }
       copy.write(dob);
@@ -403,8 +405,8 @@ private[spark] class Client(
    */
   private def verifyClusterResources(newAppResponse: 
GetNewApplicationResponse): Unit = {
     val maxMem = newAppResponse.getMaximumResourceCapability.getMemorySize
-    logInfo("Verifying our application has not requested more than the maximum 
" +
-      s"memory capability of the cluster ($maxMem MB per container)")
+    logInfo(log"Verifying our application has not requested more than the 
maximum memory " +
+      log"capability of the cluster (${MDC(LogKey.MAX_MEMORY_SIZE, maxMem)} MB 
per container)")
     val executorMem =
       executorMemory + executorOffHeapMemory + executorMemoryOverhead + 
pysparkWorkerMemory
     if (executorMem > maxMem) {
@@ -421,9 +423,8 @@ private[spark] class Client(
         "Please check the values of 'yarn.scheduler.maximum-allocation-mb' 
and/or " +
         "'yarn.nodemanager.resource.memory-mb'.")
     }
-    logInfo("Will allocate AM container, with %d MB memory including %d MB 
overhead".format(
-      amMem,
-      amMemoryOverhead))
+    logInfo(log"Will allocate AM container, with ${MDC(LogKey.MEMORY_SIZE, 
amMem)} MB memory " +
+      log"including ${MDC(LogKey.OVERHEAD_MEMORY_SIZE, amMemoryOverhead)} MB 
overhead")
 
     // We could add checks to make sure the entire cluster has enough 
resources but that involves
     // getting all the node reports and computing ourselves.
@@ -447,7 +448,8 @@ private[spark] class Client(
     var destPath = srcPath
     if (force || !compareFs(srcFs, destFs) || "file".equals(srcFs.getScheme)) {
       destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
-      logInfo(s"Uploading resource $srcPath -> $destPath")
+      logInfo(log"Uploading resource ${MDC(LogKey.SRC_PATH, srcPath)} -> " +
+        log"${MDC(LogKey.TARGET_PATH, destPath)}")
       try {
         FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
       } catch {
@@ -458,7 +460,8 @@ private[spark] class Client(
       replication.foreach(repl => destFs.setReplication(destPath, repl))
       destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
     } else {
-      logInfo(s"Source and destination file systems are the same. Not copying 
$srcPath")
+      logInfo(log"Source and destination file systems are the same. " +
+        log"Not copying ${MDC(LogKey.SRC_PATH, srcPath)}")
     }
     // Resolve any symlinks in the URI path so using a "current" symlink to 
point to a specific
     // version shows the specific version in the distributed cache 
configuration
@@ -701,8 +704,9 @@ private[spark] class Client(
         case None =>
           // No configuration, so fall back to uploading local jar files.
           logWarning(
-            log"Neither ${MDC(CONFIG, SPARK_JARS.key)} nor ${MDC(CONFIG2, 
SPARK_ARCHIVE.key)}} " +
-            log"is set, falling back to uploading libraries under SPARK_HOME.")
+            log"Neither ${MDC(LogKey.CONFIG, SPARK_JARS.key)} nor " +
+              log"${MDC(LogKey.CONFIG2, SPARK_ARCHIVE.key)}} is set, falling 
back to uploading " +
+              log"libraries under SPARK_HOME.")
           val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
             sparkConf.getenv("SPARK_HOME")))
           val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
@@ -881,7 +885,7 @@ private[spark] class Client(
         if (dir.isDirectory()) {
           val files = dir.listFiles()
           if (files == null) {
-            logWarning(log"Failed to list files under directory ${MDC(PATH, 
dir)}")
+            logWarning(log"Failed to list files under directory 
${MDC(LogKey.PATH, dir)}")
           } else {
             files.foreach { file =>
               if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
@@ -1070,7 +1074,8 @@ private[spark] class Client(
           sparkConf))
       }
       if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
-        logWarning(log"${MDC(CONFIG, AM_JAVA_OPTIONS.key)} will not take 
effect in cluster mode")
+        logWarning(log"${MDC(LogKey.CONFIG, AM_JAVA_OPTIONS.key)} will not 
take effect " +
+          log"in cluster mode")
       }
     } else {
       // Validate and include yarn am specific java options in yarn-client 
mode.
@@ -1202,7 +1207,7 @@ private[spark] class Client(
           getApplicationReport()
         } catch {
           case e: ApplicationNotFoundException =>
-            logError(log"Application ${MDC(APP_ID, appId)} not found.")
+            logError(log"Application ${MDC(LogKey.APP_ID, appId)} not found.")
             cleanupStagingDir()
             return YarnAppReport(YarnApplicationState.KILLED, 
FinalApplicationStatus.KILLED, None)
           case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
@@ -1216,7 +1221,8 @@ private[spark] class Client(
       reportsSinceLastLog += 1
       if (logApplicationReport) {
         if (lastState != state || reportsSinceLastLog >= reportsTillNextLog) {
-          logInfo(s"Application report for $appId (state: $state)")
+          logInfo(log"Application report for ${MDC(LogKey.APP_ID, appId)} " +
+            log"(state: ${MDC(LogKey.APP_STATE, state)})")
           reportsSinceLastLog = 0
         }
 
@@ -1225,7 +1231,8 @@ private[spark] class Client(
         if (log.isDebugEnabled) {
           logDebug(formatReportDetails(report, getDriverLogsLink(report)))
         } else if (lastState != state) {
-          logInfo(formatReportDetails(report, getDriverLogsLink(report)))
+          logInfo(log"${MDC(LogKey.REPORT_DETAILS,
+            formatReportDetails(report, getDriverLogsLink(report)))}")
         }
       }
 
@@ -1347,7 +1354,7 @@ private[spark] class Client(
         .getOrElse(IMap.empty)
     } catch {
       case e: Exception =>
-        logWarning(log"Unable to get driver log links for ${MDC(APP_ID, 
appId)}: ", e)
+        logWarning(log"Unable to get driver log links for ${MDC(LogKey.APP_ID, 
appId)}: ", e)
         // Include the full stack trace only at DEBUG level to reduce verbosity
         logDebug(s"Unable to get driver log links for $appId", e)
         IMap.empty
@@ -1367,8 +1374,10 @@ private[spark] class Client(
     if (!launcherBackend.isConnected() && fireAndForget) {
       val report = getApplicationReport()
       val state = report.getYarnApplicationState
-      logInfo(s"Application report for $appId (state: $state)")
-      logInfo(formatReportDetails(report, getDriverLogsLink(report)))
+      logInfo(log"Application report for ${MDC(LogKey.APP_ID, appId)} " +
+        log"(state: ${MDC(LogKey.APP_STATE, state)})")
+      logInfo(log"${MDC(LogKey.REPORT_DETAILS,
+        formatReportDetails(report, getDriverLogsLink(report)))}")
       if (state == YarnApplicationState.FAILED || state == 
YarnApplicationState.KILLED) {
         throw new SparkException(s"Application $appId finished with status: 
$state")
       }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 202ef36166d2..81588b2fcdda 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -19,7 +19,8 @@ package org.apache.spark.deploy.yarn
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ARGS
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are 
location aware !
 private[spark] class ClientArguments(args: Array[String]) extends Logging {
@@ -75,7 +76,7 @@ private[spark] class ClientArguments(args: Array[String]) 
extends Logging {
     }
 
     if (verbose) {
-      logInfo(s"Parsed user args for YARN application: [${userArgs.mkString(" 
")}]")
+      logInfo(log"Parsed user args for YARN application: [${MDC(ARGS, 
userArgs.mkString(" "))}]")
     }
   }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 81b210a2297a..035f02f16fbe 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,7 +38,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{EXECUTOR_ENVS, 
EXECUTOR_LAUNCH_COMMANDS, EXECUTOR_RESOURCES}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -68,21 +69,23 @@ private[yarn] class ExecutorRunnable(
     startContainer()
   }
 
-  def launchContextDebugInfo(): String = {
+  def launchContextDebugInfo(): MessageWithContext = {
     val commands = prepareCommand()
     val env = prepareEnvironment()
 
-    s"""
-    
|===============================================================================
-    |Default YARN executor launch context:
-    |  env:
-    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> 
$v\n" }.mkString}
-    |  command:
-    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n   
   ")}
-    |
-    |  resources:
-    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
-    
|===============================================================================""".stripMargin
+    // scalastyle:off line.size.limit
+    log"""
+        
|===============================================================================
+        |Default YARN executor launch context:
+        |  env:
+        |${MDC(EXECUTOR_ENVS, Utils.redact(sparkConf, env.toSeq).map { case 
(k, v) => s"    $k -> $v\n" }.mkString)}
+        |  command:
+        |    ${MDC(EXECUTOR_LAUNCH_COMMANDS, 
Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      "))}
+        |
+        |  resources:
+        |${MDC(EXECUTOR_RESOURCES, localResources.map { case (k, v) => s"    
$k -> $v\n" }.mkString)}
+        
|===============================================================================""".stripMargin
+    // scalastyle:on line.size.limit
   }
 
   def startContainer(): java.util.Map[String, ByteBuffer] = {
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
index e0d66af348e2..16f7581c7af4 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.logging.log4j.{Level, LogManager}
 import org.apache.logging.log4j.core.Logger
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.NODE_LOCATION
 
 /**
  * Re-implement YARN's [[RackResolver]] for hadoop releases without YARN-9332.
@@ -77,8 +78,8 @@ private[spark] class SparkRackResolver(conf: Configuration) 
extends Logging {
     val rNameList = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala
     if (rNameList == null || rNameList.isEmpty) {
       hostNames.foreach(nodes += new NodeBase(_, NetworkTopology.DEFAULT_RACK))
-      logInfo(s"Got an error when resolving hostNames. " +
-        s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all")
+      logInfo(log"Got an error when resolving hostNames. " +
+        log"Falling back to ${MDC(NODE_LOCATION, 
NetworkTopology.DEFAULT_RACK)} for all")
     } else {
       for ((hostName, rName) <- hostNames.zip(rNameList)) {
         if (Strings.isNullOrEmpty(rName)) {
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index efe766be8356..b410ea2eb44d 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{APP_STATE, CONFIG, CONFIG2, CONFIG3, 
CONTAINER_ID, ERROR, EXECUTOR_ID, HOST, REASON}
+import org.apache.spark.internal.LogKey
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -195,7 +195,8 @@ private[yarn] class YarnAllocator(
       case (true, false) => true
       case (true, true) =>
         logWarning(log"Yarn Executor Decommissioning is supported only " +
-          log"when ${MDC(CONFIG, SHUFFLE_SERVICE_ENABLED.key)} is set to 
false. See: SPARK-39018.")
+          log"when ${MDC(LogKey.CONFIG, SHUFFLE_SERVICE_ENABLED.key)} is set 
to false. " +
+          log"See: SPARK-39018.")
         false
       case (false, _) => false
     }
@@ -313,7 +314,8 @@ private[yarn] class YarnAllocator(
     if (!rpIdToYarnResource.containsKey(rp.id)) {
       // track the resource profile if not already there
       getOrUpdateRunningExecutorForRPId(rp.id)
-      logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+      logInfo(log"Resource profile ${MDC(LogKey.RESOURCE_PROFILE_ID, rp.id)} 
doesn't exist, " +
+        log"adding it")
 
       val resourcesWithDefaults =
         ResourceProfile.getResourcesForClusterManager(rp.id, 
rp.executorResources,
@@ -399,8 +401,8 @@ private[yarn] class YarnAllocator(
       val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
         createYarnResourceForResourceProfile(rp)
         if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
-          logInfo(s"Driver requested a total number of $numExecs executor(s) " 
+
-            s"for resource profile id: ${rp.id}.")
+          logInfo(log"Driver requested a total number of ${MDC(LogKey.COUNT, 
numExecs)} " +
+            log"executor(s) for resource profile id: 
${MDC(LogKey.RESOURCE_PROFILE_ID, rp.id)}.")
           targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
           allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
           true
@@ -421,7 +423,8 @@ private[yarn] class YarnAllocator(
         val (_, rpId) = 
containerIdToExecutorIdAndResourceProfileId(container.getId)
         internalReleaseContainer(container)
         getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
-      case _ => logWarning(log"Attempted to kill unknown executor 
${MDC(EXECUTOR_ID, executorId)}!")
+      case _ => logWarning(log"Attempted to kill unknown executor " +
+        log"${MDC(LogKey.EXECUTOR_ID, executorId)}!")
     }
   }
 
@@ -520,12 +523,13 @@ private[yarn] class YarnAllocator(
       if (missing > 0) {
         val resource = rpIdToYarnResource.get(rpId)
         if (log.isInfoEnabled()) {
-          var requestContainerMessage = s"Will request $missing executor 
container(s) for " +
-            s" ResourceProfile Id: $rpId, each with " +
-            s"${resource.getVirtualCores} core(s) and " +
-            s"${resource.getMemorySize} MB memory."
-          if (resource.getResources().nonEmpty) {
-            requestContainerMessage ++= s" with custom resources: $resource"
+          var requestContainerMessage = log"Will request ${MDC(LogKey.COUNT, 
missing)} executor " +
+            log"container(s) for ResourceProfile Id: 
${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}, " +
+            log"each with ${MDC(LogKey.VIRTUAL_CORES, 
resource.getVirtualCores)} core(s) and " +
+            log"${MDC(LogKey.MEMORY_SIZE, resource.getMemorySize)} MB memory."
+          if (resource.getResources.nonEmpty) {
+            requestContainerMessage = requestContainerMessage +
+              log" with custom resources: ${MDC(LogKey.RESOURCE, resource)}"
           }
           logInfo(requestContainerMessage)
         }
@@ -536,7 +540,8 @@ private[yarn] class YarnAllocator(
         }
         val cancelledContainers = staleRequests.size
         if (cancelledContainers > 0) {
-          logInfo(s"Canceled $cancelledContainers container request(s) 
(locality no longer needed)")
+          logInfo(log"Canceled ${MDC(LogKey.COUNT, cancelledContainers)} 
container request(s) " +
+            log"(locality no longer needed)")
         }
 
         // consider the number of new containers and cancelled stale 
containers available
@@ -570,8 +575,8 @@ private[yarn] class YarnAllocator(
             amClient.removeContainerRequest(nonLocal)
           }
           if (numToCancel > 0) {
-            logInfo(s"Canceled $numToCancel unlocalized container requests to 
" +
-              s"resubmit with locality")
+            logInfo(log"Canceled ${MDC(LogKey.COUNT, numToCancel)} unlocalized 
container " +
+              log"requests to resubmit with locality")
           }
         }
 
@@ -582,16 +587,20 @@ private[yarn] class YarnAllocator(
         if (log.isInfoEnabled()) {
           val (localized, anyHost) = 
newLocalityRequests.partition(_.getNodes() != null)
           if (anyHost.nonEmpty) {
-            logInfo(s"Submitted ${anyHost.size} unlocalized container 
requests.")
+            logInfo(log"Submitted ${MDC(LogKey.COUNT, anyHost.size)}} 
unlocalized container " +
+              log"requests.")
           }
           localized.foreach { request =>
-            logInfo(s"Submitted container request for host 
${hostStr(request)}.")
+            logInfo(log"Submitted container request for host " +
+              log"${MDC(LogKey.HOST, hostStr(request))}.")
           }
         }
       } else if (numPendingAllocate > 0 && missing < 0) {
         val numToCancel = math.min(numPendingAllocate, -missing)
-        logInfo(s"Canceling requests for $numToCancel executor container(s) to 
have a new " +
-          s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} 
executors.")
+        logInfo(log"Canceling requests for ${MDC(LogKey.COUNT, numToCancel)} 
executor " +
+          log"container(s) to have a new desired total " +
+          log"${MDC(LogKey.EXECUTOR_DESIRED_COUNT,
+            getOrUpdateTargetNumExecutorsForRPId(rpId))} executors.")
         // cancel pending allocate requests by taking locality preference into 
account
         val cancelRequests = (staleRequests ++ anyHostRequests ++ 
localRequests).take(numToCancel)
         cancelRequests.foreach(amClient.removeContainerRequest)
@@ -697,8 +706,9 @@ private[yarn] class YarnAllocator(
 
     runAllocatedContainers(containersToUse)
 
-    logInfo("Received %d containers from YARN, launching executors on %d of 
them."
-      .format(allocatedContainers.size, containersToUse.size))
+    logInfo(log"Received ${MDC(LogKey.COUNT, allocatedContainers.size)} 
containers from YARN, " +
+      log"launching executors on ${MDC(LogKey.EXECUTOR_LAUNCH_COUNT, 
containersToUse.size)} " +
+      log"of them.")
   }
 
   /**
@@ -751,8 +761,10 @@ private[yarn] class YarnAllocator(
       val executorId = executorIdCounter.toString
       val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
       assert(container.getResource.getMemorySize >= 
yarnResourceForRpId.getMemorySize)
-      logInfo(s"Launching container $containerId on host $executorHostname " +
-        s"for executor with ID $executorId for ResourceProfile Id $rpId")
+      logInfo(log"Launching container ${MDC(LogKey.CONTAINER_ID, containerId)} 
" +
+        log"on host ${MDC(LogKey.HOST, executorHostname)} for " +
+        log"executor with ID ${MDC(LogKey.EXECUTOR_ID, executorId)} for " +
+        log"ResourceProfile Id ${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}")
 
       val rp = rpIdToResourceProfile(rpId)
       val defaultResources = 
ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
@@ -790,8 +802,8 @@ private[yarn] class YarnAllocator(
                 getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
                 launchingExecutorContainerIds.remove(containerId)
                 if (NonFatal(e)) {
-                  logError(log"Failed to launch executor ${MDC(EXECUTOR_ID, 
executorId)} " +
-                    log"on container ${MDC(CONTAINER_ID, containerId)}", e)
+                  logError(log"Failed to launch executor 
${MDC(LogKey.EXECUTOR_ID, executorId)} " +
+                    log"on container ${MDC(LogKey.CONTAINER_ID, 
containerId)}", e)
                   // Assigned container should be released immediately
                   // to avoid unnecessary resource occupation.
                   amClient.releaseAssignedContainer(containerId)
@@ -805,9 +817,9 @@ private[yarn] class YarnAllocator(
           updateInternalState(rpId, executorId, container)
         }
       } else {
-        logInfo(("Skip launching executorRunnable as running executors count: 
%d " +
-          "reached target executors count: %d.").format(rpRunningExecs,
-          getOrUpdateTargetNumExecutorsForRPId(rpId)))
+        logInfo(log"Skip launching executorRunnable as running executors 
count: " +
+          log"${MDC(LogKey.COUNT, rpRunningExecs)} reached target executors 
count: " +
+          log"${MDC(LogKey.EXECUTOR_TARGET_COUNT, 
getOrUpdateTargetNumExecutorsForRPId(rpId))}.")
       }
     }
   }
@@ -849,47 +861,47 @@ private[yarn] class YarnAllocator(
           case Some((executorId, _)) =>
             getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
           case None => logWarning(log"Cannot find executorId for container: " +
-            log"${MDC(CONTAINER_ID, containerId)}")
+            log"${MDC(LogKey.CONTAINER_ID, containerId)}")
         }
 
-        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
-          containerId,
-          onHostStr,
-          completedContainer.getState,
-          completedContainer.getExitStatus))
+        logInfo(log"Completed container ${MDC(LogKey.CONTAINER_ID, 
containerId)}" +
+          log"${MDC(LogKey.HOST, onHostStr)} " +
+          log"(state: ${MDC(LogKey.CONTAINER_STATE, 
completedContainer.getState)}, " +
+          log"exit status: ${MDC(LogKey.EXIT_CODE, 
completedContainer.getExitStatus)})")
         val exitStatus = completedContainer.getExitStatus
         val (exitCausedByApp, containerExitReason) = exitStatus match {
           case _ if shutdown =>
-            (false, log"Executor for container ${MDC(CONTAINER_ID, 
containerId)} exited after " +
-              log"Application shutdown.")
+            (false, log"Executor for container ${MDC(LogKey.CONTAINER_ID, 
containerId)} " +
+              log"exited after Application shutdown.")
           case ContainerExitStatus.SUCCESS =>
-            (false, log"Executor for container ${MDC(CONTAINER_ID, 
containerId)} exited because " +
-              log"of a YARN event (e.g., preemption) and not because of an 
error in the running " +
-              log"job.")
+            (false, log"Executor for container ${MDC(LogKey.CONTAINER_ID, 
containerId)} " +
+              log"exited because of a YARN event (e.g., preemption) and not 
because of an " +
+              log"error in the running job.")
           case ContainerExitStatus.PREEMPTED =>
             // Preemption is not the fault of the running tasks, since YARN 
preempts containers
             // merely to do resource sharing, and tasks that fail due to 
preempted executors could
             // just as easily finish on any other executor. See SPARK-8167.
-            (false, log"Container ${MDC(CONTAINER_ID, containerId)}${MDC(HOST, 
onHostStr)} " +
-              log"was preempted.")
+            (false, log"Container ${MDC(LogKey.CONTAINER_ID, containerId)}" +
+              log"${MDC(LogKey.HOST, onHostStr)} was preempted.")
           // Should probably still count memory exceeded exit codes towards 
task failures
           case ContainerExitStatus.KILLED_EXCEEDED_VMEM =>
             val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual 
memory used".r
             val diag = 
vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
               .map(_.concat(".")).getOrElse("")
             val message = log"Container killed by YARN for exceeding virtual 
memory limits. " +
-              log"${MDC(ERROR, diag)} Consider boosting " +
-              log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)} or boosting " +
-              log"${MDC(CONFIG2, YarnConfiguration.NM_VMEM_PMEM_RATIO)} or 
disabling " +
-              log"${MDC(CONFIG3, YarnConfiguration.NM_VMEM_CHECK_ENABLED)} 
because of YARN-4714."
+              log"${MDC(LogKey.ERROR, diag)} Consider boosting " +
+              log"${MDC(LogKey.CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)} or 
boosting " +
+              log"${MDC(LogKey.CONFIG2, YarnConfiguration.NM_VMEM_PMEM_RATIO)} 
or disabling " +
+              log"${MDC(LogKey.CONFIG3, 
YarnConfiguration.NM_VMEM_CHECK_ENABLED)} " +
+              log"because of YARN-4714."
             (true, message)
           case ContainerExitStatus.KILLED_EXCEEDED_PMEM =>
             val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical 
memory used".r
             val diag = 
pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
               .map(_.concat(".")).getOrElse("")
             val message = log"Container killed by YARN for exceeding physical 
memory limits. " +
-              log"${MDC(ERROR, diag)} Consider boosting " +
-              log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)}."
+              log"${MDC(LogKey.ERROR, diag)} Consider boosting " +
+              log"${MDC(LogKey.CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)}."
             (true, message)
           case other_exit_status =>
             val exitStatus = completedContainer.getExitStatus
@@ -900,17 +912,19 @@ private[yarn] class YarnAllocator(
             // SPARK-26269: follow YARN's behaviour, see details in
             // 
org.apache.hadoop.yarn.util.Apps#shouldCountTowardsNodeBlacklisting
             if 
(NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
-              (false, log"Container marked as failed: ${MDC(CONTAINER_ID, 
containerId)}" +
-                log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, 
exitStatus)}. " +
-                log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " +
-                log"Diagnostics: ${MDC(ERROR, 
completedContainer.getDiagnostics)}.")
+              (false, log"Container marked as failed: 
${MDC(LogKey.CONTAINER_ID, containerId)}" +
+                log"${MDC(LogKey.HOST, onHostStr)}. " +
+                log"Exit status: ${MDC(LogKey.EXIT_CODE, exitStatus)}. " +
+                log"Possible causes: ${MDC(LogKey.REASON, 
sparkExitCodeReason)} " +
+                log"Diagnostics: ${MDC(LogKey.ERROR, 
completedContainer.getDiagnostics)}.")
             } else {
               // completed container from a bad node
               
allocatorNodeHealthTracker.handleResourceAllocationFailure(hostOpt)
-              (true, log"Container from a bad node: ${MDC(CONTAINER_ID, 
containerId)}" +
-                log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, 
exitStatus)}. " +
-                log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " +
-                log"Diagnostics: ${MDC(ERROR, 
completedContainer.getDiagnostics)}.")
+              (true, log"Container from a bad node: ${MDC(LogKey.CONTAINER_ID, 
containerId)}" +
+                log"${MDC(LogKey.HOST, onHostStr)}. " +
+                log"Exit status: ${MDC(LogKey.EXIT_CODE, exitStatus)}. " +
+                log"Possible causes: ${MDC(LogKey.REASON, 
sparkExitCodeReason)} " +
+                log"Diagnostics: ${MDC(LogKey.ERROR, 
completedContainer.getDiagnostics)}.")
             }
         }
         if (exitCausedByApp) {
@@ -981,7 +995,7 @@ private[yarn] class YarnAllocator(
       context.reply(releasedExecutorLossReasons.remove(eid).get)
     } else {
       logWarning(log"Tried to get the loss reason for non-existent executor " +
-        log"${MDC(EXECUTOR_ID, eid)}")
+        log"${MDC(LogKey.EXECUTOR_ID, eid)}")
       context.sendFailure(
         new SparkException(s"Fail to find loss reason for non-existent 
executor $eid"))
     }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
index 22937ed8117a..3ba6687b0224 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
@@ -25,7 +25,8 @@ import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.ExecutorFailureTracker
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FAILURES, HOST, NODES}
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.HealthTracker
 
@@ -90,7 +91,8 @@ private[spark] class YarnAllocatorNodeHealthTracker(
   private def updateAllocationExcludedNodes(hostname: String): Unit = {
     val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
     if (failuresOnHost > maxFailuresPerHost) {
-      logInfo(s"excluding $hostname as YARN allocation failed $failuresOnHost 
times")
+      logInfo(log"excluding ${MDC(HOST, hostname)} as YARN allocation failed " 
+
+        log"${MDC(FAILURES, failuresOnHost)} times")
       allocatorExcludedNodeList.put(
         hostname,
         failureTracker.clock.getTimeMillis() + excludeOnFailureTimeoutMillis)
@@ -125,10 +127,12 @@ private[spark] class YarnAllocatorNodeHealthTracker(
     val additions = (nodesToExclude -- 
currentExcludededYarnNodes).toList.sorted
     val removals = (currentExcludededYarnNodes -- nodesToExclude).toList.sorted
     if (additions.nonEmpty) {
-      logInfo(s"adding nodes to YARN application master's excluded node list: 
$additions")
+      logInfo(log"adding nodes to YARN application master's " +
+        log"excluded node list: ${MDC(NODES, additions)}")
     }
     if (removals.nonEmpty) {
-      logInfo(s"removing nodes from YARN application master's excluded node 
list: $removals")
+      logInfo(log"removing nodes from YARN application master's " +
+        log"excluded node list: ${MDC(NODES, removals)}")
     }
     if (additions.nonEmpty || removals.nonEmpty) {
       // Note YARNs api for excluding nodes is updateBlacklist.
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index ccc0bc9f715e..c7defac98b90 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.APP_STATE
+import org.apache.spark.internal.LogKey.{APP_ID, APP_STATE}
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
       throw new SparkException(exceptionMsg)
     }
     if (state == YarnApplicationState.RUNNING) {
-      logInfo(s"Application ${appId.get} has started running.")
+      logInfo(log"Application ${MDC(APP_ID, appId.get)} has started running.")
     }
   }
 
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d7f285aeb892..c6976caf3a77 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -30,7 +30,7 @@ import 
org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_PORT, REASON}
+import org.apache.spark.internal.LogKey
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
@@ -221,7 +221,9 @@ private[spark] abstract class YarnSchedulerBackend(
     if (hasFilter) {
       // SPARK-26255: Append user provided filters(spark.ui.filters) with yarn 
filter.
       val allFilters = Seq(filterName) ++ conf.get(UI_FILTERS)
-      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      logInfo(log"Add WebUI Filter. ${MDC(LogKey.UI_FILTER, filterName)}, " +
+        log"${MDC(LogKey.UI_FILTER_PARAMS, filterParams)}, " +
+        log"${MDC(LogKey.UI_PROXY_BASE, proxyBase)}")
 
       // For already installed handlers, prepend the filter.
       scheduler.sc.ui.foreach { ui =>
@@ -306,8 +308,8 @@ private[spark] abstract class YarnSchedulerBackend(
             .recover {
               case NonFatal(e) =>
                 logWarning(log"Attempted to get executor loss reason for 
executor id " +
-                  log"${MDC(EXECUTOR_ID, executorId)} at RPC address " +
-                  log"${MDC(HOST_PORT, executorRpcAddress)}, but got no 
response. " +
+                  log"${MDC(LogKey.EXECUTOR_ID, executorId)} at RPC address " +
+                  log"${MDC(LogKey.HOST_PORT, executorRpcAddress)}, but got no 
response. " +
                   log"Marking as agent lost.", e)
                 RemoveExecutor(executorId, ExecutorProcessLost())
             }(ThreadUtils.sameThread)
@@ -332,7 +334,7 @@ private[spark] abstract class YarnSchedulerBackend(
 
     override def receive: PartialFunction[Any, Unit] = {
       case RegisterClusterManager(am) =>
-        logInfo(s"ApplicationMaster registered as $am")
+        logInfo(log"ApplicationMaster registered as 
${MDC(LogKey.RPC_ENDPOINT_REF, am)}")
         amEndpoint = Option(am)
         reset()
 
@@ -346,7 +348,7 @@ private[spark] abstract class YarnSchedulerBackend(
       case r @ RemoveExecutor(executorId, reason) =>
         if (!stopped.get) {
           logWarning(log"Requesting driver to remove executor " +
-            log"${MDC(EXECUTOR_ID, executorId)} for reason ${MDC(REASON, 
reason)}")
+            log"${MDC(LogKey.EXECUTOR_ID, executorId)} for reason 
${MDC(LogKey.REASON, reason)}")
           driverEndpoint.send(r)
         }
 
@@ -395,7 +397,8 @@ private[spark] abstract class YarnSchedulerBackend(
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       if (amEndpoint.exists(_.address == remoteAddress)) {
-        logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_PORT, 
remoteAddress)}")
+        logWarning(log"ApplicationMaster has disassociated: " +
+          log"${MDC(LogKey.HOST_PORT, remoteAddress)}")
         amEndpoint = None
       }
     }
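
    For reference, the hunks above all follow the same before/after shape. Below is a
    minimal sketch of that migration pattern, assuming the Spark internal logging
    utilities already imported in this diff (Logging, MDC, LogKey) are on the classpath;
    the class and method names in the sketch are illustrative only and are not part of
    this commit:

        import org.apache.spark.internal.{Logging, MDC}
        import org.apache.spark.internal.LogKey.{COUNT, HOST}

        // Hypothetical helper, used only to show the before/after shape.
        class AllocationLogger extends Logging {

          // Before: plain string interpolation; the variable names are lost once the
          // message is rendered, so structured log backends cannot index them.
          def logPlain(host: String, numExecs: Int): Unit =
            logInfo(s"Requesting $numExecs executor(s) on host $host")

          // After: the log"..." interpolator wraps each variable in MDC with a LogKey,
          // producing a structured message; longer messages are built by concatenating
          // log"..." pieces with `+`, as done throughout YarnAllocator in this diff.
          def logStructured(host: String, numExecs: Int): Unit =
            logInfo(log"Requesting ${MDC(COUNT, numExecs)} executor(s) " +
              log"on host ${MDC(HOST, host)}")
        }

    Note that, as in the hunks above, each concatenated piece is itself a log"..."
    literal rather than a plain s"..." string, so the parts combine into a single
    structured message with its MDC context intact.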


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
