This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b20b166c41 [SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework
87b20b166c41 is described below

commit 87b20b166c41d4c265ac54eed75707b7726d371f
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Mon Apr 29 22:10:59 2024 -0700

    [SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR migrates `logInfo` calls with variables in the `SQL core` module to the structured logging framework.
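    
    For illustration (not part of the original PR description), the migration pattern applied throughout this diff looks roughly like the sketch below. `ExampleScan` and its values are hypothetical; `Logging`, `MDC`, the `log"..."` interpolator, and `LogKeys.NUM_PARTITIONS` are the internal APIs the patch itself uses:
    
    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.NUM_PARTITIONS
    
    // Hypothetical class, for illustration only.
    class ExampleScan extends Logging {
      def plan(numPartitions: Int): Unit = {
        // Before: plain string interpolation; the value carries no log key.
        // logInfo(s"Planning with $numPartitions partitions")
    
        // After: the log"..." interpolator plus MDC tags the value with a LogKey,
        // so structured-logging backends can index and query it.
        logInfo(log"Planning with ${MDC(NUM_PARTITIONS, numPartitions)} partitions")
      }
    }
    ```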
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA (GitHub Actions).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46264 from panbingkun/SPARK-47585.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 55 +++++++++++++++++++---
 .../org/apache/spark/ml/util/Instrumentation.scala |  4 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  9 ++--
 .../spark/sql/catalyst/rules/RuleExecutor.scala    |  6 +--
 .../spark/sql/columnar/CachedBatchSerializer.scala |  6 ++-
 .../spark/sql/execution/DataSourceScanExec.scala   | 11 +++--
 .../ExternalAppendOnlyUnsafeRowArray.scala         |  8 ++--
 .../sql/execution/WholeStageCodegenExec.scala      | 14 +++---
 .../sql/execution/adaptive/AQEOptimizer.scala      |  9 ++--
 .../aggregate/AggregateCodegenSupport.scala        | 10 ++--
 .../execution/aggregate/HashAggregateExec.scala    |  6 ++-
 .../aggregate/ObjectAggregationIterator.scala      | 13 +++--
 .../spark/sql/execution/command/CommandUtils.scala |  9 ++--
 .../execution/command/createDataSourceTables.scala |  2 +-
 .../apache/spark/sql/execution/command/ddl.scala   | 30 +++++++-----
 .../datasources/BasicWriteStatsTracker.scala       |  8 ++--
 .../sql/execution/datasources/DataSource.scala     |  4 +-
 .../execution/datasources/DataSourceStrategy.scala |  5 +-
 .../datasources/FileFormatDataWriter.scala         | 11 +++--
 .../execution/datasources/FileFormatWriter.scala   |  9 ++--
 .../sql/execution/datasources/FilePartition.scala  |  8 ++--
 .../sql/execution/datasources/FileScanRDD.scala    |  4 +-
 .../execution/datasources/FileSourceStrategy.scala | 12 ++---
 .../execution/datasources/InMemoryFileIndex.scala  |  7 +--
 .../datasources/PartitioningAwareFileIndex.scala   |  7 +--
 .../SQLHadoopMapReduceCommitProtocol.scala         |  9 ++--
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   |  5 +-
 .../execution/datasources/jdbc/JDBCRelation.scala  |  7 +--
 .../datasources/parquet/ParquetUtils.scala         |  7 +--
 .../execution/datasources/v2/FileBatchWrite.scala  |  9 ++--
 .../datasources/v2/FilePartitionReader.scala       |  4 +-
 .../GroupBasedRowLevelOperationScanPlanning.scala  | 17 ++++---
 .../datasources/v2/V2ScanRelationPushDown.scala    | 32 +++++++------
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 52 +++++++++++---------
 .../python/PythonStreamingSinkCommitRunner.scala   |  5 +-
 .../execution/exchange/EnsureRequirements.scala    | 42 ++++++++++-------
 .../python/PythonStreamingSourceRunner.scala       |  5 +-
 .../spark/sql/execution/r/ArrowRRunner.scala       | 24 +++++-----
 .../WriteToContinuousDataSourceExec.scala          |  2 +-
 .../state/HDFSBackedStateStoreProvider.scala       | 16 ++++---
 .../apache/spark/sql/internal/SharedState.scala    | 15 +++---
 .../sql/streaming/StreamingQueryManager.scala      |  4 +-
 42 files changed, 318 insertions(+), 204 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 2ca80a496ccb..238432d354f6 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -33,6 +33,7 @@ object LogKeys {
   case object ACCUMULATOR_ID extends LogKey
   case object ACTUAL_NUM_FILES extends LogKey
   case object ACTUAL_PARTITION_COLUMN extends LogKey
+  case object AGGREGATE_FUNCTIONS extends LogKey
   case object ALPHA extends LogKey
   case object ANALYSIS_ERROR extends LogKey
   case object APP_ATTEMPT_ID extends LogKey
@@ -43,10 +44,13 @@ object LogKeys {
   case object ARGS extends LogKey
   case object BACKUP_FILE extends LogKey
   case object BATCH_ID extends LogKey
+  case object BATCH_NAME extends LogKey
   case object BATCH_TIMESTAMP extends LogKey
   case object BATCH_WRITE extends LogKey
   case object BLOCK_ID extends LogKey
   case object BLOCK_MANAGER_ID extends LogKey
+  case object BOOT extends LogKey
+  case object BROADCAST extends LogKey
   case object BROADCAST_ID extends LogKey
   case object BUCKET extends LogKey
   case object BYTECODE_SIZE extends LogKey
@@ -66,6 +70,7 @@ object LogKeys {
   case object CHOSEN_WATERMARK extends LogKey
   case object CLASS_LOADER extends LogKey
   case object CLASS_NAME extends LogKey
+  case object CLAUSES extends LogKey
   case object CLUSTER_CENTROIDS extends LogKey
   case object CLUSTER_ID extends LogKey
   case object CLUSTER_LABEL extends LogKey
@@ -83,6 +88,7 @@ object LogKeys {
   case object COMMITTED_VERSION extends LogKey
   case object COMPACT_INTERVAL extends LogKey
   case object COMPONENT extends LogKey
+  case object COMPUTE extends LogKey
   case object CONFIG extends LogKey
   case object CONFIG2 extends LogKey
   case object CONFIG3 extends LogKey
@@ -103,6 +109,7 @@ object LogKeys {
   case object CSV_SCHEMA_FIELD_NAMES extends LogKey
   case object CSV_SOURCE extends LogKey
   case object CURRENT_BATCH_ID extends LogKey
+  case object CURRENT_FILE extends LogKey
   case object CURRENT_PATH extends LogKey
   case object DATA extends LogKey
   case object DATABASE_NAME extends LogKey
@@ -111,7 +118,7 @@ object LogKeys {
   case object DATA_FILE_NUM extends LogKey
   case object DATA_SOURCE extends LogKey
   case object DATA_SOURCES extends LogKey
-  case object DATA_SOURCE_PROVIDER extends LogKey
+  case object DEFAULT_COMPACTION_INTERVAL extends LogKey
   case object DEFAULT_COMPACT_INTERVAL extends LogKey
   case object DEFAULT_ISOLATION_LEVEL extends LogKey
   case object DEFAULT_VALUE extends LogKey
@@ -119,13 +126,14 @@ object LogKeys {
   case object DELEGATE extends LogKey
   case object DELTA extends LogKey
   case object DESCRIPTION extends LogKey
-  case object DESIRED_PARTITIONS_SIZE extends LogKey
+  case object DESIRED_NUM_PARTITIONS extends LogKey
   case object DFS_FILE extends LogKey
   case object DIFF_DELTA extends LogKey
   case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
   case object DRIVER_ID extends LogKey
   case object DROPPED_PARTITIONS extends LogKey
   case object DURATION extends LogKey
+  case object EARLIEST_LOADED_VERSION extends LogKey
   case object EFFECTIVE_STORAGE_LEVEL extends LogKey
   case object ELAPSED_TIME extends LogKey
   case object ENCODING extends LogKey
@@ -136,6 +144,7 @@ object LogKeys {
   case object EPOCH extends LogKey
   case object ERROR extends LogKey
   case object ESTIMATOR_PARAMETER_MAP extends LogKey
+  case object EVALUATED_FILTERS extends LogKey
   case object EVENT_LOOP extends LogKey
   case object EVENT_QUEUE extends LogKey
   case object EXCEPTION extends LogKey
@@ -169,10 +178,13 @@ object LogKeys {
   case object FILE_END_OFFSET extends LogKey
   case object FILE_FORMAT extends LogKey
   case object FILE_FORMAT2 extends LogKey
+  case object FILE_LENGTH_XATTR extends LogKey
   case object FILE_MODIFICATION_TIME extends LogKey
   case object FILE_NAME extends LogKey
   case object FILE_START_OFFSET extends LogKey
   case object FILE_VERSION extends LogKey
+  case object FILTER extends LogKey
+  case object FILTERS extends LogKey
   case object FINAL_PATH extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
   case object FROM_OFFSET extends LogKey
@@ -180,9 +192,11 @@ object LogKeys {
   case object FUNCTION_NAME extends LogKey
   case object FUNCTION_PARAMETER extends LogKey
   case object GLOBAL_WATERMARK extends LogKey
+  case object GROUP_BY_EXPRS extends LogKey
   case object GROUP_ID extends LogKey
   case object HADOOP_VERSION extends LogKey
   case object HASH_JOIN_KEYS extends LogKey
+  case object HASH_MAP_SIZE extends LogKey
   case object HEARTBEAT_INTERVAL extends LogKey
   case object HISTORY_DIR extends LogKey
   case object HIVE_CLIENT_VERSION extends LogKey
@@ -191,20 +205,23 @@ object LogKeys {
   case object HIVE_OPERATION_TYPE extends LogKey
   case object HOST extends LogKey
   case object HOST_PORT extends LogKey
-  case object IDENTIFIER extends LogKey
+  case object HUGE_METHOD_LIMIT extends LogKey
   case object INCOMPATIBLE_TYPES extends LogKey
   case object INDEX extends LogKey
   case object INDEX_FILE_NUM extends LogKey
   case object INDEX_NAME extends LogKey
   case object INFERENCE_MODE extends LogKey
+  case object INIT extends LogKey
   case object INITIAL_CAPACITY extends LogKey
   case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
   case object INIT_MODE extends LogKey
+  case object INPUT extends LogKey
   case object INTERVAL extends LogKey
   case object ISOLATION_LEVEL extends LogKey
   case object JOB_ID extends LogKey
   case object JOIN_CONDITION extends LogKey
   case object JOIN_CONDITION_SUB_EXPR extends LogKey
+  case object JOIN_TYPE extends LogKey
   case object K8S_CONTEXT extends LogKey
   case object KAFKA_PULLS_COUNT extends LogKey
   case object KAFKA_RECORDS_PULLED_COUNT extends LogKey
@@ -218,6 +235,7 @@ object LogKeys {
   case object LATEST_COMMITTED_BATCH_ID extends LogKey
   case object LEARNING_RATE extends LogKey
   case object LEFT_EXPR extends LogKey
+  case object LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
   case object LINE extends LogKey
   case object LINE_NUM extends LogKey
   case object LISTENER extends LogKey
@@ -239,9 +257,13 @@ object LogKeys {
   case object MAX_CATEGORIES extends LogKey
   case object MAX_EXECUTOR_FAILURES extends LogKey
   case object MAX_FILE_VERSION extends LogKey
+  case object MAX_JVM_METHOD_PARAMS_LENGTH extends LogKey
   case object MAX_MEMORY_SIZE extends LogKey
-  case object MAX_PARTITIONS_SIZE extends LogKey
+  case object MAX_METHOD_CODE_SIZE extends LogKey
+  case object MAX_NUM_PARTITIONS extends LogKey
+  case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
   case object MAX_SIZE extends LogKey
+  case object MAX_SPLIT_BYTES extends LogKey
   case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey
   case object MEMORY_SIZE extends LogKey
   case object MERGE_DIR_NAME extends LogKey
@@ -264,10 +286,13 @@ object LogKeys {
   case object NODES extends LogKey
   case object NODE_LOCATION extends LogKey
   case object NORM extends LogKey
+  case object NUM_ADDED_PARTITIONS extends LogKey
   case object NUM_BIN extends LogKey
   case object NUM_BYTES extends LogKey
   case object NUM_CLASSES extends LogKey
   case object NUM_COLUMNS extends LogKey
+  case object NUM_CONCURRENT_WRITER extends LogKey
+  case object NUM_DROPPED_PARTITIONS extends LogKey
   case object NUM_EXAMPLES extends LogKey
   case object NUM_FEATURES extends LogKey
   case object NUM_FILES extends LogKey
@@ -276,31 +301,40 @@ object LogKeys {
   case object NUM_FILES_REUSED extends LogKey
   case object NUM_FREQUENT_ITEMS extends LogKey
   case object NUM_ITERATIONS extends LogKey
+  case object NUM_LEFT_PARTITION_VALUES extends LogKey
+  case object NUM_LOADED_ENTRIES extends LogKey
   case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey
-  case object NUM_PARTITION extends LogKey
+  case object NUM_PARTITIONS extends LogKey
+  case object NUM_PARTITION_VALUES extends LogKey
   case object NUM_POINT extends LogKey
   case object NUM_PREFIXES extends LogKey
+  case object NUM_PRUNED extends LogKey
+  case object NUM_RIGHT_PARTITION_VALUES extends LogKey
   case object NUM_SEQUENCES extends LogKey
+  case object NUM_VERSIONS_RETAIN extends LogKey
+  case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey
   case object OBJECT_ID extends LogKey
   case object OFFSET extends LogKey
   case object OFFSETS extends LogKey
   case object OFFSET_SEQUENCE_METADATA extends LogKey
   case object OLD_BLOCK_MANAGER_ID extends LogKey
   case object OLD_VALUE extends LogKey
+  case object OPEN_COST_IN_BYTES extends LogKey
   case object OPTIMIZED_PLAN_COLUMNS extends LogKey
   case object OPTIMIZER_CLASS_NAME extends LogKey
   case object OPTIONS extends LogKey
   case object OP_ID extends LogKey
   case object OP_TYPE extends LogKey
+  case object OUTPUT extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
   case object PARSE_MODE extends LogKey
   case object PARTITIONED_FILE_READER extends LogKey
-  case object PARTITIONS_SIZE extends LogKey
   case object PARTITION_ID extends LogKey
   case object PARTITION_SPECIFICATION extends LogKey
   case object PARTITION_SPECS extends LogKey
   case object PATH extends LogKey
   case object PATHS extends LogKey
+  case object PERCENT extends LogKey
   case object PIPELINE_STAGE_UID extends LogKey
   case object POD_COUNT extends LogKey
   case object POD_ID extends LogKey
@@ -312,12 +346,17 @@ object LogKeys {
   case object POD_TARGET_COUNT extends LogKey
   case object POLICY extends LogKey
   case object PORT extends LogKey
+  case object POST_SCAN_FILTERS extends LogKey
+  case object PREDICATE extends LogKey
+  case object PREDICATES extends LogKey
   case object PRETTY_ID_STRING extends LogKey
   case object PRINCIPAL extends LogKey
   case object PROCESSING_TIME extends LogKey
   case object PRODUCER_ID extends LogKey
   case object PROVIDER extends LogKey
+  case object PUSHED_FILTERS extends LogKey
   case object PVC_METADATA_NAME extends LogKey
+  case object PYTHON_EXEC extends LogKey
   case object QUERY_CACHE_VALUE extends LogKey
   case object QUERY_HINT extends LogKey
   case object QUERY_ID extends LogKey
@@ -339,6 +378,7 @@ object LogKeys {
   case object REDACTED_STATEMENT extends LogKey
   case object REDUCE_ID extends LogKey
   case object RELATION_NAME extends LogKey
+  case object RELATION_OUTPUT extends LogKey
   case object RELATIVE_TOLERANCE extends LogKey
   case object REMAINING_PARTITIONS extends LogKey
   case object REPORT_DETAILS extends LogKey
@@ -349,11 +389,11 @@ object LogKeys {
   case object RETRY_COUNT extends LogKey
   case object RETRY_INTERVAL extends LogKey
   case object RIGHT_EXPR extends LogKey
+  case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
   case object RMSE extends LogKey
   case object ROCKS_DB_LOG_LEVEL extends LogKey
   case object ROCKS_DB_LOG_MESSAGE extends LogKey
   case object RPC_ENDPOINT_REF extends LogKey
-  case object RULE_BATCH_NAME extends LogKey
   case object RULE_NAME extends LogKey
   case object RULE_NUMBER_OF_RUNS extends LogKey
   case object RUN_ID extends LogKey
@@ -450,6 +490,7 @@ object LogKeys {
   case object URI extends LogKey
   case object USER_ID extends LogKey
   case object USER_NAME extends LogKey
+  case object UUID extends LogKey
   case object VALUE extends LogKey
   case object VERSION_NUMBER extends LogKey
   case object VIRTUAL_CORES extends LogKey
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index acb6bf831239..9413605a31ce 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -28,7 +28,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.{LogEntry, Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CLASS_NAME, NUM_PARTITION, 
PIPELINE_STAGE_UID, STORAGE_LEVEL}
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, NUM_PARTITIONS, 
PIPELINE_STAGE_UID, STORAGE_LEVEL}
 import org.apache.spark.ml.{MLEvents, PipelineStage}
 import org.apache.spark.ml.param.{Param, Params}
 import org.apache.spark.rdd.RDD
@@ -67,7 +67,7 @@ private[spark] class Instrumentation private () extends Logging with MLEvents {
    * Log some data about the dataset being fit.
    */
   def logDataset(dataset: RDD[_]): Unit = {
-    logInfo(log"training: numPartitions=${MDC(NUM_PARTITION, 
dataset.partitions.length)}" +
+    logInfo(log"training: numPartitions=${MDC(NUM_PARTITIONS, 
dataset.partitions.length)}" +
       log" storageLevel=${MDC(STORAGE_LEVEL, dataset.getStorageLevel)}")
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bdcbe36a3346..dfc1e17c2a29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.MDC
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -446,7 +445,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     val excludedRules = excludedRulesConf.filter { ruleName =>
       val nonExcludable = nonExcludableRules.contains(ruleName)
       if (nonExcludable) {
-        logWarning(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' " +
+        logWarning(log"Optimization rule '${MDC(LogKeys.RULE_NAME, ruleName)}' 
" +
           log"was not excluded from the optimizer because this rule is a 
non-excludable rule.")
       }
       !nonExcludable
@@ -458,7 +457,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         val filteredRules = batch.rules.filter { rule =>
           val exclude = excludedRules.contains(rule.ruleName)
           if (exclude) {
-            logInfo(log"Optimization rule '${MDC(RULE_NAME, rule.ruleName)}' " 
+
+            logInfo(log"Optimization rule '${MDC(LogKeys.RULE_NAME, 
rule.ruleName)}' " +
               log"is excluded from the optimizer.")
           }
           !exclude
@@ -468,7 +467,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         } else if (filteredRules.nonEmpty) {
           Some(Batch(batch.name, batch.strategy, filteredRules: _*))
         } else {
-          logInfo(log"Optimization batch '${MDC(RULE_BATCH_NAME, batch.name)}' 
" +
+          logInfo(log"Optimization batch '${MDC(LogKeys.BATCH_NAME, 
batch.name)}' " +
             log"is excluded from the optimizer as all enclosed rules have been 
excluded.")
           None
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 3a5153f46865..c3c105995cd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -75,11 +75,11 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
       def message(): MessageWithContext = {
         if (!oldPlan.fastEquals(newPlan)) {
           log"""
-             |=== Result of Batch ${MDC(RULE_BATCH_NAME, batchName)} ===
+             |=== Result of Batch ${MDC(BATCH_NAME, batchName)} ===
              |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, 
newPlan.treeString).mkString("\n"))}
           """.stripMargin
         } else {
-          log"Batch ${MDC(RULE_BATCH_NAME, batchName)} has no effect."
+          log"Batch ${MDC(BATCH_NAME, batchName)} has no effect."
         }
       }
 
@@ -263,7 +263,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
                 log"to a larger value."
             }
             val log = log"Max iterations (${MDC(NUM_ITERATIONS, iteration - 
1)}) " +
-              log"reached for batch ${MDC(RULE_BATCH_NAME, batch.name)}" +
+              log"reached for batch ${MDC(BATCH_NAME, batch.name)}" +
               endingMsg
             if (Utils.isTesting || batch.strategy.errorOnExceed) {
               throw new RuntimeException(log.message)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
index 1113e63cab33..885ddf4110cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.columnar
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{FILTER, PREDICATE}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -307,7 +308,8 @@ abstract class SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer
               allowFailures = true))
 
         boundFilter.foreach(_ =>
-          filter.foreach(f => logInfo(s"Predicate $p generates partition 
filter: $f")))
+          filter.foreach(f => logInfo(log"Predicate ${MDC(PREDICATE, p)} 
generates " +
+            log"partition filter: ${MDC(FILTER, f)}")))
 
         // If the filter can't be resolved then we are missing required 
statistics.
         boundFilter.filter(_.resolved)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f583bb665de1..2ebbb9664f67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit._
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, 
OPEN_COST_IN_BYTES}
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -713,7 +715,7 @@ case class FileSourceScanExec(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
       selectedPartitions: ScanFileListing): RDD[InternalRow] = {
-    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
     val partitionArray = selectedPartitions.toPartitionArray
     val filesGroupedToBuckets = partitionArray.groupBy { f =>
       BucketingUtils
@@ -731,7 +733,7 @@ case class FileSourceScanExec(
     }
 
     val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets 
=>
-      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
+      logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
       val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
numCoalescedBuckets)
       Seq.tabulate(numCoalescedBuckets) { bucketId =>
         val partitionedFiles = coalescedBuckets.get(bucketId).map {
@@ -764,8 +766,9 @@ case class FileSourceScanExec(
     val openCostInBytes = 
relation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val maxSplitBytes =
       FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
-    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
-      s"open cost is considered as scanning $openCostInBytes bytes.")
+    logInfo(log"Planning scan with bin packing, max size: 
${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
+      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, 
openCostInBytes)} " +
+      log"bytes.")
 
     // Filter files with bucket pruning if possible
     val bucketingEnabled = 
relation.sparkSession.sessionState.conf.bucketingEnabled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index 56289d73c071..59810adc4b22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -22,7 +22,8 @@ import java.io.Closeable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, 
MAX_NUM_ROWS_IN_MEMORY_BUFFER}
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -122,8 +123,9 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       inMemoryBuffer += unsafeRow.copy()
     } else {
       if (spillableArray == null) {
-        logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold 
rows, switching to " +
-          s"${classOf[UnsafeExternalSorter].getName}")
+        logInfo(log"Reached spill threshold of " +
+          log"${MDC(MAX_NUM_ROWS_IN_MEMORY_BUFFER, 
numRowsInMemoryBufferThreshold)} rows, " +
+          log"switching to ${MDC(CLASS_NAME, 
classOf[UnsafeExternalSorter].getName)}")
 
         // We will not sort the rows, so prefixComparator and recordComparator 
are null
         spillableArray = UnsafeExternalSorter.create(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index eb83fe16d808..382f8cf8861a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.{broadcast, SparkException, 
SparkUnsupportedOperationException}
-import org.apache.spark.internal.LogKeys.{CODEGEN_STAGE_ID, ERROR, TREE_NODE}
+import org.apache.spark.internal.LogKeys.{CODEGEN_STAGE_ID, CONFIG, ERROR, 
HUGE_METHOD_LIMIT, MAX_METHOD_CODE_SIZE, TREE_NODE}
 import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -739,11 +739,13 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
 
     // Check if compiled code has a too large function
     if (compiledCodeStats.maxMethodCodeSize > conf.hugeMethodLimit) {
-      logInfo(s"Found too long generated codes and JIT optimization might not 
work: " +
-        s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above 
the limit " +
-        s"${conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
-        s"for this plan (id=$codegenStageId). To avoid this, you can raise the 
limit " +
-        s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+      logInfo(log"Found too long generated codes and JIT optimization might 
not work: " +
+        log"the bytecode size (${MDC(MAX_METHOD_CODE_SIZE, 
compiledCodeStats.maxMethodCodeSize)})" +
+        log" is above the limit ${MDC(HUGE_METHOD_LIMIT, 
conf.hugeMethodLimit)}, " +
+        log"and the whole-stage codegen was disabled for this plan " +
+        log"(id=${MDC(CODEGEN_STAGE_ID, codegenStageId)}). To avoid this, you 
can raise the limit" +
+        log" `${MDC(CONFIG, SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key)}`:\n" +
+        log"${MDC(TREE_NODE, treeString)}")
       return child.execute()
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index 7bee641a00e7..014d23f2f410 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.adaptive
 
+import org.apache.spark.internal.LogKeys.{BATCH_NAME, RULE_NAME}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
 import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, 
EliminateLimits, OptimizeOneRowPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
LogicalPlanIntegrity}
@@ -52,7 +54,8 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica
       val filteredRules = batch.rules.filter { rule =>
         val exclude = excludedRules.contains(rule.ruleName)
         if (exclude) {
-          logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the 
optimizer.")
+          logInfo(log"Optimization rule '${MDC(RULE_NAME, rule.ruleName)}' is 
excluded from " +
+            log"the optimizer.")
         }
         !exclude
       }
@@ -61,8 +64,8 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica
       } else if (filteredRules.nonEmpty) {
         Some(Batch(batch.name, batch.strategy, filteredRules: _*))
       } else {
-        logInfo(s"Optimization batch '${batch.name}' is excluded from the 
optimizer " +
-          s"as all enclosed rules have been excluded.")
+        logInfo(log"Optimization batch '${MDC(BATCH_NAME, batch.name)}' is 
excluded from " +
+          log"the optimizer as all enclosed rules have been excluded.")
         None
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
index 9523bf1a1c02..e2d8ac898804 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.LogKeys.MAX_JVM_METHOD_PARAMS_LENGTH
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ExpressionEquals, UnsafeRow}
@@ -340,11 +342,11 @@ trait AggregateCodegenSupport
         }
         Some(splitCodes)
       } else {
-        val errMsg = "Failed to split aggregate code into small functions 
because the parameter " +
-          "length of at least one split function went over the JVM limit: " +
-          CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH
+        val errMsg = log"Failed to split aggregate code into small functions 
because the " +
+          log"parameter length of at least one split function went over the 
JVM limit: " +
+          log"${MDC(MAX_JVM_METHOD_PARAMS_LENGTH, 
CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH)}"
         if (Utils.isTesting) {
-          throw SparkException.internalError(errMsg)
+          throw SparkException.internalError(errMsg.message)
         } else {
           logInfo(errMsg)
           None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index bdf17607d77c..8f2b7ca5cba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit._
 import scala.collection.mutable
 
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.LogKeys.CONFIG
+import org.apache.spark.internal.MDC
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -410,8 +412,8 @@ case class HashAggregateExec(
   private def enableTwoLevelHashMap(): Unit = {
     if (!checkIfFastHashMapSupported()) {
       if (!Utils.isTesting) {
-        logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but"
-          + " current version of codegened fast hashmap does not support this 
aggregate.")
+        logInfo(log"${MDC(CONFIG, SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key)} is set 
to true, but" +
+          log" current version of codegened fast hashmap does not support this 
aggregate.")
       }
     } else {
       isFastHashMapEnabled = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index 57b8fd8570f2..a4a6dc8e4ab0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CONFIG, HASH_MAP_SIZE, 
OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -174,10 +175,12 @@ class ObjectAggregationIterator(
         // The hash map gets too large, makes a sorted spill and clear the map.
         if (hashMap.size >= fallbackCountThreshold && inputRows.hasNext) {
           logInfo(
-            s"Aggregation hash map size ${hashMap.size} reaches threshold " +
-              s"capacity ($fallbackCountThreshold entries), spilling and 
falling back to sort" +
-              " based aggregation. You may change the threshold by adjust 
option " +
-              SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key
+            log"Aggregation hash map size ${MDC(HASH_MAP_SIZE, hashMap.size)} 
reaches threshold " +
+              log"capacity " +
+              log"(${MDC(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD, 
fallbackCountThreshold)}" +
+              log" entries), spilling and falling back to sort based 
aggregation. You may change " +
+              log"the threshold by adjust option " +
+              log"${MDC(CONFIG, 
SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key)}"
           )
 
           // Falls back to sort-based aggregation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 2cc09a61e224..d7c5df151bf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{DATABASE_NAME, ERROR, TABLE_NAME}
+import org.apache.spark.internal.LogKeys.{COUNT, DATABASE_NAME, ERROR, 
TABLE_NAME, TIME}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, 
UnresolvedAttribute}
@@ -92,11 +92,12 @@ object CommandUtils extends Logging {
     } else {
       // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
       val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
-      logInfo(s"Starting to calculate sizes for ${partitions.length} 
partitions.")
+      logInfo(log"Starting to calculate sizes for ${MDC(COUNT, 
partitions.length)} " +
+        log"partitions.")
       calculatePartitionStats(spark, catalogTable, partitions, 
partitionRowCount)
     }
-    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to 
calculate" +
-      s" the total size for table ${catalogTable.identifier}.")
+    logInfo(log"It took ${MDC(TIME, (System.nanoTime() - startTime) / (1000 * 
1000))} ms to " +
+      log"calculate the total size for table ${MDC(TABLE_NAME, 
catalogTable.identifier)}.")
     (totalSize, newPartitions)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9f5c240818fd..539d8346a5ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -233,7 +233,7 @@ case class CreateDataSourceTableAsSelectCommand(
     } catch {
       case ex: AnalysisException =>
         logError(log"Failed to write to table " +
-          log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex)
+          log"${MDC(TABLE_NAME, table.identifier.unquotedString)}", ex)
         throw ex
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6f36802bef59..cdeb4716e126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,8 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -697,7 +696,7 @@ case class RepairTableCommand(
     }
 
     val root = new Path(table.location)
-    logInfo(s"Recover all the partitions in $root")
+    logInfo(log"Recover all the partitions in ${MDC(LogKeys.PATH, root)}")
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fs = root.getFileSystem(hadoopConf)
 
@@ -717,14 +716,16 @@ case class RepairTableCommand(
           evalPool.shutdown()
         }
       val total = partitionSpecsAndLocs.length
-      logInfo(s"Found $total partitions in $root")
+      logInfo(log"Found ${MDC(LogKeys.NUM_PARTITIONS, total)} partitions " +
+        log"in ${MDC(LogKeys.PATH, root)}")
 
       val partitionStats = if (spark.sessionState.conf.gatherFastStats) {
         gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, 
threshold)
       } else {
         Map.empty[Path, PartitionStatistics]
       }
-      logInfo(s"Finished to gather the fast stats for all $total partitions.")
+      logInfo(log"Finished to gather the fast stats for all " +
+        log"${MDC(LogKeys.NUM_PARTITIONS, total)} partitions.")
 
       addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
       total
@@ -737,12 +738,14 @@ case class RepairTableCommand(
       spark.catalog.refreshTable(tableIdentWithDB)
     } catch {
       case NonFatal(e) =>
-        logError(log"Cannot refresh the table '${MDC(IDENTIFIER, 
tableIdentWithDB)}'. " +
+        logError(log"Cannot refresh the table '${MDC(LogKeys.TABLE_NAME, 
tableIdentWithDB)}'. " +
           log"A query of the table might return wrong result if the table was 
cached. " +
           log"To avoid such issue, you should uncache the table manually via 
the UNCACHE TABLE " +
           log"command after table recovering will complete fully.", e)
     }
-    logInfo(s"Recovered all partitions: added ($addedAmount), dropped 
($droppedAmount).")
+    logInfo(log"Recovered all partitions: " +
+      log"added (${MDC(LogKeys.NUM_ADDED_PARTITIONS, addedAmount)}), " +
+      log"dropped (${MDC(LogKeys.NUM_DROPPED_PARTITIONS, droppedAmount)}).")
     Seq.empty[Row]
   }
 
@@ -783,13 +786,13 @@ case class RepairTableCommand(
           scanPartitions(spark, fs, filter, st.getPath, spec ++ 
Map(partitionNames.head -> value),
             partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
-          logWarning(
-            log"expected partition column ${MDC(EXPECTED_PARTITION_COLUMN, 
partitionNames.head)}," +
-              log" but got ${MDC(ACTUAL_PARTITION_COLUMN, ps(0))}, ignoring 
it")
+          logWarning(log"expected partition column " +
+            log"${MDC(LogKeys.EXPECTED_PARTITION_COLUMN, 
partitionNames.head)}," +
+            log" but got ${MDC(LogKeys.ACTUAL_PARTITION_COLUMN, ps(0))}, 
ignoring it")
           Seq.empty
         }
       } else {
-        logWarning(log"ignore ${MDC(PATH, new Path(path, name))}")
+        logWarning(log"ignore ${MDC(LogKeys.PATH, new Path(path, name))}")
         Seq.empty
       }
     }
@@ -813,7 +816,8 @@ case class RepairTableCommand(
         Math.min(spark.sparkContext.defaultParallelism, 10000))
       // gather the fast stats for all the partitions otherwise Hive metastore 
will list all the
       // files for all the new partitions in sequential way, which is super 
slow.
-      logInfo(s"Gather the fast stats in parallel using $numParallelism 
tasks.")
+      logInfo(log"Gather the fast stats in parallel using ${MDC(LogKeys.COUNT, 
numParallelism)} " +
+        log"tasks.")
       spark.sparkContext.parallelize(locations, numParallelism)
         .mapPartitions { locationsEachPartition =>
           val pathFilter = getPathFilter(serializableConfiguration.value)
@@ -1030,7 +1034,7 @@ object DDLUtils extends Logging {
       DataSource.lookupDataSource(provider, 
SQLConf.get).getConstructor().newInstance()
     } catch {
       case e: Throwable =>
-        logError(log"Failed to find data source: ${MDC(DATA_SOURCE_PROVIDER, 
provider)} " +
+        logError(log"Failed to find data source: ${MDC(LogKeys.DATA_SOURCE, 
provider)} " +
           log"when check data column names.", e)
         return
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 2f097f78443b..1858a8421359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{ACTUAL_NUM_FILES, EXPECTED_NUM_FILES}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
@@ -120,9 +120,9 @@ class BasicWriteTaskStatsTracker(
     } catch {
       case e: NumberFormatException =>
         // warn but don't dump the whole stack
-        logInfo(s"Failed to parse" +
-          s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
-          s" bytes written may be under-reported");
+        logInfo(log"Failed to parse " +
+          log"${MDC(LogKeys.FILE_LENGTH_XATTR, 
BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)}:" +
+          log"${MDC(LogKeys.ERROR, e)}; bytes written may be under-reported");
       case e: UnsupportedOperationException =>
         // this is not unusual; ignore
         logDebug(s"XAttr not supported on path $path", e);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4b337798df37..d88b5ee8877d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CLASS_NAME, DATA_SOURCE_PROVIDER, 
DATA_SOURCES, PATHS}
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, DATA_SOURCE, 
DATA_SOURCES, PATHS}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogUtils}
@@ -696,7 +696,7 @@ object DataSource extends Logging {
             throw QueryCompilationErrors
               .foundMultipleXMLDataSourceError(provider1, sourceNames, 
externalSource.getName)
           } else if (internalSources.size == 1) {
-            logWarning(log"Multiple sources found for 
${MDC(DATA_SOURCE_PROVIDER, provider1)} " +
+            logWarning(log"Multiple sources found for ${MDC(DATA_SOURCE, 
provider1)} " +
               log"(${MDC(DATA_SOURCES, sourceNames.mkString(", "))}), 
defaulting to the " +
               log"internal datasource (${MDC(CLASS_NAME, 
internalSources.head.getClass.getName)}).")
             internalSources.head.getClass
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 845d969df088..22b60caf2669 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.PREDICATES
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, 
QualifiedTableName, SQLConfHelper}
@@ -494,7 +495,7 @@ object DataSourceStrategy
       val partitionSet = AttributeSet(partitionColumns)
       val predicates = ExpressionSet(normalizedFilters
         .flatMap(extractPredicatesWithinOutputSet(_, partitionSet)))
-      logInfo(s"Pruning directories with: ${predicates.mkString(",")}")
+      logInfo(log"Pruning directories with: ${MDC(PREDICATES, 
predicates.mkString(","))}")
       predicates
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 1dbb6ce26f69..7d071124b0b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -22,7 +22,8 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.TaskOutputFileAlreadyExistException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CONFIG, NUM_CONCURRENT_WRITER}
 import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.shuffle.FetchFailedException
@@ -558,9 +559,11 @@ class DynamicPartitionDataConcurrentWriter(
         new WriterStatus(currentWriter, recordsInFile, fileCounter))
       if (concurrentWriters.size >= concurrentOutputWriterSpec.maxWriters && 
!sorted) {
         // Fall back to sort-based sequential writer mode.
-        logInfo(s"Number of concurrent writers ${concurrentWriters.size} 
reaches the threshold. " +
-          "Fall back from concurrent writers to sort-based sequential writer. 
You may change " +
-          s"threshold with configuration 
${SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key}")
+        logInfo(log"Number of concurrent writers " +
+          log"${MDC(NUM_CONCURRENT_WRITER, concurrentWriters.size)} reaches 
the threshold. " +
+          log"Fall back from concurrent writers to sort-based sequential 
writer. You may change " +
+          log"threshold with configuration " +
+          log"${MDC(CONFIG, SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key)}")
         sorted = true
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index d842e23cdf40..91749ddd794f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
 import org.apache.spark.sql.SparkSession
@@ -270,14 +270,15 @@ object FileFormatWriter extends Logging {
       val ret = f
       val commitMsgs = ret.map(_.commitMsg)
 
-      logInfo(s"Start to commit write Job ${description.uuid}.")
+      logInfo(log"Start to commit write Job ${MDC(LogKeys.UUID, 
description.uuid)}.")
       val (_, duration) = Utils
         .timeTakenMs { committer.commitJob(job, 
commitMsgs.toImmutableArraySeq) }
-      logInfo(s"Write Job ${description.uuid} committed. Elapsed time: 
$duration ms.")
+      logInfo(log"Write Job ${MDC(LogKeys.UUID, description.uuid)} committed. 
" +
+        log"Elapsed time: ${MDC(LogKeys.ELAPSED_TIME, duration)} ms.")
 
       processStats(
         description.statsTrackers, 
ret.map(_.summary.stats).toImmutableArraySeq, duration)
-      logInfo(s"Finished processing stats for write job ${description.uuid}.")
+      logInfo(log"Finished processing stats for write job ${MDC(LogKeys.UUID, 
description.uuid)}.")
 
       // return a set of all the partition paths that were updated during this 
job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ 
_).getOrElse(Set.empty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index 10cae0e4966c..8a47a28de845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.Partition
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CONFIG, DESIRED_PARTITIONS_SIZE, 
MAX_PARTITIONS_SIZE, PARTITIONS_SIZE}
+import org.apache.spark.internal.LogKeys.{CONFIG, DESIRED_NUM_PARTITIONS, 
MAX_NUM_PARTITIONS, NUM_PARTITIONS}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.ScanFileListing
@@ -99,10 +99,10 @@ object FilePartition extends Logging {
       val desiredSplitBytes =
         (totalSizeInBytes / BigDecimal(maxPartNum.get)).setScale(0, 
RoundingMode.UP).longValue
       val desiredPartitions = getFilePartitions(partitionedFiles, 
desiredSplitBytes, openCostBytes)
-      logWarning(log"The number of partitions is ${MDC(PARTITIONS_SIZE, 
partitions.size)}, " +
+      logWarning(log"The number of partitions is ${MDC(NUM_PARTITIONS, 
partitions.size)}, " +
         log"which exceeds the maximum number configured: " +
-        log"${MDC(MAX_PARTITIONS_SIZE, maxPartNum.get)}. Spark rescales it to 
" +
-        log"${MDC(DESIRED_PARTITIONS_SIZE, desiredPartitions.size)} by 
ignoring the " +
+        log"${MDC(MAX_NUM_PARTITIONS, maxPartNum.get)}. Spark rescales it to " 
+
+        log"${MDC(DESIRED_NUM_PARTITIONS, desiredPartitions.size)} by ignoring 
the " +
         log"configuration of ${MDC(CONFIG, 
SQLConf.FILES_MAX_PARTITION_BYTES.key)}.")
       desiredPartitions
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 7c1c4fe2f137..9bcdbadf7c5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{Partition => RDDPartition, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.LogKeys.PATH
+import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PATH}
 import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
@@ -233,7 +233,7 @@ class FileScanRDD(
         if (files.hasNext) {
           currentFile = files.next()
           updateMetadataRow()
-          logInfo(s"Reading File $currentFile")
+          logInfo(log"Reading File ${MDC(CURRENT_FILE, currentFile)}")
           // Sets InputFileBlockHolder for the file block's information
           InputFileBlockHolder
             .set(currentFile.urlEncodedPath, currentFile.start, 
currentFile.length)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index f2dcbe26104f..8333c276cdd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -21,7 +21,8 @@ import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{NUM_PRUNED, POST_SCAN_FILTERS, 
PUSHED_FILTERS, TOTAL}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -137,9 +138,8 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
 
     val numBucketsSelected = matchedBuckets.cardinality()
 
-    logInfo {
-      s"Pruned ${numBuckets - numBucketsSelected} out of $numBuckets buckets."
-    }
+    logInfo(log"Pruned ${MDC(NUM_PRUNED, numBuckets - numBucketsSelected)} " +
+      log"out of ${MDC(TOTAL, numBuckets)} buckets.")
 
     // None means all the buckets need to be scanned
     if (numBucketsSelected == numBuckets) {
@@ -206,11 +206,11 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
         DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
       val pushedFilters = dataFilters
         .flatMap(DataSourceStrategy.translateFilter(_, 
supportNestedPredicatePushdown))
-      logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
+      logInfo(log"Pushed Filters: ${MDC(PUSHED_FILTERS, 
pushedFilters.mkString(","))}")
 
       // Predicates with both partition keys and attributes need to be 
evaluated after the scan.
       val afterScanFilters = filterSet -- 
partitionKeyFilters.filter(_.references.nonEmpty)
-      logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
+      logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, 
afterScanFilters.mkString(","))}")
 
       val filterAttributes = AttributeSet(afterScanFilters ++ stayUpFilters)
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq 
++ projects
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 0f66aa816d96..3b8a20c7cf74 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{COUNT, ELAPSED_TIME}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FileSourceOptions
@@ -136,8 +137,8 @@ class InMemoryFileIndex(
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
       output ++= leafFiles
     }
-    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to 
list leaf files" +
-      s" for ${paths.length} paths.")
+    logInfo(log"It took ${MDC(ELAPSED_TIME, (System.nanoTime() - startTime) / 
(1000 * 1000))} ms" +
+      log" to list leaf files for ${MDC(COUNT, paths.length)} paths.")
     output
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3efe614bcef9..cc9f0d23bcb6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{COUNT, PERCENT, TOTAL}
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -190,8 +191,8 @@ abstract class PartitioningAwareFileIndex(
         val total = partitions.length
         val selectedSize = selected.length
         val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
-        s"Selected $selectedSize partitions out of $total, " +
-          s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
+        log"Selected ${MDC(COUNT, selectedSize)} partitions out of 
${MDC(TOTAL, total)}, " +
+          log"pruned ${MDC(PERCENT, if (total == 0) "0" else percentPruned)} 
partitions."
       }
 
       selected
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 144be2316f09..03e988eb0bd2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -21,7 +21,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.CLASS_NAME
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 
@@ -44,7 +45,8 @@ class SQLHadoopMapReduceCommitProtocol(
       configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, 
classOf[OutputCommitter])
 
     if (clazz != null) {
-      logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
+      logInfo(log"Using user defined output committer class " +
+        log"${MDC(CLASS_NAME, clazz.getCanonicalName)}")
 
       // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
       // has an associated output committer. To override this output committer,
@@ -64,7 +66,8 @@ class SQLHadoopMapReduceCommitProtocol(
         committer = ctor.newInstance()
       }
     }
-    logInfo(s"Using output committer class 
${committer.getClass.getCanonicalName}")
+    logInfo(log"Using output committer class " +
+      log"${MDC(CLASS_NAME, committer.getClass.getCanonicalName)}")
     committer
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8c430e231e39..1b71dc9221f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -23,7 +23,8 @@ import scala.util.Using
 import scala.util.control.NonFatal
 
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, 
TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.SQL_TEXT
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -266,7 +267,7 @@ class JDBCRDD(
     options.sessionInitStatement match {
       case Some(sql) =>
         val statement = conn.prepareStatement(sql)
-        logInfo(s"Executing sessionInitStatement: $sql")
+        logInfo(log"Executing sessionInitStatement: ${MDC(SQL_TEXT, sql)}")
         try {
           statement.setQueryTimeout(options.queryTimeout)
           statement.execute()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 599cc369d51c..2c4158dfe153 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.Partition
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{LOWER_BOUND, NEW_VALUE, OLD_VALUE, 
UPPER_BOUND}
+import org.apache.spark.internal.LogKeys.{CLAUSES, LOWER_BOUND, NEW_VALUE, 
NUM_PARTITIONS, OLD_VALUE, UPPER_BOUND}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, 
SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
@@ -164,8 +164,9 @@ private[sql] object JDBCRelation extends Logging {
       i = i + 1
     }
     val partitions = ans.toArray
-    logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these 
partitions: " +
-      partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
+    val clauses = 
partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", ")
+    logInfo(log"Number of partitions: ${MDC(NUM_PARTITIONS, numPartitions)}, " 
+
+      log"WHERE clauses of these partitions: ${MDC(CLAUSES, clauses)}")
     partitions
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 0549d99acc68..3e111252bc6f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -434,10 +434,11 @@ object ParquetUtils extends Logging {
         classOf[OutputCommitter])
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
-      logInfo("Using default output committer for Parquet: " +
-        classOf[ParquetOutputCommitter].getCanonicalName)
+      logInfo(log"Using default output committer for Parquet: " +
+        log"${MDC(CLASS_NAME, 
classOf[ParquetOutputCommitter].getCanonicalName)}")
     } else {
-      logInfo("Using user defined output committer for Parquet: " + 
committerClass.getCanonicalName)
+      logInfo(log"Using user defined output committer for Parquet: " +
+        log"${MDC(CLASS_NAME, committerClass.getCanonicalName)}")
     }
 
     conf.setClass(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 2f443a0bb1fa..b9f058b55ed0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.datasources.{WriteJobDescription, 
WriteTaskResult}
@@ -33,14 +33,15 @@ class FileBatchWrite(
   extends BatchWrite with Logging {
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     val results = messages.map(_.asInstanceOf[WriteTaskResult])
-    logInfo(s"Start to commit write Job ${description.uuid}.")
+    logInfo(log"Start to commit write Job ${MDC(LogKeys.UUID, 
description.uuid)}.")
     val (_, duration) = Utils
       .timeTakenMs { committer.commitJob(job, 
results.map(_.commitMsg).toImmutableArraySeq) }
-    logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration 
ms.")
+    logInfo(log"Write Job ${MDC(LogKeys.UUID, description.uuid)} committed. " +
+      log"Elapsed time: ${MDC(LogKeys.ELAPSED_TIME, duration)} ms.")
 
     processStats(
       description.statsTrackers, 
results.map(_.summary.stats).toImmutableArraySeq, duration)
-    logInfo(s"Finished processing stats for write job ${description.uuid}.")
+    logInfo(log"Finished processing stats for write job ${MDC(LogKeys.UUID, 
description.uuid)}.")
   }
 
   override def useCommitCoordinator(): Boolean = false
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index b71b077f845e..2679f1414456 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.io.{FileNotFoundException, IOException}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.PARTITIONED_FILE_READER
+import org.apache.spark.internal.LogKeys.{CURRENT_FILE, 
PARTITIONED_FILE_READER}
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.connector.read.PartitionReader
@@ -39,7 +39,7 @@ class FilePartitionReader[T](
     if (currentReader == null) {
       if (files.hasNext) {
         val file = files.next()
-        logInfo(s"Reading file $file")
+        logInfo(log"Reading file ${MDC(CURRENT_FILE, file)}")
         // Sets InputFileBlockHolder for the file block's information
         InputFileBlockHolder.set(file.urlEncodedPath, file.start, file.length)
         try {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala
index 87f70eb696b6..8b8cdc06d398 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.{GroupBasedRowLevelOperation, 
PhysicalOperation}
@@ -62,14 +63,16 @@ object GroupBasedRowLevelOperationScanPlanning extends 
Rule[LogicalPlan] with Pr
 
       val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, 
relation.output, Nil)
 
+      // scalastyle:off line.size.limit
       logInfo(
-        s"""
-           |Pushing operators to ${relation.name}
-           |Pushed filters: $pushedFiltersStr
-           |Filters evaluated on data source side: 
${evaluatedFilters.mkString(", ")}
-           |Filters evaluated on Spark side: ${postScanFilters.mkString(", ")}
-           |Output: ${output.mkString(", ")}
-         """.stripMargin)
+        log"""
+            |Pushing operators to ${MDC(LogKeys.RELATION_NAME, relation.name)}
+            |Pushed filters: ${MDC(LogKeys.PUSHED_FILTERS, pushedFiltersStr)}
+            |Filters evaluated on data source side: 
${MDC(LogKeys.EVALUATED_FILTERS, evaluatedFilters.mkString(", "))}
+            |Filters evaluated on Spark side: ${MDC(LogKeys.POST_SCAN_FILTERS, 
postScanFilters.mkString(", "))}
+            |Output: ${MDC(LogKeys.RELATION_OUTPUT, output.mkString(", "))}
+           """.stripMargin)
+      // scalastyle:on line.size.limit
 
       rd transformDown {
         // simplify the join condition in MERGE operations by discarding 
already evaluated filters
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 8c262cf56e8b..2b6fcd9d547f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
+import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, GROUP_BY_EXPRS, 
POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, 
Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, 
IntegerLiteral, Literal, NamedExpression, PredicateHelper, 
ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
@@ -86,11 +88,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       val postScanFilters = postScanFiltersWithoutSubquery ++ 
normalizedFiltersWithSubquery
 
       logInfo(
-        s"""
-           |Pushing operators to ${sHolder.relation.name}
-           |Pushed Filters: $pushedFiltersStr
-           |Post-Scan Filters: ${postScanFilters.mkString(",")}
-         """.stripMargin)
+        log"""
+            |Pushing operators to ${MDC(RELATION_NAME, sHolder.relation.name)}
+            |Pushed Filters: ${MDC(PUSHED_FILTERS, pushedFiltersStr)}
+            |Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, 
postScanFilters.mkString(","))}
+           """.stripMargin)
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
@@ -214,13 +216,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       holder.pushedAggOutputMap = AttributeMap(groupOutputMap ++ aggOutputMap)
       holder.output = newOutput
       logInfo(
-        s"""
-           |Pushing operators to ${holder.relation.name}
-           |Pushed Aggregate Functions:
-           | ${translatedAgg.aggregateExpressions().mkString(", ")}
-           |Pushed Group by:
-           | ${translatedAgg.groupByExpressions.mkString(", ")}
-         """.stripMargin)
+        log"""
+            |Pushing operators to ${MDC(RELATION_NAME, holder.relation.name)}
+            |Pushed Aggregate Functions:
+            | ${MDC(AGGREGATE_FUNCTIONS, 
translatedAgg.aggregateExpressions().mkString(", "))}
+            |Pushed Group by:
+            | ${MDC(GROUP_BY_EXPRS, 
translatedAgg.groupByExpressions.mkString(", "))}
+           """.stripMargin)
 
       if (canCompletePushDown) {
         val projectExpressions = finalResultExprs.map { expr =>
@@ -361,9 +363,9 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
         sHolder.builder, sHolder.relation, normalizedProjects, 
normalizedFilters)
 
       logInfo(
-        s"""
-           |Output: ${output.mkString(", ")}
-         """.stripMargin)
+        log"""
+            |Output: ${MDC(RELATION_OUTPUT, output.mkString(", "))}
+           """.stripMargin)
 
       val wrappedScan = getWrappedScan(scan, sHolder)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 07ea9ce2bcd6..5632595de7cf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -377,8 +376,9 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
     val totalNumRowsAccumulator = new LongAccumulator()
 
-    logInfo(s"Start processing data source write support: $batchWrite. " +
-      s"The input RDD has ${messages.length} partitions.")
+    logInfo(log"Start processing data source write support: " +
+      log"${MDC(LogKeys.BATCH_WRITE, batchWrite)}. The input RDD has " +
+      log"${MDC(LogKeys.COUNT, messages.length)}} partitions.")
 
     // Avoid object not serializable issue.
     val writeMetrics: Map[String, SQLMetric] = customMetrics
@@ -397,23 +397,24 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
         }
       )
 
-      logInfo(s"Data source write support $batchWrite is committing.")
+      logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} is committing.")
       batchWrite.commit(messages)
-      logInfo(s"Data source write support $batchWrite committed.")
+      logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} committed.")
       commitProgress = 
Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
       case cause: Throwable =>
-        logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} 
is aborting.")
+        logError(
+          log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} is aborting.")
         try {
           batchWrite.abort(messages)
         } catch {
           case t: Throwable =>
-            logError(log"Data source write support ${MDC(BATCH_WRITE, 
batchWrite)} " +
+            logError(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} " +
               log"failed to abort.")
             cause.addSuppressed(t)
             throw QueryExecutionErrors.writingJobFailedError(cause)
         }
-        logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} 
aborted.")
+        logError(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} aborted.")
         throw cause
     }
 
@@ -451,36 +452,45 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends Logging with Serial
         val coordinator = SparkEnv.get.outputCommitCoordinator
         val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, 
partId, attemptId)
         if (commitAuthorized) {
-          logInfo(s"Commit authorized for partition $partId (task $taskId, 
attempt $attemptId, " +
-            s"stage $stageId.$stageAttempt)")
+          logInfo(log"Commit authorized for partition 
${MDC(LogKeys.PARTITION_ID, partId)} " +
+            log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
+            log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
+            log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+            log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+
           dataWriter.commit()
         } else {
           val commitDeniedException = QueryExecutionErrors.commitDeniedError(
             partId, taskId, attemptId, stageId, stageAttempt)
-          logInfo(commitDeniedException.getMessage)
+          logInfo(log"${MDC(LogKeys.ERROR, commitDeniedException.getMessage)}")
           // throwing CommitDeniedException will trigger the catch block for 
abort
           throw commitDeniedException
         }
 
       } else {
-        logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
+        logInfo(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, 
context.partitionId())} " +
+          log"is committing.")
         dataWriter.commit()
       }
 
-      logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId, 
" +
-        s"stage $stageId.$stageAttempt)")
+      logInfo(log"Committed partition ${MDC(LogKeys.PARTITION_ID, partId)} " +
+        log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
+        log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
+        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
 
       DataWritingSparkTaskResult(iterWithMetrics.count, msg)
 
     })(catchBlock = {
       // If there is an error, abort this writer
-      logError(log"Aborting commit for partition ${MDC(PARTITION_ID, partId)} 
" +
-        log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, 
attemptId)}, " +
-        log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, 
stageAttempt)})")
+      logError(log"Aborting commit for partition ${MDC(LogKeys.PARTITION_ID, 
partId)} " +
+        log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
+        log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
+        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
       dataWriter.abort()
-      logError(log"Aborted commit for partition ${MDC(PARTITION_ID, partId)} " 
+
-        log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, 
attemptId)}, " +
-        log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, 
stageAttempt)})")
+      logError(log"Aborted commit for partition ${MDC(LogKeys.PARTITION_ID, 
partId)} " +
+        log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
+        log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
+        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
     }, finallyBlock = {
       dataWriter.close()
     })
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala
index a444fdfff7d9..4178aa6b6058 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala
@@ -23,7 +23,8 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.PYTHON_EXEC
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
 import org.apache.spark.sql.connector.write.WriterCommitMessage
@@ -60,7 +61,7 @@ class PythonStreamingSinkCommitRunner(
    * Initializes the Python worker for running the streaming sink committer.
    */
   def init(): Unit = {
-    logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+    logInfo(log"Initializing Python runner pythonExec: ${MDC(PYTHON_EXEC, 
pythonExec)}")
     val env = SparkEnv.get
 
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index a0f74ef6c3d0..67d879bdd8bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.exchange
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -407,11 +408,13 @@ case class EnsureRequirements(
         val leftPartValues = leftSpec.partitioning.partitionValues
         val rightPartValues = rightSpec.partitioning.partitionValues
 
+        val numLeftPartValues = MDC(LogKeys.NUM_LEFT_PARTITION_VALUES, 
leftPartValues.size)
+        val numRightPartValues = MDC(LogKeys.NUM_RIGHT_PARTITION_VALUES, 
rightPartValues.size)
         logInfo(
-          s"""
-             |Left side # of partitions: ${leftPartValues.size}
-             |Right side # of partitions: ${rightPartValues.size}
-             |""".stripMargin)
+          log"""
+              |Left side # of partitions: $numLeftPartValues
+              |Right side # of partitions: $numRightPartValues
+              |""".stripMargin)
 
         // As partition keys are compatible, we can pick either left or right 
as partition
         // expressions
@@ -421,7 +424,8 @@ case class EnsureRequirements(
             .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, 
partitionExprs)
             .map(v => (v, 1))
 
-        logInfo(s"After merging, there are ${mergedPartValues.size} 
partitions")
+        logInfo(log"After merging, there are " +
+          log"${MDC(LogKeys.NUM_PARTITIONS, mergedPartValues.size)} 
partitions")
 
         var replicateLeftSide = false
         var replicateRightSide = false
@@ -445,8 +449,8 @@ case class EnsureRequirements(
           val canReplicateRight = canReplicateRightSide(joinType)
 
           if (!canReplicateLeft && !canReplicateRight) {
-            logInfo("Skipping partially clustered distribution as it cannot be 
applied for " +
-                s"join type '$joinType'")
+            logInfo(log"Skipping partially clustered distribution as it cannot 
be applied for " +
+              log"join type '${MDC(LogKeys.JOIN_TYPE, joinType)}'")
           } else {
             val leftLink = left.logicalLink
             val rightLink = right.logicalLink
@@ -455,12 +459,16 @@ case class EnsureRequirements(
               leftLink.isDefined && rightLink.isDefined &&
                   leftLink.get.stats.sizeInBytes > 1 &&
                   rightLink.get.stats.sizeInBytes > 1) {
+              val leftLinkStatsSizeInBytes = 
MDC(LogKeys.LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES,
+                leftLink.get.stats.sizeInBytes)
+              val rightLinkStatsSizeInBytes = 
MDC(LogKeys.RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES,
+                rightLink.get.stats.sizeInBytes)
               logInfo(
-                s"""
+                log"""
                    |Using plan statistics to determine which side of join to 
fully
                    |cluster partition values:
-                   |Left side size (in bytes): 
${leftLink.get.stats.sizeInBytes}
-                   |Right side size (in bytes): 
${rightLink.get.stats.sizeInBytes}
+                   |Left side size (in bytes): $leftLinkStatsSizeInBytes
+                   |Right side size (in bytes): $rightLinkStatsSizeInBytes
                    |""".stripMargin)
               leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes
             } else {
@@ -477,12 +485,14 @@ case class EnsureRequirements(
             // of partitions can be applied. For instance, replication should 
not be allowed for
             // the left-hand side of a right outer join.
             if (replicateLeftSide && !canReplicateLeft) {
-              logInfo("Left-hand side is picked but cannot be applied to join 
type " +
-                  s"'$joinType'. Skipping partially clustered distribution.")
+              logInfo(log"Left-hand side is picked but cannot be applied to 
join type " +
+                log"'${MDC(LogKeys.JOIN_TYPE, joinType)}'. Skipping partially 
clustered " +
+                log"distribution.")
               replicateLeftSide = false
             } else if (replicateRightSide && !canReplicateRight) {
-              logInfo("Right-hand side is picked but cannot be applied to join 
type " +
-                  s"'$joinType'. Skipping partially clustered distribution.")
+              logInfo(log"Right-hand side is picked but cannot be applied to 
join type " +
+                log"'${MDC(LogKeys.JOIN_TYPE, joinType)}'. Skipping partially 
clustered " +
+                log"distribution.")
               replicateRightSide = false
             } else {
               // In partially clustered distribution, we should use un-grouped 
partition values
@@ -499,8 +509,8 @@ case class EnsureRequirements(
                   InternalRowComparableWrapper(partVal, partitionExprs), 
numParts))
               }
 
-              logInfo("After applying partially clustered distribution, there 
are " +
-                  s"${mergedPartValues.map(_._2).sum} partitions.")
+              logInfo(log"After applying partially clustered distribution, 
there are " +
+                log"${MDC(LogKeys.NUM_PARTITIONS, 
mergedPartValues.map(_._2).sum)} partitions.")
               applyPartialClustering = true
             }
           }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
index dd9c5a25e8a7..2ef046f668b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.PYTHON_EXEC
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
@@ -71,7 +72,7 @@ class PythonStreamingSourceRunner(
    * Initializes the Python worker for running the streaming source.
    */
   def init(): Unit = {
-    logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+    logInfo(log"Initializing Python runner pythonExec: ${MDC(PYTHON_EXEC, 
pythonExec)}")
     val env = SparkEnv.get
 
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
index 819fd1bd297f..45ecf8700950 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.api.r._
 import org.apache.spark.api.r.SpecialLengths
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.types.StructType
@@ -138,6 +139,10 @@ class ArrowRRunner(
 
       private var batchLoaded = true
 
+      private def format(v: Double): String = {
+        "%.3f".format(v)
+      }
+
       protected override def read(): ColumnarBatch = try {
         if (reader != null && batchLoaded) {
           batchLoaded = reader.loadNextBatch()
@@ -161,17 +166,14 @@ class ArrowRRunner(
               val input = dataStream.readDouble
               val compute = dataStream.readDouble
               val output = dataStream.readDouble
-              logInfo(
-                ("Times: boot = %.3f s, init = %.3f s, broadcast = %.3f s, " +
-                  "read-input = %.3f s, compute = %.3f s, write-output = %.3f 
s, " +
-                  "total = %.3f s").format(
-                  boot,
-                  init,
-                  broadcast,
-                  input,
-                  compute,
-                  output,
-                  boot + init + broadcast + input + compute + output))
+              logInfo(log"Times: boot = ${MDC(LogKeys.BOOT, format(boot))} s, 
" +
+                log"init = ${MDC(LogKeys.INIT, format(init))} s, " +
+                log"broadcast = ${MDC(LogKeys.BROADCAST, format(broadcast))} 
s, " +
+                log"read-input = ${MDC(LogKeys.INPUT, format(input))} s, " +
+                log"compute = ${MDC(LogKeys.COMPUTE, format(compute))} s, " +
+                log"write-output = ${MDC(LogKeys.OUTPUT, format(output))} s, " 
+
+                log"total = ${MDC(LogKeys.TOTAL,
+                  format(boot + init + broadcast + input + compute + output))} 
s")
               read()
             case length if length > 0 =>
               // Likewise, there looks no way to send each batch in streaming 
format via socket
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index 6b9a13ed3eb0..42ce32e1bc67 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -49,7 +49,7 @@ case class WriteToContinuousDataSourceExec(write: 
StreamingWrite, query: SparkPl
     val rdd = new ContinuousWriteRDD(queryRdd, writerFactory, metrics)
 
     logInfo(log"Start processing data source write support: 
${MDC(STREAMING_WRITE, write)}. " +
-      log"The input RDD has ${MDC(NUM_PARTITION, rdd.partitions.length)} 
partitions.")
+      log"The input RDD has ${MDC(NUM_PARTITIONS, rdd.partitions.length)} 
partitions.")
     EpochCoordinatorRef.get(
       
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       sparkContext.env)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 853a79d2b6d2..64a9eaad8805 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -446,13 +446,17 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
 
     if (earliestLoadedVersion.isDefined) {
-      logInfo(s"Trying to add version=$newVersion to state cache map with " +
-        s"current_size=$loadedEntries and 
earliest_loaded_version=${earliestLoadedVersion.get} " +
-        s"and 
max_versions_to_retain_in_memory=$numberOfVersionsToRetainInMemory")
+      logInfo(log"Trying to add version=${MDC(LogKeys.STATE_STORE_VERSION, 
newVersion)} to state " +
+        log"cache map with current_size=${MDC(LogKeys.NUM_LOADED_ENTRIES, 
loadedEntries)} and " +
+        log"earliest_loaded_version=" +
+        log"${MDC(LogKeys.EARLIEST_LOADED_VERSION, 
earliestLoadedVersion.get)} " +
+        log"and max_versions_to_retain_in_memory=" +
+        log"${MDC(LogKeys.NUM_VERSIONS_RETAIN, 
numberOfVersionsToRetainInMemory)}")
     } else {
-      logInfo(s"Trying to add version=$newVersion to state cache map with " +
-        s"current_size=$loadedEntries and " +
-        s"max_versions_to_retain_in_memory=$numberOfVersionsToRetainInMemory")
+      logInfo(log"Trying to add version=${MDC(LogKeys.STATE_STORE_VERSION, 
newVersion)} to state " +
+        log"cache map with current_size=${MDC(LogKeys.NUM_LOADED_ENTRIES, 
loadedEntries)} and " +
+        log"max_versions_to_retain_in_memory=" +
+        log"${MDC(LogKeys.NUM_VERSIONS_RETAIN, 
numberOfVersionsToRetainInMemory)}")
     }
 
     if (numberOfVersionsToRetainInMemory <= 0) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index cb93ff25da0a..88fd72a3cb7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2}
+import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, PATH, VALUE}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
@@ -270,8 +270,10 @@ object SharedState extends Logging {
     if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is 
not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      logInfo(s"${WAREHOUSE_PATH.key} is not set, but 
$HIVE_WAREHOUSE_CONF_NAME is set. " +
-        s"Setting ${WAREHOUSE_PATH.key} to the value of 
$HIVE_WAREHOUSE_CONF_NAME.")
+      logInfo(log"${MDC(CONFIG, WAREHOUSE_PATH.key)} is not set, but " +
+        log"${MDC(CONFIG2, HIVE_WAREHOUSE_CONF_NAME)} is set. " +
+        log"Setting ${MDC(CONFIG, WAREHOUSE_PATH.key)} to " +
+        log"the value of ${MDC(CONFIG2, HIVE_WAREHOUSE_CONF_NAME)}.")
       hiveWarehouseDir
     } else {
       // If spark.sql.warehouse.dir is set, we will override 
hive.metastore.warehouse.dir using
@@ -279,8 +281,9 @@ object SharedState extends Logging {
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir 
is set
       // we will set hive.metastore.warehouse.dir to the default value of 
spark.sql.warehouse.dir.
       val sparkWarehouseDir = 
sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
-      logInfo(s"Setting $HIVE_WAREHOUSE_CONF_NAME ('$hiveWarehouseDir') to the 
value of " +
-        s"${WAREHOUSE_PATH.key}.")
+      logInfo(log"Setting ${MDC(CONFIG, HIVE_WAREHOUSE_CONF_NAME)} " +
+        log"('${MDC(VALUE, hiveWarehouseDir)}') to the value of " +
+        log"${MDC(CONFIG2, WAREHOUSE_PATH.key)}.")
       sparkWarehouseDir
     }
   }
@@ -288,7 +291,7 @@ object SharedState extends Logging {
   def qualifyWarehousePath(hadoopConf: Configuration, warehousePath: String): 
String = {
     val tempPath = new Path(warehousePath)
     val qualified = 
tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
-    logInfo(s"Warehouse path is '$qualified'.")
+    logInfo(log"Warehouse path is '${MDC(PATH, qualified)}'.")
     qualified
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 12397468f8ed..55d2e639a56b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{QUERY_ID, RUN_ID}
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, 
WriteToStreamStatement}
@@ -78,7 +78,7 @@ class StreamingQueryManager private[sql] (
         Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
           sparkSession.sparkContext.conf).foreach { listener =>
           addListener(listener)
-          logInfo(s"Registered listener ${listener.getClass.getName}")
+          logInfo(log"Registered listener ${MDC(CLASS_NAME, 
listener.getClass.getName)}")
         }
       }
     }
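
For reference, the pattern applied throughout the hunks above is essentially the same in
every file: a plain s"..." interpolation becomes a log"..." interpolation whose variables
are wrapped in MDC(<LogKey>, <value>), so each value is attached to a structured key in
addition to appearing in the message text. A minimal sketch of that before/after change,
assuming only the imports already used in this patch (ExampleScanner and listFiles are
hypothetical, added purely for illustration):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.{COUNT, ELAPSED_TIME}

    // Hypothetical class, used only to illustrate the before/after migration.
    class ExampleScanner extends Logging {
      def listFiles(paths: Seq[String]): Unit = {
        val startTime = System.nanoTime()
        // ... list the files ...
        val elapsedMs = (System.nanoTime() - startTime) / (1000 * 1000)

        // Before: plain string interpolation, values only live in the message text.
        // logInfo(s"It took $elapsedMs ms to list leaf files for ${paths.length} paths.")

        // After: the log"..." interpolator plus MDC tags each value with a LogKey,
        // so it is also emitted as a structured field.
        logInfo(log"It took ${MDC(ELAPSED_TIME, elapsedMs)} ms" +
          log" to list leaf files for ${MDC(COUNT, paths.length)} paths.")
      }
    }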


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
