This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d540786d9cea [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework
d540786d9cea is described below

commit d540786d9ceacd7426803ad615f7ab32ec6faf67
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu Apr 25 13:23:21 2024 -0700

    [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logInfo calls with variables in the streaming module to the structured logging framework. This moves call sites from the following API
    ```
    def logInfo(msg: => String): Unit
    ```
    to
    ```
    def logInfo(entry: LogEntry): Unit
    ```
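
    For illustration, a representative call-site change from this patch (the FileStreamSink change below) looks like this; the sketch assumes a class that mixes in org.apache.spark.internal.Logging, which supplies logInfo and the log"..." interpolator:

    ```
    import org.apache.spark.internal.LogKey.BATCH_ID
    import org.apache.spark.internal.MDC

    // Before: plain string interpolation; the value is only part of the message text
    logInfo(s"Skipping already committed batch $batchId")

    // After: the log"..." interpolator tags the value with the BATCH_ID key as MDC
    logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}")
    ```

    The rendered message stays the same, but the key/value pair now travels with the log event as structured context.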
    
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Spark logs from the streaming module will contain additional MDC entries.
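
    As a purely illustrative sketch (not part of this patch; the exact field names depend on the configured log4j JSON layout), a structured record carrying such an MDC entry could look roughly like:

    ```
    {"ts":"2024-04-25T20:23:21.000Z","level":"INFO","msg":"Skipping already committed batch 3","context":{"batch_id":"3"},"logger":"FileStreamSink"}
    ```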
    
    ### How was this patch tested?
    
    Compiler and Scala style checks, as well as code review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46192 from dtenedor/streaming-log-info.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 47 ++++++++++++-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  2 +-
 .../AsyncProgressTrackingMicroBatchExecution.scala |  5 +-
 .../streaming/CheckpointFileManager.scala          |  8 ++-
 .../streaming/CompactibleFileStreamLog.scala       | 11 ++--
 .../sql/execution/streaming/FileStreamSink.scala   |  4 +-
 .../execution/streaming/FileStreamSinkLog.scala    |  4 +-
 .../sql/execution/streaming/FileStreamSource.scala | 13 ++--
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  7 +-
 .../execution/streaming/IncrementalExecution.scala |  4 +-
 .../streaming/ManifestFileCommitProtocol.scala     |  4 +-
 .../execution/streaming/MetadataLogFileIndex.scala |  4 +-
 .../execution/streaming/MicroBatchExecution.scala  | 34 ++++++----
 .../sql/execution/streaming/ProgressReporter.scala |  8 +--
 .../execution/streaming/ResolveWriteToStream.scala |  5 +-
 .../sql/execution/streaming/StreamExecution.scala  |  7 +-
 .../sql/execution/streaming/WatermarkTracker.scala |  7 +-
 .../streaming/continuous/ContinuousExecution.scala | 13 ++--
 .../continuous/ContinuousQueuedDataReader.scala    |  6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala  | 10 +--
 .../WriteToContinuousDataSourceExec.scala          |  7 +-
 .../sources/RateStreamMicroBatchStream.scala       |  5 +-
 .../state/HDFSBackedStateStoreProvider.scala       | 34 ++++++----
 .../sql/execution/streaming/state/RocksDB.scala    | 54 ++++++++-------
 .../streaming/state/RocksDBFileManager.scala       | 77 +++++++++++++---------
 .../streaming/state/RocksDBMemoryManager.scala     |  7 +-
 .../state/RocksDBStateStoreProvider.scala          | 12 ++--
 .../sql/execution/streaming/state/StateStore.scala | 23 ++++---
 .../streaming/state/StateStoreChangelog.scala      |  8 ++-
 .../state/StreamingSessionWindowStateManager.scala |  5 +-
 .../state/SymmetricHashJoinStateManager.scala      |  6 +-
 31 files changed, 286 insertions(+), 155 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index fab5e80dd0e6..6df7cb5a5867 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
   val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
+  val BATCH_TIMESTAMP = Value
   val BATCH_WRITE = Value
   val BLOCK_ID = Value
   val BLOCK_MANAGER_ID = Value
@@ -48,8 +49,12 @@ object LogKey extends Enumeration {
   val CATALOG_NAME = Value
   val CATEGORICAL_FEATURES = Value
   val CHECKPOINT_FILE = Value
+  val CHECKPOINT_LOCATION = Value
+  val CHECKPOINT_PATH = Value
+  val CHECKPOINT_ROOT = Value
   val CHECKPOINT_TIME = Value
   val CHECKSUM_FILE_NUM = Value
+  val CHOSEN_WATERMARK = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
   val CLUSTER_CENTROIDS = Value
@@ -66,6 +71,8 @@ object LogKey extends Enumeration {
   val COLUMN_NAME = Value
   val COMMAND = Value
   val COMMAND_OUTPUT = Value
+  val COMMITTED_VERSION = Value
+  val COMPACT_INTERVAL = Value
   val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
@@ -86,6 +93,7 @@ object LogKey extends Enumeration {
   val CSV_SCHEMA_FIELD_NAME = Value
   val CSV_SCHEMA_FIELD_NAMES = Value
   val CSV_SOURCE = Value
+  val CURRENT_BATCH_ID = Value
   val CURRENT_PATH = Value
   val DATA = Value
   val DATABASE_NAME = Value
@@ -95,6 +103,7 @@ object LogKey extends Enumeration {
   val DATA_SOURCE = Value
   val DATA_SOURCES = Value
   val DATA_SOURCE_PROVIDER = Value
+  val DEFAULT_COMPACT_INTERVAL = Value
   val DEFAULT_ISOLATION_LEVEL = Value
   val DEFAULT_VALUE = Value
   val DELAY = Value
@@ -102,6 +111,7 @@ object LogKey extends Enumeration {
   val DELTA = Value
   val DESCRIPTION = Value
   val DESIRED_PARTITIONS_SIZE = Value
+  val DFS_FILE = Value
   val DIFF_DELTA = Value
   val DIVISIBLE_CLUSTER_INDICES_SIZE = Value
   val DRIVER_ID = Value
@@ -112,12 +122,13 @@ object LogKey extends Enumeration {
   val ENCODING = Value
   val END_INDEX = Value
   val END_POINT = Value
+  val END_VERSION = Value
   val ENGINE = Value
+  val EPOCH = Value
   val ERROR = Value
   val ESTIMATOR_PARAMETER_MAP = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
-  val EXCEPTION = Value
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
   val EXECUTION_PLAN_LEAVES = Value
@@ -131,6 +142,7 @@ object LogKey extends Enumeration {
   val EXECUTOR_RESOURCES = Value
   val EXECUTOR_STATE = Value
   val EXECUTOR_TARGET_COUNT = Value
+  val EXISTING_FILE = Value
   val EXIT_CODE = Value
   val EXPECTED_NUM_FILES = Value
   val EXPECTED_PARTITION_COLUMN = Value
@@ -144,15 +156,20 @@ object LogKey extends Enumeration {
   val FEATURE_DIMENSION = Value
   val FIELD_NAME = Value
   val FILE_ABSOLUTE_PATH = Value
+  val FILE_END_OFFSET = Value
   val FILE_FORMAT = Value
   val FILE_FORMAT2 = Value
+  val FILE_MODIFICATION_TIME = Value
   val FILE_NAME = Value
+  val FILE_START_OFFSET = Value
   val FILE_VERSION = Value
+  val FINAL_PATH = Value
   val FINISH_TRIGGER_DURATION = Value
   val FROM_OFFSET = Value
   val FROM_TIME = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
+  val GLOBAL_WATERMARK = Value
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
   val HASH_JOIN_KEYS = Value
@@ -194,10 +211,13 @@ object LogKey extends Enumeration {
   val LINE = Value
   val LINE_NUM = Value
   val LISTENER = Value
+  val LOADED_VERSION = Value
   val LOAD_FACTOR = Value
   val LOAD_TIME = Value
   val LOGICAL_PLAN_COLUMNS = Value
   val LOGICAL_PLAN_LEAVES = Value
+  val LOG_ID = Value
+  val LOG_OFFSET = Value
   val LOG_TYPE = Value
   val LOWER_BOUND = Value
   val MALFORMATTED_STIRNG = Value
@@ -216,10 +236,15 @@ object LogKey extends Enumeration {
   val MEMORY_SIZE = Value
   val MERGE_DIR_NAME = Value
   val MESSAGE = Value
+  val METADATA_DIRECTORY = Value
+  val METADATA_JSON = Value
   val METHOD_NAME = Value
+  val METRICS_JSON = Value
+  val MIN_COMPACTION_BATCH_ID = Value
   val MIN_FREQUENT_PATTERN_COUNT = Value
   val MIN_POINT_PER_CLUSTER = Value
   val MIN_SIZE = Value
+  val MIN_VERSION_NUMBER = Value
   val MODEL_WEIGHTS = Value
   val NAMESPACE = Value
   val NEW_FEATURE_COLUMN_NAME = Value
@@ -230,11 +255,15 @@ object LogKey extends Enumeration {
   val NODE_LOCATION = Value
   val NORM = Value
   val NUM_BIN = Value
+  val NUM_BYTES = Value
   val NUM_CLASSES = Value
   val NUM_COLUMNS = Value
   val NUM_EXAMPLES = Value
   val NUM_FEATURES = Value
   val NUM_FILES = Value
+  val NUM_FILES_COPIED = Value
+  val NUM_FILES_FAILED_TO_DELETE = Value
+  val NUM_FILES_REUSED = Value
   val NUM_FREQUENT_ITEMS = Value
   val NUM_ITERATIONS = Value
   val NUM_LOCAL_FREQUENT_PATTERN = Value
@@ -245,6 +274,7 @@ object LogKey extends Enumeration {
   val OBJECT_ID = Value
   val OFFSET = Value
   val OFFSETS = Value
+  val OFFSET_SEQUENCE_METADATA = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OLD_VALUE = Value
   val OPTIMIZED_PLAN_COLUMNS = Value
@@ -272,6 +302,7 @@ object LogKey extends Enumeration {
   val POD_TARGET_COUNT = Value
   val POLICY = Value
   val PORT = Value
+  val PRETTY_ID_STRING = Value
   val PRINCIPAL = Value
   val PROCESSING_TIME = Value
   val PRODUCER_ID = Value
@@ -309,6 +340,8 @@ object LogKey extends Enumeration {
   val RETRY_INTERVAL = Value
   val RIGHT_EXPR = Value
   val RMSE = Value
+  val ROCKS_DB_LOG_LEVEL = Value
+  val ROCKS_DB_LOG_MESSAGE = Value
   val RPC_ENDPOINT_REF = Value
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
@@ -329,6 +362,7 @@ object LogKey extends Enumeration {
   val SLEEP_TIME = Value
   val SLIDE_DURATION = Value
   val SMALLEST_CLUSTER_INDEX = Value
+  val SNAPSHOT_VERSION = Value
   val SPARK_DATA_STREAM = Value
   val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
@@ -337,13 +371,23 @@ object LogKey extends Enumeration {
   val STAGE_ID = Value
   val START_INDEX = Value
   val STATEMENT_ID = Value
+  val STATE_STORE_ID = Value
   val STATE_STORE_PROVIDER = Value
+  val STATE_STORE_VERSION = Value
   val STATUS = Value
   val STDERR = Value
   val STORAGE_LEVEL = Value
   val STORAGE_LEVEL_DESERIALIZED = Value
   val STORAGE_LEVEL_REPLICATION = Value
   val STORE_ID = Value
+  val STREAMING_DATA_SOURCE_DESCRIPTION = Value
+  val STREAMING_DATA_SOURCE_NAME = Value
+  val STREAMING_OFFSETS_END = Value
+  val STREAMING_OFFSETS_START = Value
+  val STREAMING_QUERY_PROGRESS = Value
+  val STREAMING_SOURCE = Value
+  val STREAMING_TABLE = Value
+  val STREAMING_WRITE = Value
   val STREAM_ID = Value
   val STREAM_NAME = Value
   val SUBMISSION_ID = Value
@@ -398,6 +442,7 @@ object LogKey extends Enumeration {
   val USER_ID = Value
   val USER_NAME = Value
   val VALUE = Value
+  val VERSION_NUMBER = Value
   val VIRTUAL_CORES = Value
   val VOCAB_SIZE = Value
   val WAIT_RESULT_TIME = Value
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 12ca0652f1b7..e5bfb01d51f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -745,7 +745,7 @@ case class AdaptiveSparkPlanExec(
       Some((finalPlan, optimized))
     } catch {
       case e: InvalidAQEPlanException[_] =>
-        logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
+        logOnLevel(log"Re-optimize - ${MDC(ERROR, e.getMessage())}:\n" +
           log"${MDC(QUERY_PLAN, e.plan)}")
         None
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index ec24ec0fd335..75c7f9dbe861 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.internal.LogKey.PRETTY_ID_STRING
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.streaming.WriteToStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -221,7 +223,8 @@ class AsyncProgressTrackingMicroBatchExecution(
     super.cleanup()
 
     ThreadUtils.shutdown(asyncWritesExecutorService)
-    logInfo(s"Async progress tracking executor pool for query 
${prettyIdString} has been shutdown")
+    logInfo(log"Async progress tracking executor pool for query " +
+      log"${MDC(PRETTY_ID_STRING, prettyIdString)} has been shutdown")
   }
 
   // used for testing
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 1c530fcabe0a..e9307ec971ac 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.internal.SQLConf
@@ -144,7 +144,8 @@ object CheckpointFileManager extends Logging {
       this(fm, path, generateTempPath(path), overwrite)
     }
 
-    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
+    logInfo(log"Writing atomically to ${MDC(FINAL_PATH, finalPath)} using temp 
file " +
+      log"${MDC(TEMP_PATH, tempPath)}")
     @volatile private var terminated = false
 
     override def close(): Unit = synchronized {
@@ -166,7 +167,8 @@ object CheckpointFileManager extends Logging {
             s"But $finalPath does not exist.")
         }
 
-        logInfo(s"Renamed temp file $tempPath to $finalPath")
+        logInfo(log"Renamed temp file ${MDC(TEMP_PATH, tempPath)} to " +
+          log"${MDC(FINAL_PATH, finalPath)}")
       } finally {
         terminated = true
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 884de070bf32..908a9c662669 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -105,8 +105,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
         defaultCompactInterval, compactibleBatchIds(0).toInt)
     }
     assert(interval > 0, s"intervalValue = $interval not positive value.")
-    logInfo(s"Set the compact interval to $interval " +
-      s"[defaultCompactInterval: $defaultCompactInterval]")
+    logInfo(log"Set the compact interval to ${MDC(COMPACT_INTERVAL, interval)} 
" +
+      log"[defaultCompactInterval: ${MDC(DEFAULT_COMPACT_INTERVAL, 
defaultCompactInterval)}]")
     interval
   }
 
@@ -309,8 +309,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
       assert(isCompactionBatch(minCompactionBatchId, compactInterval),
         s"$minCompactionBatchId is not a compaction batch")
 
-      logInfo(s"Current compact batch id = $currentBatchId " +
-        s"min compaction batch id to delete = $minCompactionBatchId")
+      logInfo(log"Current compact batch id = ${MDC(CURRENT_BATCH_ID, 
currentBatchId)} " +
+        log"min compaction batch id to delete = " +
+        log"${MDC(MIN_COMPACTION_BATCH_ID, minCompactionBatchId)}")
 
       val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
       fileManager.list(metadataPath, (path: Path) => {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 18895e525156..806caebeaf4b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ERROR, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
@@ -146,7 +146,7 @@ class FileStreamSink(
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
-      logInfo(s"Skipping already committed batch $batchId")
+      logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}")
     } else {
       val committer = FileCommitProtocol.instantiate(
         className = 
sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index d8aa31be4797..5059bdecca66 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.hadoop.fs.FileStatus
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
@@ -101,7 +103,7 @@ class FileStreamSinkLog(
 
   val retentionMs: Long = _retentionMs match {
     case Some(retention) =>
-      logInfo(s"Retention is set to $retention ms")
+      logInfo(log"Retention is set to ${MDC(TIME_UNITS, retention)} ms")
       retention
 
     case _ => Long.MaxValue
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6aa8e94db94f..b8151a07398a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, 
GlobFilter, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{CURRENT_PATH, ELAPSED_TIME, NEW_PATH, 
NUM_FILES}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -126,8 +126,9 @@ class FileStreamSource(
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
-    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(log"maxFilesPerBatch = ${MDC(NUM_FILES, maxFilesPerBatch)}, " +
+    log"maxBytesPerBatch = ${MDC(NUM_BYTES, maxBytesPerBatch)}, " +
+    log"maxFileAgeMs = ${MDC(TIME_UNITS, maxFileAgeMs)}")
 
   private var unreadFiles: Seq[NewFileEntry] = _
 
@@ -251,7 +252,8 @@ class FileStreamSource(
         FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = 
metadataLogCurrentOffset)
       }.toArray
       if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
-        logInfo(s"Log offset set to $metadataLogCurrentOffset with 
${batchFiles.size} new files")
+        logInfo(log"Log offset set to ${MDC(LOG_OFFSET, 
metadataLogCurrentOffset)} " +
+          log"with ${MDC(NUM_FILES, batchFiles.size)} new files")
       } else {
         throw new IllegalStateException("Concurrent update to the log. 
Multiple streaming jobs " +
           s"detected for $metadataLogCurrentOffset")
@@ -291,7 +293,8 @@ class FileStreamSource(
 
     assert(startOffset <= endOffset)
     val files = metadataLog.get(Some(startOffset + 1), 
Some(endOffset)).flatMap(_._2)
-    logInfo(s"Processing ${files.length} files from ${startOffset + 
1}:$endOffset")
+    logInfo(log"Processing ${MDC(NUM_FILES, files.length)} " +
+      log"files from ${MDC(FILE_START_OFFSET, startOffset + 
1)}:${MDC(FILE_END_OFFSET, endOffset)}")
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     val newDataSource =
       DataSource(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index b3eedbf93f04..febe4dc8efc0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.fs._
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -265,7 +266,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
 
   override def getLatest(): Option[(Long, T)] = {
     listBatches.sorted.lastOption.map { batchId =>
-      logInfo(s"Getting latest batch $batchId")
+      logInfo(log"Getting latest batch ${MDC(BATCH_ID, batchId)}")
       (batchId, getExistingBatch(batchId))
     }
   }
@@ -335,7 +336,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
       batchCache.synchronized {
         batchCache.keySet.asScala.toArray
       }
-    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+    logInfo(log"BatchIds found from listing: ${MDC(BATCH_ID, 
batchIds.sorted.mkString(", "))}")
 
     if (batchIds.isEmpty) {
       Array.empty
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 62999d711ce8..c3cb7949bbd6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.ERROR
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, 
ExpressionWithRandomSeed}
@@ -102,7 +102,7 @@ class IncrementalExecution(
       tracker).transformAllExpressionsWithPruning(
       _.containsAnyPattern(CURRENT_LIKE, EXPRESSION_WITH_RANDOM_SEED)) {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
-        logInfo(s"Current batch timestamp = $timestamp")
+        logInfo(log"Current batch timestamp = ${MDC(BATCH_TIMESTAMP, 
timestamp)}")
         ts.toLiteral
       case e: ExpressionWithRandomSeed => 
e.withNewSeed(Utils.random.nextLong())
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index f444c46f1419..230b8f5dc4ef 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -75,7 +75,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     pendingCommitFiles.clear()
 
     if (fileLog.add(batchId, fileStatuses)) {
-      logInfo(s"Committed batch $batchId")
+      logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}")
     } else {
       throw new IllegalStateException(s"Race while writing batch $batchId")
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 6eaccfb6d934..bd1aa91e43ff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
@@ -47,7 +49,7 @@ class MetadataLogFileIndex(
     metadataDir
   }
 
-  logInfo(s"Reading streaming file log from $metadataDirectory")
+  logInfo(log"Reading streaming file log from ${MDC(METADATA_DIRECTORY, 
metadataDirectory)}")
   private val metadataLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
metadataDirectory.toString)
   private val allFilesFromLog = 
metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index e5d275d7c242..ec7a259609d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 
-import org.apache.spark.internal.LogKey.{LATEST_BATCH_ID, 
LATEST_COMMITTED_BATCH_ID, READ_LIMIT, SPARK_DATA_STREAM}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -103,8 +103,8 @@ class MicroBatchExecution(
         // See SPARK-45178 for more details.
         if (sparkSession.sessionState.conf.getConf(
             SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
-          logInfo("Configured to use the wrapper of Trigger.AvailableNow for 
query " +
-            s"$prettyIdString.")
+          logInfo(log"Configured to use the wrapper of Trigger.AvailableNow 
for query " +
+            log"${MDC(PRETTY_ID_STRING, prettyIdString)}.")
           MultiBatchExecutor()
         } else {
           val supportsTriggerAvailableNow = sources.distinct.forall { src =>
@@ -158,7 +158,9 @@ class MicroBatchExecution(
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSourceV1.createSource(metadataPath)
           nextSourceId += 1
-          logInfo(s"Using Source [$source] from DataSourceV1 named 
'$sourceName' [$dataSourceV1]")
+          logInfo(log"Using Source [${MDC(STREAMING_SOURCE, source)}] " +
+            log"from DataSourceV1 named '${MDC(STREAMING_DATA_SOURCE_NAME, 
sourceName)}' " +
+            log"[${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]")
           StreamingExecutionRelation(source, output, 
dataSourceV1.catalogTable)(sparkSession)
         })
 
@@ -171,7 +173,9 @@ class MicroBatchExecution(
             // Materialize source to avoid creating it in every batch
             val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
             nextSourceId += 1
-            logInfo(s"Reading table [$table] from DataSourceV2 named 
'$srcName' $dsStr")
+            logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
+              log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, 
srcName)}' " +
+              log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // TODO: operator pushdown.
             val scan = table.newScanBuilder(options).build()
             val stream = scan.toMicroBatchStream(metadataPath)
@@ -189,7 +193,9 @@ class MicroBatchExecution(
             val source =
               
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
             nextSourceId += 1
-            logInfo(s"Using Source [$source] from DataSourceV2 named 
'$srcName' $dsStr")
+            logInfo(log"Using Source [${MDC(STREAMING_SOURCE, source)}] from 
DataSourceV2 " +
+              log"named '${MDC(STREAMING_DATA_SOURCE_NAME, srcName)}' " +
+              log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // We don't have a catalog table but may have a table identifier. 
Given this is about
             // v1 fallback path, we just give up and set the catalog table as 
None.
             StreamingExecutionRelation(source, output, None)(sparkSession)
@@ -280,7 +286,7 @@ class MicroBatchExecution(
       // microBatchThread may spawn new jobs, so we need to cancel again to 
prevent a leak
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
-    logInfo(s"Query $prettyIdString was stopped")
+    logInfo(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} was stopped")
   }
 
   private val watermarkPropagator = 
WatermarkPropagator(sparkSession.sessionState.conf)
@@ -290,7 +296,8 @@ class MicroBatchExecution(
 
     // shutdown and cleanup required for async log purge mechanism
     asyncLogPurgeShutdown()
-    logInfo(s"Async log purge executor pool for query ${prettyIdString} has 
been shutdown")
+    logInfo(log"Async log purge executor pool for query " +
+      log"${MDC(PRETTY_ID_STRING, prettyIdString)} has been shutdown")
   }
 
   private def initializeExecution(
@@ -306,7 +313,7 @@ class MicroBatchExecution(
     setLatestExecutionContext(execCtx)
 
     populateStartOffsets(execCtx, sparkSessionForStream)
-    logInfo(s"Stream started from ${execCtx.startOffsets}")
+    logInfo(log"Stream started from ${MDC(STREAMING_OFFSETS_START, 
execCtx.startOffsets)}")
     execCtx
   }
   /**
@@ -520,8 +527,9 @@ class MicroBatchExecution(
         }
         // initialize committed offsets to start offsets of the most recent 
committed batch
         committedOffsets = execCtx.startOffsets
-        logInfo(s"Resuming at batch ${execCtx.batchId} with committed offsets 
" +
-          s"${execCtx.startOffsets} and available offsets 
${execCtx.endOffsets}")
+        logInfo(log"Resuming at batch ${MDC(BATCH_ID, execCtx.batchId)} with 
committed offsets " +
+          log"${MDC(STREAMING_OFFSETS_START, execCtx.startOffsets)} and 
available offsets " +
+          log"${MDC(STREAMING_OFFSETS_END, execCtx.endOffsets)}")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         execCtx.batchId = 0
@@ -875,8 +883,8 @@ class MicroBatchExecution(
       throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
     }
 
-    logInfo(s"Committed offsets for batch ${execCtx.batchId}. " +
-      s"Metadata ${execCtx.offsetSeqMetadata.toString}")
+    logInfo(log"Committed offsets for batch ${MDC(BATCH_ID, execCtx.batchId)}. 
" +
+      log"Metadata ${MDC(OFFSET_SEQUENCE_METADATA, 
execCtx.offsetSeqMetadata.toString)}")
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 138e4abefce2..2e633009dd67 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{EXECUTION_PLAN_LEAVES, 
FINISH_TRIGGER_DURATION, LOGICAL_PLAN_LEAVES, PROCESSING_TIME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan, WithCTE}
@@ -82,7 +82,7 @@ class ProgressReporter(
 
     addNewProgress(newProgress)
     postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
+    logInfo(log"Streaming query made progress: ${MDC(STREAMING_QUERY_PROGRESS, 
newProgress)}")
   }
 
   private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -104,8 +104,8 @@ class ProgressReporter(
       addNewProgress(newProgress)
       if (lastNoExecutionProgressEventTime > Long.MinValue) {
         postEvent(new QueryIdleEvent(id, runId, 
formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more 
than " +
-          s"${noDataProgressEventInterval} ms.")
+        logInfo(log"Streaming query has been idle and waiting for new data 
more than " +
+          log"${MDC(TIME_UNITS, noDataProgressEventInterval)} ms.")
       }
 
       lastNoExecutionProgressEventTime = now
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index e80860b82c32..60052687b2d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -23,7 +23,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.LogKey.{CONFIG, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
@@ -134,7 +134,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with 
SQLConfHelper {
       val checkpointDir = fileManager.createCheckpointDirectory()
       checkpointDir.toString
     }
-    logInfo(s"Checkpoint root $checkpointLocation resolved to 
$resolvedCheckpointRoot.")
+    logInfo(log"Checkpoint root ${MDC(CHECKPOINT_LOCATION, 
checkpointLocation)} " +
+      log"resolved to ${MDC(CHECKPOINT_ROOT, resolvedCheckpointRoot)}.")
     (resolvedCheckpointRoot, deleteCheckpointOnStop)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ce2bbba550bc..b4316bad9a80 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, 
SparkThrowable}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{PATH, SPARK_DATA_STREAM}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -261,7 +261,8 @@ abstract class StreamExecution(
    * has been posted to all the listeners.
    */
   def start(): Unit = {
-    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store 
the query checkpoint.")
+    logInfo(log"Starting ${MDC(PRETTY_ID_STRING, prettyIdString)}. " +
+      log"Use ${MDC(CHECKPOINT_ROOT, resolvedCheckpointRoot)} to store the 
query checkpoint.")
     queryExecutionThread.setDaemon(true)
     queryExecutionThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
@@ -406,7 +407,7 @@ abstract class StreamExecution(
               .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || 
exception.isEmpty)) {
           val checkpointPath = new Path(resolvedCheckpointRoot)
           try {
-            logInfo(s"Deleting checkpoint $checkpointPath.")
+            logInfo(log"Deleting checkpoint ${MDC(CHECKPOINT_PATH, 
checkpointPath)}.")
             fileManager.delete(checkpointPath)
           } catch {
             case NonFatal(e) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index b0f8cf9cd184..83da9af739da 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -21,7 +21,8 @@ import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
@@ -113,7 +114,9 @@ case class WatermarkTracker(policy: 
MultipleWatermarkPolicy) extends Logging {
     // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` 
implementations.
     val chosenGlobalWatermark = 
policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
     if (chosenGlobalWatermark > globalWatermarkMs) {
-      logInfo(s"Updating event-time watermark from $globalWatermarkMs to 
$chosenGlobalWatermark ms")
+      logInfo(log"Updating event-time watermark from " +
+        log"${MDC(GLOBAL_WATERMARK, globalWatermarkMs)} " +
+        log"to ${MDC(CHOSEN_WATERMARK, chosenGlobalWatermark)} ms")
       globalWatermarkMs = chosenGlobalWatermark
     } else {
       logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < 
$globalWatermarkMs")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 920a7c68314b..0756449e1e53 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -25,6 +25,8 @@ import java.util.function.UnaryOperator
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, 
CurrentTimestampLike, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -83,7 +85,9 @@ class ContinuousExecution(
         v2ToRelationMap.getOrElseUpdate(s, {
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named 
'$sourceName' $dsStr")
+          logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
+            log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, 
sourceName)}' " +
+            log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
@@ -276,7 +280,7 @@ class ContinuousExecution(
               false
             } else if (isActive) {
               execCtx.batchId = 
epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-              logInfo(s"New epoch ${execCtx.batchId} is starting.")
+              logInfo(log"New epoch ${MDC(BATCH_ID, execCtx.batchId)} is 
starting.")
               true
             } else {
               false
@@ -307,7 +311,8 @@ class ContinuousExecution(
     } catch {
       case t: Throwable if StreamExecution.isInterruptionException(t, 
sparkSession.sparkContext) &&
           state.get() == RECONFIGURING =>
-        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+        logInfo(log"Query ${MDC(QUERY_ID, id)} ignoring exception from 
reconfiguring: " +
+          log"${MDC(ERROR, t)}")
         // interrupted by reconfiguration - swallow exception so we can 
restart the query
     } finally {
       // The above execution may finish before getting interrupted, for 
example, a Spark job having
@@ -476,7 +481,7 @@ class ContinuousExecution(
       // We just need to interrupt the long running job.
       interruptAndAwaitExecutionThreadTermination()
     }
-    logInfo(s"Query $prettyIdString was stopped")
+    logInfo(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} was stopped")
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index 8e5548ca2aca..2a508f27a10e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReader
@@ -159,7 +160,8 @@ class ContinuousQueuedDataReader(
       } catch {
         case _: InterruptedException =>
           // Continuous shutdown always involves an interrupt; do nothing and 
shut down quietly.
-          logInfo(s"shutting down interrupted data reader thread $getName")
+          logInfo(log"shutting down interrupted data reader thread " +
+            log"${MDC(THREAD_NAME, getName)}")
 
         case NonFatal(t) =>
           failureReason = t
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index 1d6ba87145d4..3af7b2a76f93 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.DataWriter
@@ -68,8 +70,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], 
writerFactory: StreamingDat
           }
           CustomMetrics.updateMetrics(
             dataWriter.currentMetricsValues.toImmutableArraySeq, customMetrics)
-          logInfo(s"Writer for partition ${context.partitionId()} " +
-            s"in epoch ${EpochTracker.getCurrentEpoch.get} is committing.")
+          logInfo(log"Writer for partition ${MDC(PARTITION_ID, 
context.partitionId())} " +
+            log"in epoch ${MDC(EPOCH, EpochTracker.getCurrentEpoch.get)} is 
committing.")
           val msg = dataWriter.commit()
           epochCoordinator.send(
             CommitPartitionEpoch(
@@ -77,8 +79,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], 
writerFactory: StreamingDat
               EpochTracker.getCurrentEpoch.get,
               msg)
           )
-          logInfo(s"Writer for partition ${context.partitionId()} " +
-            s"in epoch ${EpochTracker.getCurrentEpoch.get} committed.")
+          logInfo(log"Writer for partition ${MDC(PARTITION_ID, 
context.partitionId())} " +
+            log"in epoch ${MDC(EPOCH, EpochTracker.getCurrentEpoch.get)} 
committed.")
           EpochTracker.incrementCurrentEpoch()
         } catch {
           case _: InterruptedException =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index 2b7d68f9b98b..d2b6ef0eb796 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -47,8 +48,8 @@ case class WriteToContinuousDataSourceExec(write: 
StreamingWrite, query: SparkPl
       PhysicalWriteInfoImpl(queryRdd.getNumPartitions))
     val rdd = new ContinuousWriteRDD(queryRdd, writerFactory, metrics)
 
-    logInfo(s"Start processing data source write support: $write. " +
-      s"The input RDD has ${rdd.partitions.length} partitions.")
+    logInfo(log"Start processing data source write support: 
${MDC(STREAMING_WRITE, write)}. " +
+      log"The input RDD has ${MDC(NUM_PARTITION, rdd.partitions.length)} 
partitions.")
     EpochCoordinatorRef.get(
       
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       sparkContext.env)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index 5640c7d3ca76..520e7fc23f4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.commons.io.IOUtils
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -92,7 +93,7 @@ class RateStreamMicroBatchStream(
     metadataLog.get(0).getOrElse {
       val offset = LongOffset(clock.getTimeMillis())
       metadataLog.add(0, offset)
-      logInfo(s"Start time: $offset")
+      logInfo(log"Start time: ${MDC(TIME_UNITS, offset)}")
       offset
     }.offset
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index eaa4e92af929..3350cd900d4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
-import org.apache.spark.internal.LogKey.{FILE_VERSION, OP_ID, PARTITION_ID, 
PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -170,7 +170,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
         verify(state == UPDATING, "Cannot commit after already committed or 
aborted")
         commitUpdates(newVersion, mapToUpdate, compressedStream)
         state = COMMITTED
-        logInfo(s"Committed version $newVersion for $this to file 
$finalDeltaFile")
+        logInfo(log"Committed version ${MDC(COMMITTED_VERSION, newVersion)} " +
+          log"for ${MDC(STATE_STORE_PROVIDER, this)} to file ${MDC(FILE_NAME, 
finalDeltaFile)}")
         newVersion
       } catch {
         case e: Throwable =>
@@ -188,7 +189,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
       } else {
         state = ABORTED
       }
-      logInfo(s"Aborted version $newVersion for $this")
+      logInfo(log"Aborted version ${MDC(STATE_STORE_VERSION, newVersion)} " +
+        log"for ${MDC(STATE_STORE_PROVIDER, this)}")
     }
 
     /**
@@ -254,14 +256,16 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   /** Get the state store for making updates to create a new `version` of the 
store. */
   override def getStore(version: Long): StateStore = {
     val newMap = getLoadedMapForStore(version)
-    logInfo(s"Retrieved version $version of 
${HDFSBackedStateStoreProvider.this} for update")
+    logInfo(log"Retrieved version ${MDC(STATE_STORE_VERSION, version)} " +
+      log"of ${MDC(STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} 
for update")
     new HDFSBackedStateStore(version, newMap)
   }
 
   /** Get the state store for reading to specific `version` of the store. */
   override def getReadStore(version: Long): ReadStateStore = {
     val newMap = getLoadedMapForStore(version)
-    logInfo(s"Retrieved version $version of 
${HDFSBackedStateStoreProvider.this} for readonly")
+    logInfo(log"Retrieved version ${MDC(STATE_STORE_VERSION, version)} of " +
+      log"${MDC(STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for 
readonly")
     new HDFSBackedReadStateStore(version, newMap)
   }
 
@@ -590,7 +594,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     } finally {
       if (input != null) input.close()
     }
-    logInfo(s"Read delta file for version $version of $this from $fileToRead")
+    logInfo(log"Read delta file for version ${MDC(FILE_VERSION, version)} " +
+      log"of ${MDC(STATE_STORE_PROVIDER, this)} from ${MDC(FILE_NAME, 
fileToRead)}")
   }
 
   private def writeSnapshotFile(version: Long, map: HDFSBackedStateStoreMap): 
Unit = {
@@ -617,7 +622,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
         cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
         throw e
     }
-    logInfo(s"Written snapshot file for version $version of $this at 
$targetFile")
+    logInfo(log"Written snapshot file for version ${MDC(FILE_VERSION, 
version)} of " +
+      log"${MDC(STATE_STORE_PROVIDER, this)} at ${MDC(FILE_NAME, targetFile)}")
   }
 
   /**
@@ -642,8 +648,9 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
       // SPARK-42668 - Catch and log any other exception thrown while trying 
to cancel
       // raw stream or close compressed stream.
       case NonFatal(ex) =>
-        logInfo(s"Failed to cancel delta file for provider=$stateStoreId " +
-          s"with exception=$ex")
+        logInfo(log"Failed to cancel delta file for " +
+          log"provider=${MDC(STATE_STORE_ID, stateStoreId)} " +
+          log"with exception=${MDC(ERROR, ex)}")
     }
   }
 
@@ -692,7 +699,8 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
           }
         }
       }
-      logInfo(s"Read snapshot file for version $version of $this from 
$fileToRead")
+      logInfo(log"Read snapshot file for version ${MDC(SNAPSHOT_VERSION, 
version)} " +
+        log"of ${MDC(STATE_STORE_PROVIDER, this)} from ${MDC(FILE_NAME, 
fileToRead)}")
       Some(map)
     } catch {
       case _: FileNotFoundException =>
@@ -750,8 +758,10 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
             }
           }
           logDebug(s"deleting files took $e2 ms.")
-          logInfo(s"Deleted files older than ${earliestFileToRetain.version} 
for $this: " +
-            filesToDelete.mkString(", "))
+          logInfo(log"Deleted files older than " +
+            log"${MDC(FILE_VERSION, earliestFileToRetain.version)} for " +
+            log"${MDC(STATE_STORE_PROVIDER, this)}: " +
+            log"${MDC(FILE_NAME, filesToDelete.mkString(", "))}")
         }
       }
     } catch {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index b1005e589073..eb74bd40a285 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -36,8 +36,8 @@ import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ERROR, PATH}
+import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{NextIterator, Utils}
@@ -178,7 +178,7 @@ class RocksDB(
     assert(version >= 0)
     acquire(LoadStore)
     recordedMetrics = None
-    logInfo(s"Loading $version")
+    logInfo(log"Loading ${MDC(VERSION_NUMBER, version)}")
     try {
       if (loadedVersion != version) {
         closeDB()
@@ -213,7 +213,7 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
-      logInfo(s"Loaded $version")
+      logInfo(log"Loaded ${MDC(VERSION_NUMBER, version)}")
     } catch {
       case t: Throwable =>
         loadedVersion = -1  // invalidate loaded data
@@ -232,7 +232,8 @@ class RocksDB(
    */
   private def replayChangelog(endVersion: Long): Unit = {
     for (v <- loadedVersion + 1 to endVersion) {
-      logInfo(s"replaying changelog from version $loadedVersion -> 
$endVersion")
+      logInfo(log"replaying changelog from version ${MDC(LOADED_VERSION, 
loadedVersion)} -> " +
+        log"${MDC(END_VERSION, endVersion)}")
       var changelogReader: StateStoreChangelogReader = null
       try {
         changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
@@ -462,7 +463,7 @@ class RocksDB(
     verifyColFamilyOperations("iterator", colFamilyName)
 
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
-    logInfo(s"Getting iterator from version $loadedVersion")
+    logInfo(log"Getting iterator from version ${MDC(LOADED_VERSION, 
loadedVersion)}")
     iter.seekToFirst()
 
     // Attempt to close this iterator if there is a task failure, or a task interruption.
@@ -492,7 +493,8 @@ class RocksDB(
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
 
     try {
-      logInfo(s"Counting keys - getting iterator from version $loadedVersion")
+      logInfo(log"Counting keys - getting iterator from version " +
+        log"${MDC(LOADED_VERSION, loadedVersion)}")
 
       iter.seekToFirst()
 
@@ -546,7 +548,7 @@ class RocksDB(
     val newVersion = loadedVersion + 1
     try {
 
-      logInfo(s"Flushing updates for $newVersion")
+      logInfo(log"Flushing updates for ${MDC(VERSION_NUMBER, newVersion)}")
 
       var compactTimeMs = 0L
       var flushTimeMs = 0L
@@ -554,7 +556,7 @@ class RocksDB(
       if (shouldCreateSnapshot()) {
         // Need to flush the change to disk before creating a checkpoint
         // because rocksdb wal is disabled.
-        logInfo(s"Flushing updates for $newVersion")
+        logInfo(log"Flushing updates for ${MDC(VERSION_NUMBER, newVersion)}")
         flushTimeMs = timeTakenMs {
           // Flush updates to all available column families
           assert(!colFamilyNameToHandleMap.isEmpty)
@@ -572,7 +574,8 @@ class RocksDB(
 
         checkpointTimeMs = timeTakenMs {
           val checkpointDir = createTempDir("checkpoint")
-          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          logInfo(log"Creating checkpoint for ${MDC(VERSION_NUMBER, 
newVersion)} " +
+            log"in ${MDC(PATH, checkpointDir)}")
           // Make sure the directory does not exist. Native RocksDB fails if the directory to
           // checkpoint exists.
           Utils.deleteRecursively(checkpointDir)
@@ -592,7 +595,7 @@ class RocksDB(
         }
       }
 
-      logInfo(s"Syncing checkpoint for $newVersion to DFS")
+      logInfo(log"Syncing checkpoint for ${MDC(VERSION_NUMBER, newVersion)} to 
DFS")
       val fileSyncTimeMs = timeTakenMs {
         if (enableChangelogCheckpointing) {
           try {
@@ -616,7 +619,8 @@ class RocksDB(
         "fileSync" -> fileSyncTimeMs
       )
       recordedMetrics = Some(metrics)
-      logInfo(s"Committed $newVersion, stats = ${recordedMetrics.get.json}")
+      logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)}, " +
+        log"stats = ${MDC(METRICS_JSON, recordedMetrics.get.json)}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -651,8 +655,9 @@ class RocksDB(
             fileManager.saveCheckpointToDfs(localDir, version, numKeys)
             fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
           }
-          logInfo(s"$loggingId: Upload snapshot of version $version," +
-            s" time taken: $uploadTime ms")
+          logInfo(log"${MDC(LOG_ID, loggingId)}: Upload snapshot of version " +
+            log"${MDC(VERSION_NUMBER, version)}," +
+            log" time taken: ${MDC(TIME_UNITS, uploadTime)} ms")
         } finally {
           localCheckpoint.foreach(_.close())
         }
@@ -671,7 +676,7 @@ class RocksDB(
     // Make sure changelogWriter gets recreated next time.
     changelogWriter = None
     release(RollbackStore)
-    logInfo(s"Rolled back to $loadedVersion")
+    logInfo(log"Rolled back to ${MDC(VERSION_NUMBER, loadedVersion)}")
   }
 
   def doMaintenance(): Unit = {
@@ -694,7 +699,7 @@ class RocksDB(
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
     }
-    logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
+    logInfo(log"Cleaned old data, time taken: ${MDC(TIME_UNITS, cleanupTime)} 
ms")
   }
 
   /** Release all resources */
@@ -801,7 +806,7 @@ class RocksDB(
       rocksDBMetricsOpt = recordedMetrics
     } catch {
       case ex: Exception =>
-        logInfo(s"Failed to acquire metrics with exception=$ex")
+        logInfo(log"Failed to acquire metrics with exception=${MDC(ERROR, 
ex)}")
     } finally {
       release(ReportStoreMetrics)
     }
@@ -839,7 +844,8 @@ class RocksDB(
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] {
         _ => this.release(StoreTaskCompletionListener)
       })
-      logInfo(s"RocksDB instance was acquired by $acquiredThreadInfo for 
opType=${opType.toString}")
+      logInfo(log"RocksDB instance was acquired by ${MDC(THREAD, 
acquiredThreadInfo)} " +
+        log"for opType=${MDC(OP_TYPE, opType.toString)}")
     }
   }
 
@@ -850,7 +856,8 @@ class RocksDB(
    * @param opType - operation type releasing the lock
    */
   private def release(opType: RocksDBOpType): Unit = acquireLock.synchronized {
-    logInfo(s"RocksDB instance was released by $acquiredThreadInfo for 
opType=${opType.toString}")
+    logInfo(log"RocksDB instance was released by ${MDC(THREAD, 
acquiredThreadInfo)} " +
+      log"for opType=${MDC(OP_TYPE, opType.toString)}")
     acquiredThreadInfo = null
     acquireLock.notifyAll()
   }
@@ -888,7 +895,7 @@ class RocksDB(
     colFamilyHandles.asScala.toList.foreach { handle =>
       colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
     }
-    logInfo(s"Opened DB with conf ${conf}")
+    logInfo(log"Opened DB with conf ${MDC(CONFIG, conf)}")
   }
 
   private def closeDB(): Unit = {
@@ -912,13 +919,14 @@ class RocksDB(
         // Map DB log level to log4j levels
         // Warn is mapped to info because RocksDB warn is too verbose
         // (e.g. dumps non-warning stuff like stats)
-        val loggingFunc: ( => String) => Unit = infoLogLevel match {
+        val loggingFunc: ( => LogEntry) => Unit = infoLogLevel match {
           case InfoLogLevel.FATAL_LEVEL | InfoLogLevel.ERROR_LEVEL => logError(_)
           case InfoLogLevel.WARN_LEVEL | InfoLogLevel.INFO_LEVEL => logInfo(_)
           case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
           case _ => logTrace(_)
         }
-        loggingFunc(s"[NativeRocksDB-${infoLogLevel.getValue}] $logMsg")
+        loggingFunc(log"[NativeRocksDB-${MDC(ROCKS_DB_LOG_LEVEL, 
infoLogLevel.getValue)}] " +
+          log"${MDC(ROCKS_DB_LOG_MESSAGE, logMsg)}")
       }
     }
 
@@ -931,7 +939,7 @@ class RocksDB(
     // customized logger. We still set it as it might show up in RocksDB config file or logging.
     dbOptions.setInfoLogLevel(dbLogLevel)
     dbOptions.setLogger(dbLogger)
-    logInfo(s"Set RocksDB native logging level to $dbLogLevel")
+    logInfo(log"Set RocksDB native logging level to ${MDC(ROCKS_DB_LOG_LEVEL, 
dbLogLevel)}")
     dbLogger
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index e24a156357f9..f1510e126795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -38,8 +38,8 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{FILE_VERSION, MAX_FILE_VERSION, PATH}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -206,13 +206,15 @@ class RocksDBFileManager(
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
-    logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
+    logFilesInDir(checkpointDir, log"Saving checkpoint files " +
+      log"for version ${MDC(VERSION_NUMBER, version)}")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
     val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
-    logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
+    logInfo(log"Written metadata for version ${MDC(VERSION_NUMBER, 
version)}:\n" +
+      log"${MDC(METADATA_JSON, metadata.prettyJson)}")
 
     if (version <= 1 && numKeys <= 0) {
       // If we're writing the initial version and there's no data, we have to explicitly initialize
@@ -228,7 +230,7 @@ class RocksDBFileManager(
       }
     }
     zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
-    logInfo(s"Saved checkpoint file for version $version")
+    logInfo(log"Saved checkpoint file for version ${MDC(VERSION_NUMBER, 
version)}")
   }
 
   /**
@@ -238,7 +240,7 @@ class RocksDBFileManager(
    * local directory.
    */
   def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
-    logInfo(s"Loading checkpoint files for version $version")
+    logInfo(log"Loading checkpoint files for version ${MDC(VERSION_NUMBER, version)}")
     // The unique ids of SST files are checked when opening a rocksdb instance. The SST files
     // in larger versions can't be reused even if they have the same size and name because
     // they belong to another rocksdb instance.
@@ -256,13 +258,15 @@ class RocksDBFileManager(
       // Copy the necessary immutable files
       val metadataFile = localMetadataFile(localDir)
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
-      logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
+      logInfo(log"Read metadata for version ${MDC(VERSION_NUMBER, 
version)}:\n" +
+        log"${MDC(METADATA_JSON, metadata.prettyJson)}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
       versionToRocksDBFiles.put(version, metadata.immutableFiles)
       metadataFile.delete()
       metadata
     }
-    logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
+    logFilesInDir(localDir, log"Loaded checkpoint files " +
+      log"for version ${MDC(VERSION_NUMBER, version)}")
     metadata
   }
 
@@ -328,8 +332,9 @@ class RocksDBFileManager(
       val orphanFiles = fileModificationTimes
         .filter(_._2 < oldestTrackedFileModificationTime).keys.toSeq
       if (orphanFiles.nonEmpty) {
-        logInfo(s"Found ${orphanFiles.size} orphan files: 
${orphanFiles.take(20).mkString(", ")}" +
-          "... (display at most 20 filenames) that should be deleted.")
+        logInfo(log"Found ${MDC(NUM_FILES, orphanFiles.size)} " +
+          log"orphan files: ${MDC(FILE_MODIFICATION_TIME, 
orphanFiles.take(20).mkString(", "))}" +
+          log"... (display at most 20 filenames) that should be deleted.")
       }
       orphanFiles
     } else {
@@ -341,7 +346,7 @@ class RocksDBFileManager(
     versionsToDelete.foreach { version =>
       try {
         fm.delete(dfsChangelogFile(version))
-        logInfo(s"Deleted changelog file $version")
+        logInfo(log"Deleted changelog file ${MDC(VERSION_NUMBER, version)}")
       } catch {
         case e: Exception =>
           logWarning(
@@ -433,7 +438,8 @@ class RocksDBFileManager(
     val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toImmutableArraySeq else Seq.empty
     filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles)
       .map(_ -> -1L)
-    logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    logInfo(log"Deleting ${MDC(NUM_FILES, filesToDelete.size)} " +
+      log"files not used in versions >= ${MDC(VERSION_NUMBER, minVersionToRetain)}")
     var failedToDelete = 0
     filesToDelete.foreach { case (dfsFileName, maxUsedVersion) =>
       try {
@@ -469,9 +475,10 @@ class RocksDBFileManager(
             log"version ${MDC(FILE_VERSION, version)}", e)
       }
     }
-    logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to 
delete" +
-      s"$failedToDelete files) not used in versions >= $minVersionToRetain")
-
+    logInfo(log"Deleted ${MDC(NUM_FILES, filesToDelete.size - failedToDelete)} 
files " +
+      log"(failed to delete" +
+      log"${MDC(NUM_FILES_FAILED_TO_DELETE, failedToDelete)} files) " +
+      log"not used in versions >= ${MDC(MIN_VERSION_NUMBER, 
minVersionToRetain)}")
     val changelogVersionsToDelete = changelogFiles
       .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
       .filter(_ < minVersionToRetain)
@@ -484,7 +491,7 @@ class RocksDBFileManager(
       localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
-    logInfo(s"Saving RocksDB files to DFS for $version")
+    logInfo(log"Saving RocksDB files to DFS for ${MDC(VERSION_NUMBER, version)}")
 
     var bytesCopied = 0L
     var filesCopied = 0L
@@ -495,7 +502,7 @@ class RocksDBFileManager(
       if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
         val dfsFile = existingDfsFile.get
         filesReused += 1
-        logInfo(s"reusing file $dfsFile for $localFile")
+        logInfo(log"reusing file ${MDC(DFS_FILE, dfsFile)} for ${MDC(FILE_NAME, localFile)}")
         RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes)
       } else {
         val localFileName = localFile.getName
@@ -508,7 +515,8 @@ class RocksDBFileManager(
         fs.copyFromLocalFile(
           new Path(localFile.getAbsoluteFile.toURI), dfsFile)
         val localFileSize = localFile.length()
-        logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+        logInfo(log"Copied ${MDC(FILE_NAME, localFile)} to ${MDC(DFS_FILE, 
dfsFile)} - " +
+          log"${MDC(NUM_BYTES, localFileSize)} bytes")
         filesCopied += 1
         bytesCopied += localFileSize
 
@@ -518,8 +526,10 @@ class RocksDBFileManager(
         immutableDfsFile
       }
     }
-    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
-      s" DFS for version $version. $filesReused files reused without copying.")
+    logInfo(log"Copied ${MDC(NUM_FILES_COPIED, filesCopied)} files " +
+      log"(${MDC(NUM_BYTES, bytesCopied)} bytes) from local to" +
+      log" DFS for version ${MDC(VERSION_NUMBER, version)}. " +
+      log"${MDC(NUM_FILES_REUSED, filesReused)} files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
 
     // Cleanup locally deleted files from the localFilesToDfsFiles map
@@ -571,10 +581,13 @@ class RocksDBFileManager(
         if (!isSameFile) {
           existingFile.delete()
           localFilesToDfsFiles.remove(existingFile.getName)
-          logInfo(s"Deleted local file $existingFile with size 
$existingFileSize mapped" +
-            s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
+          logInfo(log"Deleted local file ${MDC(FILE_NAME, existingFile)} " +
+            log"with size ${MDC(NUM_BYTES, existingFileSize)} mapped" +
+            log" to previous dfsFile ${MDC(DFS_FILE, 
prevDfsFile.getOrElse("null"))}")
         } else {
-          logInfo(s"reusing $prevDfsFile present at $existingFile for 
$requiredFile")
+          logInfo(log"reusing ${MDC(DFS_FILE, prevDfsFile)} present at " +
+            log"${MDC(EXISTING_FILE, existingFile)} " +
+            log"for ${MDC(FILE_NAME, requiredFile)}")
         }
       }
 
@@ -600,13 +613,15 @@ class RocksDBFileManager(
         filesCopied += 1
         bytesCopied += localFileSize
         localFilesToDfsFiles.put(localFileName, file)
-        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
+        logInfo(log"Copied ${MDC(DFS_FILE, dfsFile)} to ${MDC(FILE_NAME, 
localFile)} - " +
+          log"${MDC(NUM_BYTES, localFileSize)} bytes")
       } else {
         filesReused += 1
       }
     }
-    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local 
with " +
-      s"$filesReused files reused.")
+    logInfo(log"Copied ${MDC(NUM_FILES_COPIED, filesCopied)} files " +
+      log"(${MDC(NUM_BYTES, bytesCopied)} bytes) from DFS to local with " +
+      log"${MDC(NUM_FILES_REUSED, filesReused)} files reused.")
 
     loadCheckpointMetrics = RocksDBFileManagerMetrics(
       bytesCopied = bytesCopied,
@@ -622,7 +637,7 @@ class RocksDBFileManager(
       .filterNot(currentLocalFiles.contains)
 
     mappingsToClean.foreach { f =>
-      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
+      logInfo(log"cleaning ${MDC(FILE_NAME, f)} from the localFilesToDfsFiles 
map")
       localFilesToDfsFiles.remove(f)
     }
   }
@@ -657,7 +672,8 @@ class RocksDBFileManager(
         totalBytes += bytes
       }
       zout.close()  // so that any error in closing also cancels the output stream
-      logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
+      logInfo(log"Zipped ${MDC(NUM_BYTES, totalBytes)} bytes (before compression) to " +
+        log"${MDC(FILE_NAME, filesStr)}")
       // The other fields saveCheckpointMetrics should have been filled
       saveCheckpointMetrics =
         saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
@@ -675,11 +691,12 @@ class RocksDBFileManager(
   }
 
   /** Log the files present in a directory. This is useful for debugging. */
-  private def logFilesInDir(dir: File, msg: String): Unit = {
+  private def logFilesInDir(dir: File, msg: MessageWithContext): Unit = {
     lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
       s"${f.getAbsolutePath} - ${f.length()} bytes"
     }
-    logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
+    logInfo(msg + log" - ${MDC(NUM_FILES, files.length)} files\n\t" +
+      log"${MDC(FILE_NAME, files.mkString("\n\t"))}")
   }
 
   private def newDFSFileName(localFileName: String): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 38b9dc56838e..66451234abc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.streaming.state
 
 import org.rocksdb._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 
 /**
  * Singleton responsible for managing cache and write buffer manager associated with all RocksDB
@@ -47,8 +48,8 @@ object RocksDBMemoryManager extends Logging {
         }
 
         val totalMemoryUsageInBytes: Long = conf.totalMemoryUsageMB * 1024 * 1024
-        logInfo(s"Creating RocksDB state store LRU cache with " +
-          s"total_size=$totalMemoryUsageInBytes")
+        logInfo(log"Creating RocksDB state store LRU cache with " +
+          log"total_size=${MDC(NUM_BYTES, totalMemoryUsageInBytes)}")
 
         // SPARK-44878 - avoid using strict limit to prevent insertion exception on cache full.
         // Please refer to RocksDB issue here - https://github.com/facebook/rocksdb/issues/8670
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index e05f9c24f719..154f4ea5fc3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -24,7 +24,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
@@ -164,7 +165,8 @@ private[sql] class RocksDBStateStoreProvider
         verify(state == UPDATING, "Cannot commit after already committed or aborted")
         val newVersion = rocksDB.commit()
         state = COMMITTED
-        logInfo(s"Committed $newVersion for $id")
+        logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)} " +
+          log"for ${MDC(STATE_STORE_ID, id)}")
         newVersion
       } catch {
         case e: Throwable =>
@@ -174,7 +176,8 @@ private[sql] class RocksDBStateStoreProvider
 
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
-      logInfo(s"Aborting ${version + 1} for $id")
+      logInfo(log"Aborting ${MDC(VERSION_NUMBER, version + 1)} " +
+        log"for ${MDC(STATE_STORE_ID, id)}")
       rocksDB.rollback()
       state = ABORTED
     }
@@ -238,7 +241,8 @@ private[sql] class RocksDBStateStoreProvider
           rocksDBMetrics.totalMemUsageBytes,
           stateStoreCustomMetrics)
       } else {
-        logInfo(s"Failed to collect metrics for store_id=$id and 
version=$version")
+        logInfo(log"Failed to collect metrics for 
store_id=${MDC(STATE_STORE_ID, id)} " +
+          log"and version=${MDC(VERSION_NUMBER, version)}")
         StateStoreMetrics(0, 0, Map.empty)
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 699ce75a88de..9ada54e20543 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{LOAD_TIME, QUERY_RUN_ID, STATE_STORE_PROVIDER, STORE_ID}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -821,7 +821,7 @@ object StateStore extends Logging {
             provider.doMaintenance()
             if (!verifyIfStoreInstanceActive(id)) {
               unload(id)
-              logInfo(s"Unloaded $provider")
+              logInfo(log"Unloaded ${MDC(STATE_STORE_PROVIDER, provider)}")
             }
           } catch {
             case NonFatal(e) =>
@@ -830,8 +830,9 @@ object StateStore extends Logging {
               threadPoolException.set(e)
           } finally {
             val duration = System.currentTimeMillis() - startTime
-            val logMsg = s"Finished maintenance task for provider=$id" +
-              s" in elapsed_time=$duration\n"
+            val logMsg =
+              log"Finished maintenance task for 
provider=${MDC(STATE_STORE_PROVIDER, id)}" +
+                log" in elapsed_time=${MDC(TIME_UNITS, duration)}\n"
             if (duration > 5000) {
               logInfo(logMsg)
             } else {
@@ -843,8 +844,9 @@ object StateStore extends Logging {
           }
         })
       } else {
-        logInfo(s"Not processing partition ${id} for maintenance because it is 
currently " +
-          s"being processed")
+        logInfo(log"Not processing partition ${MDC(PARTITION_ID, id)} " +
+          log"for maintenance because it is currently " +
+          log"being processed")
       }
     }
   }
@@ -858,8 +860,10 @@ object StateStore extends Logging {
       val providerIdsToUnload = coordinatorRef
         .map(_.reportActiveInstance(storeProviderId, host, executorId, otherProviderIds))
         .getOrElse(Seq.empty[StateStoreProviderId])
-      logInfo(s"Reported that the loaded instance $storeProviderId is active")
-      logDebug(s"The loaded instances are going to unload: ${providerIdsToUnload.mkString(", ")}")
+      logInfo(log"Reported that the loaded instance " +
+        log"${MDC(STATE_STORE_PROVIDER, storeProviderId)} is active")
+      logDebug(log"The loaded instances are going to unload: " +
+        log"${MDC(STATE_STORE_PROVIDER, providerIdsToUnload.mkString(", "))}")
       providerIdsToUnload
     } else {
       Seq.empty[StateStoreProviderId]
@@ -890,7 +894,8 @@ object StateStore extends Logging {
         logDebug("Getting StateStoreCoordinatorRef")
         _coordRef = StateStoreCoordinatorRef.forExecutor(env)
       }
-      logInfo(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}")
+      logInfo(log"Retrieved reference to StateStoreCoordinator: " +
+        log"${MDC(STATE_STORE_PROVIDER, _coordRef)}")
       Some(_coordRef)
     } else {
       _coordRef = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 30cf49d8e56d..a52d2bb2c610 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -25,7 +25,8 @@ import com.google.common.io.ByteStreams
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FSError, Path}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -108,8 +109,9 @@ abstract class StateStoreChangelogWriter(
       // IOException into FSError.
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
       case NonFatal(ex) =>
-        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
-          s"with exception=$ex")
+        logInfo(log"Failed to cancel changelog file ${MDC(FILE_NAME, file)} " +
+          log"for state store provider " +
+          log"with exception=${MDC(ERROR, ex)}")
     } finally {
       backingFileStream = null
       compressedStream = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
index 5130933f52ef..cbb7c1e56907 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -261,7 +262,7 @@ class StreamingSessionWindowStateManagerImplV1(
 
   override def abortIfNeeded(store: StateStore): Unit = {
     if (!store.hasCommitted) {
-      logInfo(s"Aborted store ${store.id}")
+      logInfo(log"Aborted store ${MDC(STATE_STORE_ID, store.id)}")
       store.abort()
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 1303668b8f86..b541f59a2165 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{END_INDEX, START_INDEX}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -463,7 +463,7 @@ class SymmetricHashJoinStateManager(
 
     def abortIfNeeded(): Unit = {
       if (!stateStore.hasCommitted) {
-        logInfo(s"Aborted store ${stateStore.id}")
+        logInfo(log"Aborted store ${MDC(STATE_STORE_ID, stateStore.id)}")
         stateStore.abort()
       }
       // If this class manages a state store provider by itself, it should take care of closing
@@ -491,7 +491,7 @@ class SymmetricHashJoinStateManager(
           useMultipleValuesPerKey = false)
         stateStoreProvider.getStore(stateInfo.get.storeVersion)
       }
-      logInfo(s"Loaded store ${store.id}")
+      logInfo(log"Loaded store ${MDC(STATE_STORE_ID, store.id)}")
       store
     }
   }
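
For reference, the before/after shape of the migration applied throughout this diff can be sketched as below. The snippet is illustrative only and is not part of the commit; the class and method names are hypothetical, and it assumes compilation inside the Spark code base, where the Logging trait, the log"..." interpolator, MDC, and the LogKey values used here already exist.

```
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{STATE_STORE_ID, VERSION_NUMBER}

// Hypothetical example class, not part of the patch.
class ExampleStateStoreTask(id: String) extends Logging {
  def commit(newVersion: Long): Unit = {
    // Before: plain string interpolation; values are flattened into the message text.
    //   logInfo(s"Committed $newVersion for $id")
    // After: each value is wrapped in MDC with a LogKey, so structured logging
    // backends can index and query on VERSION_NUMBER and STATE_STORE_ID while the
    // rendered message stays the same.
    logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)} " +
      log"for ${MDC(STATE_STORE_ID, id)}")
  }
}
```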


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
