This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 520f3b1c192b [SPARK-47593][CORE] Connector module: Migrate logWarn with variables to structured logging framework
520f3b1c192b is described below

commit 520f3b1c192b1bae53509fdad770f5711ca3791f
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Tue Apr 9 21:42:39 2024 -0700

    [SPARK-47593][CORE] Connector module: Migrate logWarn with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR aims to migrate `logWarning` calls with variables in the `Connector` module to the `structured logging framework`.
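
    As an illustration, the pattern applied throughout the diff below looks like this (a minimal sketch, not code from this PR: it reuses the `OFFSET` key that this change adds to `LogKey.scala`, and `offset` stands in for a local variable in a class that extends `Logging`):

    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.OFFSET

    // Before: plain string interpolation; the offset value is only part of the message text
    logWarning(s"Offset $offset is out of range")

    // After: the log"..." interpolator plus MDC attaches the value under the
    // "offset" key in the log context while still rendering it in the message
    logWarning(log"Offset ${MDC(OFFSET, offset)} is out of range")
    ```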
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45879 from panbingkun/SPARK-47593_warning.
    
    Lead-authored-by: panbingkun <panbing...@baidu.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 23 +++++++++++++
 .../scala/org/apache/spark/util/MDCSuite.scala     | 15 +++++++-
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  9 ++---
 .../ExecutePlanResponseReattachableIterator.scala  |  2 +-
 .../sql/connect/client/GrpcRetryHandler.scala      | 18 +++++-----
 .../execution/ExecuteGrpcResponseSender.scala      | 15 +++++---
 .../service/SparkConnectStreamingQueryCache.scala  | 14 +++++---
 .../connect/ui/SparkConnectServerListener.scala    | 36 +++++++++++++------
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      |  7 ++--
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  5 +--
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  5 +--
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      | 10 +++---
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   | 10 +++---
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  5 +--
 .../sql/kafka010/consumer/FetchedDataPool.scala    |  7 ++--
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  | 40 +++++++++++++---------
 .../producer/InternalKafkaProducerPool.scala       |  6 ++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  9 ++---
 .../kafka010/KafkaDelegationTokenProvider.scala    | 10 +++---
 .../streaming/kafka010/ConsumerStrategy.scala      | 11 +++---
 .../streaming/kafka010/KafkaDataConsumer.scala     |  7 ++--
 .../spark/streaming/kafka010/KafkaUtils.scala      | 16 +++++----
 .../streaming/kinesis/KinesisBackedBlockRDD.scala  |  6 ++--
 .../streaming/kinesis/KinesisRecordProcessor.scala |  2 +-
 .../spark/streaming/kinesis/KinesisTestUtils.scala |  7 ++--
 25 files changed, 195 insertions(+), 100 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 2cb5eac4548c..6cdec011e2ae 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
   val CATEGORICAL_FEATURES = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
+  val CLUSTER_ID = Value
   val COLUMN_DATA_TYPE_SOURCE = Value
   val COLUMN_DATA_TYPE_TARGET = Value
   val COLUMN_DEFAULT_VALUE = Value
@@ -43,6 +44,7 @@ object LogKey extends Enumeration {
   val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
+  val CONTAINER = Value
   val CONTAINER_ID = Value
   val COUNT = Value
   val CSV_HEADER_COLUMN_NAME = Value
@@ -51,6 +53,7 @@ object LogKey extends Enumeration {
   val CSV_SCHEMA_FIELD_NAME = Value
   val CSV_SCHEMA_FIELD_NAMES = Value
   val CSV_SOURCE = Value
+  val DATA = Value
   val DATABASE_NAME = Value
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
@@ -70,9 +73,11 @@ object LogKey extends Enumeration {
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
+  val INDEX = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
   val JOIN_CONDITION_SUB_EXPRESSION = Value
+  val KEY = Value
   val LEARNING_RATE = Value
   val LINE = Value
   val LINE_NUM = Value
@@ -80,17 +85,23 @@ object LogKey extends Enumeration {
   val LOG_TYPE = Value
   val MASTER_URL = Value
   val MAX_ATTEMPTS = Value
+  val MAX_CAPACITY = Value
   val MAX_CATEGORIES = Value
   val MAX_EXECUTOR_FAILURES = Value
   val MAX_SIZE = Value
   val MERGE_DIR_NAME = Value
   val METHOD_NAME = Value
   val MIN_SIZE = Value
+  val NEW_VALUE = Value
   val NUM_COLUMNS = Value
   val NUM_ITERATIONS = Value
   val OBJECT_ID = Value
+  val OFFSET = Value
+  val OFFSETS = Value
   val OLD_BLOCK_MANAGER_ID = Value
+  val OLD_VALUE = Value
   val OPTIMIZER_CLASS_NAME = Value
+  val OP_ID = Value
   val OP_TYPE = Value
   val PARSE_MODE = Value
   val PARTITION_ID = Value
@@ -99,8 +110,11 @@ object LogKey extends Enumeration {
   val PATH = Value
   val PATHS = Value
   val POD_ID = Value
+  val POLICY = Value
   val PORT = Value
+  val PRODUCER_ID = Value
   val QUERY_HINT = Value
+  val QUERY_ID = Value
   val QUERY_PLAN = Value
   val QUERY_PLAN_LENGTH_ACTUAL = Value
   val QUERY_PLAN_LENGTH_MAX = Value
@@ -117,6 +131,7 @@ object LogKey extends Enumeration {
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
   val RULE_NUMBER_OF_RUNS = Value
+  val SERVICE_NAME = Value
   val SESSION_ID = Value
   val SHARD_ID = Value
   val SHUFFLE_BLOCK_INFO = Value
@@ -127,6 +142,8 @@ object LogKey extends Enumeration {
   val SQL_TEXT = Value
   val STAGE_ID = Value
   val STATEMENT_ID = Value
+  val STATUS = Value
+  val STREAM_NAME = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
   val TABLE_NAME = Value
@@ -140,13 +157,19 @@ object LogKey extends Enumeration {
   val TID = Value
   val TIMEOUT = Value
   val TIME_UNITS = Value
+  val TIP = Value
+  val TOPIC_PARTITION = Value
   val TOTAL_EFFECTIVE_TIME = Value
   val TOTAL_TIME = Value
   val UNSUPPORTED_EXPRESSION = Value
   val UNSUPPORTED_HINT_REASON = Value
+  val UNTIL_OFFSET = Value
   val URI = Value
   val USER_ID = Value
   val USER_NAME = Value
+  val WAIT_RESULT_TIME = Value
+  val WAIT_SEND_TIME = Value
+  val WAIT_TIME = Value
   val WATERMARK_CONSTRAINT = Value
   val WORKER_URL = Value
   val XSD_PATH = Value
diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
index 1ac51e236080..62eaa852913c 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
 import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.EXIT_CODE
+import org.apache.spark.internal.LogKey.{EXIT_CODE, OFFSET, RANGE}
 
 class MDCSuite
     extends AnyFunSuite // scalastyle:ignore funsuite
@@ -41,6 +41,19 @@ class MDCSuite
     assert(log.context === Map("exit_code" -> "CustomObjectValue: spark, 10086").asJava)
   }
 
+  test("check MDC stripMargin") {
+    val log =
+      log"""
+           |The current available offset range is ${MDC(RANGE, "12 - 34")}.
+           | Offset ${MDC(OFFSET, "666")}. is out of range""".stripMargin
+    val expected =
+      s"""
+         |The current available offset range is 12 - 34.
+         | Offset 666. is out of range""".stripMargin
+    assert(log.message === expected)
+    assert(log.context === Map("range" -> "12 - 34", "offset" -> "666").asJava)
+  }
+
   case class CustomObjectValue(key: String, value: Int) {
     override def toString: String = {
       "CustomObjectValue: " + key + ", " + value
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 269a8b80c2b7..e8be11f48a2b 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkException, SparkIllegalArgumentException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, PATH}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroCompressionCodec._
 import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
@@ -51,8 +52,8 @@ private[sql] object AvroUtils extends Logging {
     val parsedOptions = new AvroOptions(options, conf)
 
     if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) {
-      logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " +
-        "general data source option pathGlobFilter for filtering file names.")
+      logWarning(log"Option ${MDC(CONFIG, IGNORE_EXTENSION)} is deprecated. Please use the " +
+        log"general data source option pathGlobFilter for filtering file names.")
     }
     // User can specify an optional avro json schema.
     val avroSchema = parsedOptions.schema
@@ -160,7 +161,7 @@ private[sql] object AvroUtils extends Logging {
           } catch {
             case e: IOException =>
               if (ignoreCorruptFiles) {
-                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e)
                 None
               } else {
                 throw new SparkException(s"Could not read file: $path", e)
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 5854a9225dbe..74f13272a365 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -277,7 +277,7 @@ class ExecutePlanResponseReattachableIterator(
           }
         } catch {
           case NonFatal(e) =>
-            logWarning(s"ReleaseExecute failed with exception: $e.")
+            logWarning(log"ReleaseExecute failed with exception:", e)
         }
       }
     }
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 2418dfa03505..508dad3d748d 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -22,6 +22,8 @@ import scala.util.control.NonFatal
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKey.{ERROR, POLICY, RETRY_COUNT, WAIT_TIME}
+import org.apache.spark.internal.MDC
 
 private[sql] class GrpcRetryHandler(
     private val policies: Seq[RetryPolicy],
@@ -187,8 +189,8 @@ private[sql] object GrpcRetryHandler extends Logging {
       if (lastException.isInstanceOf[RetryException]) {
         // retry exception is considered immediately retriable without any policies.
         logWarning(
-          s"Non-Fatal error during RPC execution: $lastException, retrying " +
-            s"(currentRetryNum=$currentRetryNum)")
+          log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
+            log"retrying (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})")
         return
       }
 
@@ -197,18 +199,18 @@ private[sql] object GrpcRetryHandler extends Logging {
 
         if (time.isDefined) {
           logWarning(
-            s"Non-Fatal error during RPC execution: $lastException, retrying " +
-              s"(wait=${time.get.toMillis}, currentRetryNum=$currentRetryNum, " +
-              s"policy: ${policy.getName})")
-
+            log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
+              log"retrying (wait=${MDC(WAIT_TIME, time.get.toMillis)} ms, " +
+              log"currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)}, " +
+              log"policy=${MDC(POLICY, policy.getName)}).")
           sleep(time.get.toMillis)
           return
         }
       }
 
       logWarning(
-        s"Non-Fatal error during RPC execution: $lastException, exceeded retries " +
-          s"(currentRetryNum=$currentRetryNum)")
+        log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
+          log"exceeded retries (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})")
 
       val error = new RetriesExceeded()
       exceptionList.foreach(error.addSuppressed)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 4b95f38c6695..1139507a37a5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -24,7 +24,9 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
 
 import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto.ExecutePlanResponse
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.connect.common.ProtoUtils
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL}
 import org.apache.spark.sql.connect.service.{ExecuteHolder, SparkConnectService}
@@ -268,10 +270,15 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       // Process the outcome of the inner loop.
       if (interrupted) {
         // This sender got interrupted. Kill this RPC.
+        val totalTime = (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble
+        val waitResultTime = consumeSleep / NANOS_PER_MILLIS.toDouble
+        val waitSendTime = sendSleep / NANOS_PER_MILLIS.toDouble
         logWarning(
-          s"Got detached from opId=${executeHolder.operationId} at index ${nextIndex - 1}." +
-            s"totalTime=${System.nanoTime - startTime}ns " +
-            s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns")
+          log"Got detached from opId=${MDC(OP_ID, executeHolder.operationId)} " +
+            log"at index ${MDC(INDEX, nextIndex - 1)}." +
+            log"totalTime=${MDC(TOTAL_TIME, totalTime)} ms " +
+            log"waitingForResults=${MDC(WAIT_RESULT_TIME, waitResultTime)} ms " +
+            log"waitingForSend=${MDC(WAIT_SEND_TIME, waitSendTime)} ms")
         throw new SparkSQLException(errorClass = "INVALID_CURSOR.DISCONNECTED", Map.empty)
       } else if (gotResponse) {
         enqueueProgressMessage()
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index a5d3fa497bb3..9690d10eba1a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -26,7 +26,8 @@ import scala.collection.mutable
 import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
 import scala.util.control.NonFatal
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_ID}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
@@ -65,9 +66,9 @@ private[connect] class SparkConnectStreamingQueryCache(
 
       queryCache.put(QueryCacheKey(query.id.toString, query.runId.toString), value) match {
         case Some(existing) => // Query is being replace. Not really expected.
-          logWarning(
-            s"Replacing existing query in the cache (unexpected). Query Id: ${query.id}." +
-              s"Existing value $existing, new value $value.")
+          logWarning(log"Replacing existing query in the cache (unexpected). " +
+            log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value ${MDC(OLD_VALUE, existing)}, " +
+            log"new value ${MDC(NEW_VALUE, value)}.")
         case None =>
           logInfo(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
       }
@@ -115,7 +116,10 @@ private[connect] class SparkConnectStreamingQueryCache(
             v.query.stop()
           } catch {
             case NonFatal(ex) =>
-              logWarning(s"Failed to stop the query ${k.queryId}. Error is ignored.", ex)
+              logWarning(
+                log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. " +
+                  log"Error is ignored.",
+                ex)
           }
         }
       }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
index ec6079a89145..a1bbab7dbdbc 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OP_ID, SESSION_ID}
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
@@ -183,8 +184,8 @@ private[connect] class SparkConnectServerListener(
         updateLiveStore(sessionData)
       case None =>
         logWarning(
-          s"onOperationStart called with unknown session id: ${e.sessionId}." +
-            s"Regardless, the operation has been registered.")
+          log"onOperationStart called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}." +
+            log"Regardless, the operation has been registered.")
     }
   }
 
@@ -194,7 +195,9 @@ private[connect] class SparkConnectServerListener(
         executionData.state = ExecutionState.COMPILED
         updateLiveStore(executionData)
       case None =>
-        logWarning(s"onOperationAnalyzed called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationAnalyzed called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
 
@@ -205,7 +208,9 @@ private[connect] class SparkConnectServerListener(
         executionData.state = ExecutionState.READY
         updateLiveStore(executionData)
       case None =>
-        logWarning(s"onOperationReadyForExecution called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationReadyForExecution called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
 
@@ -216,7 +221,9 @@ private[connect] class SparkConnectServerListener(
         executionData.state = ExecutionState.CANCELED
         updateLiveStore(executionData)
       case None =>
-        logWarning(s"onOperationCanceled called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationCanceled called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
   private def onOperationFailed(e: SparkListenerConnectOperationFailed) = synchronized {
@@ -227,7 +234,9 @@ private[connect] class SparkConnectServerListener(
         executionData.state = ExecutionState.FAILED
         updateLiveStore(executionData)
       case None =>
-        logWarning(s"onOperationFailed called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationFailed called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
   private def onOperationFinished(e: SparkListenerConnectOperationFinished) = synchronized {
@@ -237,7 +246,9 @@ private[connect] class SparkConnectServerListener(
         executionData.state = ExecutionState.FINISHED
         updateLiveStore(executionData)
       case None =>
-        logWarning(s"onOperationFinished called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationFinished called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
   private def onOperationClosed(e: SparkListenerConnectOperationClosed) = synchronized {
@@ -248,7 +259,9 @@ private[connect] class SparkConnectServerListener(
         updateStoreWithTriggerEnabled(executionData)
         executionList.remove(e.jobTag)
       case None =>
-        logWarning(s"onOperationClosed called with unknown operation id: ${e.jobTag}")
+        logWarning(
+          log"onOperationClosed called with " +
+            log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
     }
   }
 
@@ -265,7 +278,10 @@ private[connect] class SparkConnectServerListener(
         updateStoreWithTriggerEnabled(sessionData)
         sessionList.remove(e.sessionId)
 
-      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
+      case None =>
+        logWarning(
+          log"onSessionClosed called with " +
+            log"unknown session id: ${MDC(SESSION_ID, e.sessionId)}")
     }
   }
 
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 8e65e8af9d7a..fc095c5f5b31 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -36,7 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.internal.LogKey.CLASS_NAME
+import org.apache.spark.internal.LogKey.{CLASS_NAME, CONTAINER, STATUS}
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
@@ -263,9 +263,10 @@ abstract class DockerJDBCIntegrationSuite
       } catch {
         case NonFatal(e) =>
           val response = docker.inspectContainerCmd(container.getId).exec()
-          logWarning(s"Container $container already stopped")
+          logWarning(log"Container ${MDC(CONTAINER, container)} already stopped")
           val status = Option(response).map(_.getState.getStatus).getOrElse("unknown")
-          logWarning(s"Could not stop container $container at stage '$status'", e)
+          logWarning(log"Could not stop container ${MDC(CONTAINER, container)} " +
+            log"at stage '${MDC(STATUS, status)}'", e)
       } finally {
         logContainerOutput()
         docker.removeContainerCmd(container.getId).exec()
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 9b7f52585545..9bf0a2e9e513 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, Offset
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
@@ -148,7 +149,7 @@ class KafkaContinuousStream(
     if (failOnDataLoss) {
       throw getException()
     } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+      logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
     }
   }
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index fefa3efcc353..be838ddc3c80 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
 import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
@@ -309,7 +310,7 @@ private[kafka010] class KafkaMicroBatchStream(
     if (failOnDataLoss) {
       throw getException()
     } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+      logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
     }
   }
 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 27adccf6f902..433da08176e7 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.requests.OffsetFetchResponse
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -335,8 +336,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
 
           incorrectOffsets = findIncorrectOffsets()
           if (incorrectOffsets.nonEmpty) {
-            logWarning("Found incorrect offsets in some partitions " +
-              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            logWarning(log"Found incorrect offsets in some partitions " +
+              log"(partition, previous offset, fetched offset): ${MDC(OFFSETS, incorrectOffsets)}")
             if (attempt < maxOffsetFetchAttempts) {
               logWarning("Retrying to fetch latest offsets because of incorrect offsets")
               Thread.sleep(offsetFetchAttemptIntervalMs)
@@ -534,7 +535,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
         } catch {
           case NonFatal(e) =>
             lastException = e
-            logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
+            logWarning(
+              log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e)
             attempt += 1
             Thread.sleep(offsetFetchAttemptIntervalMs)
             resetAdmin()
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index d4953a4a65e3..2ba4a9a563df 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, OffsetAndTim
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -385,8 +386,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
 
           incorrectOffsets = findIncorrectOffsets()
           if (incorrectOffsets.nonEmpty) {
-            logWarning("Found incorrect offsets in some partitions " +
-              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            logWarning(log"Found incorrect offsets in some partitions " +
+              log"(partition, previous offset, fetched offset): ${MDC(OFFSETS, incorrectOffsets)}")
             if (attempt < maxOffsetFetchAttempts) {
               logWarning("Retrying to fetch latest offsets because of incorrect offsets")
               Thread.sleep(offsetFetchAttemptIntervalMs)
@@ -611,7 +612,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
               } catch {
                 case NonFatal(e) =>
                   lastException = e
-                  logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
+                  logWarning(
+                    log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e)
                   attempt += 1
                   Thread.sleep(offsetFetchAttemptIntervalMs)
                   resetConsumer()
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 83ed7fff23fc..426672d2e458 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -22,7 +22,8 @@ import java.{util => ju}
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
 import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
@@ -351,7 +352,7 @@ private[kafka010] class KafkaSource(
     if (failOnDataLoss) {
       throw getException()
     } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+      logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
     }
   }
 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 3e6831770a67..981aa71bf947 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -26,7 +26,8 @@ import scala.collection.mutable
 import org.apache.kafka.clients.consumer.ConsumerRecord
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATA, KEY}
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -110,8 +111,8 @@ private[consumer] class FetchedDataPool(
 
   def release(key: CacheKey, fetchedData: FetchedData): Unit = synchronized {
     def warnReleasedDataNotInPool(key: CacheKey, fetchedData: FetchedData): Unit = {
-      logWarning(s"No matching data in pool for $fetchedData in key $key. " +
-        "It might be released before, or it was not a part of pool.")
+      logWarning(log"No matching data in pool for ${MDC(DATA, fetchedData)} in key " +
+        log"${MDC(KEY, key)}. It might be released before, or it was not a part of pool.")
     }
 
     cache.get(key) match {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index fbc4a500322e..3ea7d967744c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{ERROR, GROUP_ID, OFFSET, RANGE, TIP, TOPIC_PARTITION, UNTIL_OFFSET}
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
 import org.apache.spark.sql.kafka010.KafkaExceptions
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -426,7 +427,8 @@ private[kafka010] class KafkaDataConsumer(
     val range = timeNanos {
       consumer.getAvailableOffsetRange()
     }
-    logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
+    logWarning(log"Some data may be lost. Recovering from the earliest offset: " +
+      log"${MDC(OFFSET, range.earliest)}")
+      log"${MDC(OFFSET, range.earliest)}")
 
     val topicPartition = consumer.topicPartition
     val groupId = consumer.groupId
@@ -444,11 +446,12 @@ private[kafka010] class KafkaDataConsumer(
       //      |          |              |                |
       //   offset   untilOffset   earliestOffset   latestOffset
       val warningMessage =
-      s"""
-         |The current available offset range is $range.
-         | Offset $offset is out of range, and records in [$offset, $untilOffset) will be
-         | skipped ${additionalWarningMessage(topicPartition, groupId)}
-        """.stripMargin
+      log"""
+         |The current available offset range is ${MDC(RANGE, range)}.
+         | Offset ${MDC(OFFSET, offset)} is out of range, and records in
+         | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, untilOffset)}] will be
+         | skipped""".stripMargin +
+        additionalWarningMessage(topicPartition, groupId)
       logWarning(warningMessage)
       UNKNOWN_OFFSET
     } else if (offset >= range.earliest) {
@@ -460,8 +463,8 @@ private[kafka010] class KafkaDataConsumer(
       // This will happen when a topic is deleted and recreated, and new data are pushed very fast,
       // then we will see `offset` disappears first then appears again. Although the parameters
       // are same, the state in Kafka cluster is changed, so the outer loop won't be endless.
-      logWarning(s"Found a disappeared offset $offset. Some data may be lost " +
-        s"${additionalWarningMessage(topicPartition, groupId)}")
+      logWarning(log"Found a disappeared offset ${MDC(OFFSET, offset)}. Some data may be lost " +
+        additionalWarningMessage(topicPartition, groupId))
       offset
     } else {
       // ------------------------------------------------------------------------------
@@ -470,10 +473,11 @@ private[kafka010] class KafkaDataConsumer(
       //   offset   earliestOffset   min(untilOffset,latestOffset)   max(untilOffset, latestOffset)
       val warningMessage =
       s"""
-         |The current available offset range is $range.
-         | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
-         | skipped ${additionalWarningMessage(topicPartition, groupId)}
-        """.stripMargin
+         |The current available offset range is ${MDC(RANGE, range)}.
+         | Offset ${MDC(OFFSET, offset)} is out of range, and records in
+         | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, range.earliest)}] will be
+         | skipped""".stripMargin +
+        additionalWarningMessage(topicPartition, groupId)
       logWarning(warningMessage)
       range.earliest
     }
@@ -629,9 +633,10 @@ private[kafka010] class KafkaDataConsumer(
    */
   private def additionalWarningMessage(
       topicPartition: TopicPartition,
-      groupId: String): String = {
-    s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-      s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
+      groupId: String): MessageWithContext = {
+    log"(GroupId: ${MDC(GROUP_ID, groupId)}, " +
+      log"TopicPartition: ${MDC(TOPIC_PARTITION, topicPartition)}). " +
+      log"${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}"
   }
 
   /**
@@ -660,7 +665,8 @@ private[kafka010] class KafkaDataConsumer(
       groupId: String,
       message: String,
       cause: Throwable = null): Unit = {
-    val finalMessage = s"$message ${additionalWarningMessage(topicPartition, groupId)}"
+    val finalMessage = log"${MDC(ERROR, message)}" +
+      additionalWarningMessage(topicPartition, groupId)
 
     dataLoss += 1
     if (cause != null) {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
index ddde7805f1a4..f35023d744b6 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
@@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.producer.KafkaProducer
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PRODUCER_ID
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
 import org.apache.spark.sql.kafka010.{PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, PRODUCER_CACHE_TIMEOUT}
 import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, ThreadUtils, Utils}
@@ -96,7 +97,8 @@ private[producer] class InternalKafkaProducerPool(
         case Some(entry) if entry.producer.id == producer.id =>
           entry.handleReturned(clock.nanoTime())
         case _ =>
-          logWarning(s"Released producer ${producer.id} is not a member of the cache. Closing.")
+          logWarning(log"Released producer ${MDC(PRODUCER_ID, producer.id)} is not " +
+            log"a member of the cache. Closing.")
+            log"a member of the cache. Closing.")
           producer.close()
       }
     }
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 1c397a8d5005..8ec8f2556b9b 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -51,7 +51,8 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ERROR
 import org.apache.spark.kafka010.KafkaTokenUtil
 import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils}
 import org.apache.spark.util.ArrayImplicits._
@@ -332,7 +333,7 @@ class KafkaTestUtils(
         Utils.deleteRecursively(new File(f))
       } catch {
         case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
+          logWarning(log"${MDC(ERROR, e.getMessage)}")
       }
     }
 
@@ -653,13 +654,13 @@ class KafkaTestUtils(
         Utils.deleteRecursively(snapshotDir)
       } catch {
         case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
+          logWarning(log"${MDC(ERROR, e.getMessage)}")
       }
       try {
         Utils.deleteRecursively(logDir)
       } catch {
         case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
+          logWarning(log"${MDC(ERROR, e.getMessage)}")
       }
       System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
     }
diff --git a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
index d1d2a031e662..d0bcf90babc1 100644
--- a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
+++ b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.security.Credentials
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLUSTER_ID, SERVICE_NAME}
 import org.apache.spark.security.HadoopDelegationTokenProvider
 
 private[spark] class KafkaDelegationTokenProvider
@@ -54,9 +55,10 @@ private[spark] class KafkaDelegationTokenProvider
           }
         } catch {
           case NonFatal(e) =>
-            logWarning(s"Failed to get token from service: $serviceName due to $e on " +
-              s"cluster: ${clusterConf.identifier}. If $serviceName is not used, " +
-              s"set spark.security.credentials.$serviceName.enabled to false")
+            logWarning(log"Failed to get token from service: ${MDC(SERVICE_NAME, serviceName)} " +
+              log"on cluster: ${MDC(CLUSTER_ID, clusterConf.identifier)}. If " +
+              log"${MDC(SERVICE_NAME, serviceName)} is not used, please set " +
+              log"spark.security.credentials.${MDC(SERVICE_NAME, serviceName)}.enabled to false", e)
         }
       }
     } catch {
diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 693ddd31d9a8..2bc2acf9aaf9 100644
--- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CONFIG
 import org.apache.spark.kafka010.KafkaConfigUpdater
 
 /**
@@ -107,8 +108,8 @@ private case class Subscribe[K, V](
         consumer.poll(0)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
-          logWarning("Catching NoOffsetForPartitionException since " +
-            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
+          logWarning(log"Catching NoOffsetForPartitionException since " +
+            log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is none. See KAFKA-3370")
       }
       toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
@@ -161,8 +162,8 @@ private case class SubscribePattern[K, V](
         consumer.poll(0)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
-          logWarning("Catching NoOffsetForPartitionException since " +
-            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
+          logWarning(log"Catching NoOffsetForPartitionException since " +
+            log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is none. See KAFKA-3370")
       }
       toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
index c7ac6a8bf744..6b47e9d72f4b 100644
--- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
@@ -26,7 +26,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaC
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{KEY, MAX_CAPACITY}
 import org.apache.spark.kafka010.KafkaConfigUpdater
 
 private[kafka010] sealed trait KafkaDataConsumer[K, V] {
@@ -256,8 +257,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {
 
           if (entry.getValue.inUse == false && this.size > maxCapacity) {
             logWarning(
-                s"KafkaConsumer cache hitting max capacity of $maxCapacity, " +
-                s"removing consumer for ${entry.getKey}")
+              log"KafkaConsumer cache hitting max capacity of ${MDC(MAX_CAPACITY, maxCapacity)}, " +
+                log"removing consumer for ${MDC(KEY, entry.getKey)}")
                try {
               entry.getValue.close()
             } catch {
diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index 731d06fd95fa..f3e4c45b3aa9 100644
--- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.GROUP_ID
+import org.apache.spark.internal.LogKey.{CONFIG, GROUP_ID}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
@@ -184,26 +184,30 @@ object KafkaUtils extends Logging {
    * Tweak kafka params to prevent issues on executors
    */
   private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
-    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
+    logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)} " +
+      log"to false for executor")
     kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
 
-    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
+    logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} " +
+      log"to none for executor")
     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
 
     // driver and executor should be in different consumer groups
     val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
     if (null == originalGroupId) {
-      logError(log"${MDC(GROUP_ID, ConsumerConfig.GROUP_ID_CONFIG)} is null, " +
+      logError(log"${MDC(CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} is null, " +
         log"you should probably set it")
     }
     val groupId = "spark-executor-" + originalGroupId
-    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
+    logWarning(log"overriding executor ${MDC(CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} " +
+      log"to ${MDC(GROUP_ID, groupId)}")
     kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
 
     // possible workaround for KAFKA-3135
     val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
     if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
-      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
+      logWarning(log"overriding ${MDC(CONFIG, ConsumerConfig.RECEIVE_BUFFER_CONFIG)} " +
+        log"to 65536 see KAFKA-3135")
       kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
     }
   }
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index d22496a84b58..6b0c091534b7 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -29,7 +29,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
 
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, RETRY_COUNT}
 import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.NextIterator
@@ -277,7 +278,8 @@ class KinesisSequenceRangeIterator(
           lastError = t
            t match {
              case ptee: ProvisionedThroughputExceededException =>
-               logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee)
+               logWarning(log"Error while ${MDC(ERROR, message)} " +
+                 log"[attempt = ${MDC(RETRY_COUNT, retryCount + 1)}]", ptee)
              case e: Throwable =>
                throw new SparkException(s"Error while $message", e)
            }
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index e1e21954ce88..94e109680fbc 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -120,7 +120,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
     // null if not initialized before shutdown:
     if (shardId == null) {
-      logWarning(s"No shardId for workerId $workerId?")
+      logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?")
     } else {
       reason match {
         /*
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 406c19be9bff..cd4c61396a12 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -33,7 +33,8 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
 import com.amazonaws.services.kinesis.model._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{STREAM_NAME, TABLE_NAME}
 
 /**
  * Shared utility methods for performing Kinesis tests that actually transfer 
data.
@@ -147,7 +148,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
       }
     } catch {
       case e: Exception =>
-        logWarning(s"Could not delete stream $streamName")
+        logWarning(log"Could not delete stream ${MDC(STREAM_NAME, streamName)}", e)
     }
   }
 
@@ -158,7 +159,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
       table.waitForDelete()
     } catch {
       case e: Exception =>
-        logWarning(s"Could not delete DynamoDB table $tableName")
+        logWarning(log"Could not delete DynamoDB table ${MDC(TABLE_NAME, tableName)}", e)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
