This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf6b518f67c [SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework
eaf6b518f67c is described below

commit eaf6b518f67c0e3ed04f264c3a89573bd7e74fe7
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 10 22:34:14 2024 -0700

    [SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR aims to migrate `logWarning` calls with variables in the `Hive` module to the `structured logging framework`.
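    As a rough illustration of the pattern applied throughout this diff (the class and variable names below are made up for the example; `log"..."`, `MDC`, and `LogKey` are the structured-logging primitives from `org.apache.spark.internal` that the changed files import):
    
    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.TABLE_NAME
    
    // Hypothetical class, for illustration only.
    class SchemaInferenceExample extends Logging {
      def warnFallback(tableName: String): Unit = {
        // Before: plain string interpolation; the variable is only embedded in free text.
        // logWarning(s"Unable to infer schema for table $tableName. Using metastore schema.")
    
        // After: the log interpolator tags the variable with a LogKey via MDC,
        // so it can also be emitted as a structured field.
        logWarning(log"Unable to infer schema for table ${MDC(TABLE_NAME, tableName)}. " +
          log"Using metastore schema.")
      }
    }
    ```
    
    The new `LogKey` values added in this commit (for example `SCHEMA`, `FILE_FORMAT`, `INFERENCE_MODE`) serve as those MDC keys.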
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45927 from panbingkun/SPARK-47587.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  9 +++++++
 .../security/HBaseDelegationTokenProvider.scala    |  7 ++---
 .../main/scala/org/apache/spark/util/Utils.scala   | 10 ++++----
 .../spark/sql/hive/HiveExternalCatalog.scala       | 28 ++++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 30 ++++++++++++++--------
 .../org/apache/spark/sql/hive/HiveUtils.scala      |  5 ++--
 .../spark/sql/hive/client/HiveClientImpl.scala     |  8 +++---
 .../apache/spark/sql/hive/client/HiveShim.scala    | 23 +++++++++--------
 .../sql/hive/client/IsolatedClientLoader.scala     | 13 ++++++----
 .../spark/sql/hive/execution/HiveFileFormat.scala  | 11 ++++----
 .../spark/sql/hive/execution/HiveTempPath.scala    |  5 ++--
 .../spark/sql/hive/orc/OrcFileOperator.scala       |  5 ++--
 .../security/HiveDelegationTokenProvider.scala     |  8 +++---
 13 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a9a79de05c27..28b06f448784 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -44,6 +44,7 @@ object LogKey extends Enumeration {
   val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
+  val CONFIG3 = Value
   val CONTAINER = Value
   val CONTAINER_ID = Value
   val COUNT = Value
@@ -58,6 +59,7 @@ object LogKey extends Enumeration {
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val END_POINT = Value
+  val ENGINE = Value
   val ERROR = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
@@ -66,14 +68,19 @@ object LogKey extends Enumeration {
   val EXIT_CODE = Value
   val EXPRESSION_TERMS = Value
   val FAILURES = Value
+  val FALLBACK_VERSION = Value
   val FIELD_NAME = Value
+  val FILE_FORMAT = Value
+  val FILE_FORMAT2 = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
+  val HADOOP_VERSION = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val INDEX = Value
+  val INFERENCE_MODE = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
   val JOIN_CONDITION_SUB_EXPRESSION = Value
@@ -132,6 +139,8 @@ object LogKey extends Enumeration {
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
   val RULE_NUMBER_OF_RUNS = Value
+  val SCHEMA = Value
+  val SCHEMA2 = Value
   val SERVICE_NAME = Value
   val SESSION_ID = Value
   val SHARD_ID = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index d60e5975071d..1b2e41bc0a2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.SERVICE_NAME
 import org.apache.spark.security.HadoopDelegationTokenProvider
 import org.apache.spark.util.Utils
 
@@ -53,8 +54,8 @@ private[security] class HBaseDelegationTokenProvider
       creds.addToken(token.getService, token)
     } catch {
       case NonFatal(e) =>
-        logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) +
-          s" Retrying to fetch HBase security token with $serviceName connection parameter.")
+        logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) + log" Retrying to fetch " +
+          log"HBase security token with ${MDC(SERVICE_NAME, serviceName)} connection parameter.")
         // Seems to be spark is trying to get the token from HBase 2.x.x  version or above where the
         // obtainToken(Configuration conf) API has been removed. Lets try obtaining the token from
         // another compatible API of HBase service.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7022506e5508..af91a4b32c6f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -68,7 +68,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
 import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Streaming._
@@ -2972,10 +2972,10 @@ private[spark] object Utils
   }
 
   /** Returns a string message about delegation token generation failure */
-  def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
-    val message = "Failed to get token from service %s due to %s. " +
-      "If %s is not used, set spark.security.credentials.%s.enabled to false."
-    message.format(serviceName, e, serviceName, serviceName)
+  def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): MessageWithContext = {
+    log"Failed to get token from service ${MDC(SERVICE_NAME, serviceName)} " +
+      log"due to ${MDC(ERROR, e)}. If ${MDC(SERVICE_NAME, serviceName)} is not used, " +
+      log"set spark.security.credentials.${MDC(SERVICE_NAME, serviceName)}.enabled to false."
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 32fc8a452106..8c35e10b383f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
 import org.apache.thrift.TException
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, SCHEMA, SCHEMA2, TABLE_NAME}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
@@ -175,9 +176,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
     val existingDb = getDatabase(dbDefinition.name)
     if (existingDb.properties == dbDefinition.properties) {
-      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op 
because " +
-        s"the provided database properties are the same as the old ones. Hive 
does not " +
-        s"currently support altering other database fields.")
+      logWarning(log"Request to alter database ${MDC(DATABASE_NAME, 
dbDefinition.name)} is a " +
+        log"no-op because the provided database properties are the same as the 
old ones. Hive " +
+        log"does not currently support altering other database fields.")
     }
     client.alterDatabase(dbDefinition)
   }
@@ -380,8 +381,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         } catch {
           case NonFatal(e) =>
             val warningMessage =
-              s"Could not persist ${table.identifier.quotedString} in a Hive " 
+
-                "compatible way. Persisting it into Hive metastore in Spark 
SQL specific format."
+              log"Could not persist ${MDC(TABLE_NAME, 
table.identifier.quotedString)} in a Hive " +
+                log"compatible way. Persisting it into Hive metastore in Spark 
SQL specific format."
             logWarning(warningMessage, e)
             saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
         }
@@ -676,9 +677,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
-          val warningMessage =
-            s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " +
-              "compatible way. Updating Hive metastore in Spark SQL specific format."
+          val warningMessage = log"Could not alter schema of table " +
+            log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
+            log"Updating Hive metastore in Spark SQL specific format."
           logWarning(warningMessage, e)
           client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
       }
@@ -800,10 +801,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // schema we read back is different(ignore case and nullability) from the one in table
         // properties which was written when creating table, we should respect the table schema
         // from hive.
-        logWarning(s"The table schema given by Hive 
metastore(${table.schema.catalogString}) is " +
-          "different from the schema when this table was created by Spark SQL" 
+
-          s"(${schemaFromTableProps.catalogString}). We have to fall back to 
the table schema " +
-          "from Hive metastore which is not case preserving.")
+        logWarning(log"The table schema given by Hive metastore" +
+          log"(${MDC(SCHEMA, table.schema.catalogString)}) is different from 
the schema when " +
+          log"this table was created by Spark SQL" +
+          log"(${MDC(SCHEMA2, schemaFromTableProps.catalogString)}). We have 
to fall back to " +
+          log"the table schema from Hive metastore which is not case 
preserving.")
         hiveTable.copy(schemaPreservesCase = false)
       }
     } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f1d99d359cde..5b3160c56304 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,7 +25,8 @@ import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FILE_FORMAT, FILE_FORMAT2, INFERENCE_MODE, TABLE_NAME}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -104,21 +105,28 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None
             }
           case _ =>
-            logWarning(s"Table $tableIdentifier should be stored as 
$expectedFileFormat. " +
-              s"However, we are getting a ${relation.fileFormat} from the 
metastore cache. " +
-              "This cached entry will be invalidated.")
+            logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat,
+              relation.fileFormat.toString)
             catalogProxy.invalidateCachedTable(tableIdentifier)
             None
         }
       case other =>
-        logWarning(s"Table $tableIdentifier should be stored as 
$expectedFileFormat. " +
-          s"However, we are getting a $other from the metastore cache. " +
-          "This cached entry will be invalidated.")
+        logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat, 
other.toString)
         catalogProxy.invalidateCachedTable(tableIdentifier)
         None
     }
   }
 
+  private def logWarningUnexpectedFileFormat(
+      tableIdentifier: QualifiedTableName,
+      expectedFileFormat: Class[_ <: FileFormat],
+      actualFileFormat: String): Unit = {
+    logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored 
as " +
+      log"${MDC(FILE_FORMAT, expectedFileFormat)}. However, we are getting a " 
+
+      log"${MDC(FILE_FORMAT2, actualFileFormat)} from the metastore cache. " +
+      log"This cached entry will be invalidated.")
+  }
+
   // Return true for Apache ORC and Hive ORC-related configuration names.
   // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
   private def isOrcProperty(key: String) =
@@ -353,8 +361,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
           relation.tableMeta.copy(schema = newSchema)
         case None =>
-          logWarning(s"Unable to infer schema for table $tableName from file 
format " +
-            s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
+          logWarning(log"Unable to infer schema for table ${MDC(TABLE_NAME, 
tableName)} from " +
+            log"file format ${MDC(FILE_FORMAT, fileFormat)} (inference mode: " 
+
+            log"${MDC(INFERENCE_MODE, inferenceMode)}). Using metastore 
schema.")
           relation.tableMeta
       }
     } else {
@@ -367,7 +376,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
   } catch {
     case NonFatal(ex) =>
-      logWarning(s"Unable to save case-sensitive schema for table 
${identifier.unquotedString}", ex)
+      logWarning(log"Unable to save case-sensitive schema for table " +
+        log"${MDC(TABLE_NAME, identifier.unquotedString)}", ex)
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 79d0af0f9a09..68f34bd2beb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,7 +36,8 @@ import org.apache.hive.common.util.HiveVersionInfo
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -320,7 +321,7 @@ private[spark] object HiveUtils extends Logging {
       if (file.getName == "*") {
         val files = file.getParentFile.listFiles()
         if (files == null) {
-          logWarning(s"Hive jar path '${file.getPath}' does not exist.")
+          logWarning(log"Hive jar path '${MDC(PATH, file.getPath)}' does not 
exist.")
           Nil
        } else {
          files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92561bed1195..502cec3be9c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -230,8 +230,8 @@ private[hive] class HiveClientImpl(
         case e: Exception if causedByThrift(e) =>
           caughtException = e
           logWarning(
-            "HiveClient got thrift exception, destroying client and retrying " 
+
-              s"(${retryLimit - numTries} tries remaining)", e)
+            log"HiveClient got thrift exception, destroying client and 
retrying " +
+              log"${MDC(RETRY_COUNT, numTries)} times", e)
           clientLoader.cachedHive = null
           Thread.sleep(retryDelayMillis)
       }
@@ -1335,8 +1335,8 @@ private[hive] object HiveClientImpl extends Logging {
     // initialize spark or tez stuff, which is useless for spark.
     val engine = hiveConf.get("hive.execution.engine")
     if (engine != "mr") {
-      logWarning(s"Detected HiveConf hive.execution.engine is '$engine' and 
will be reset to 'mr'" +
-        " to disable useless hive logic")
+      logWarning(log"Detected HiveConf hive.execution.engine is '${MDC(ENGINE, 
engine)}' and " +
+        log"will be reset to 'mr' to disable useless hive logic")
       hiveConf.set("hive.execution.engine", "mr", SOURCE_SPARK)
     }
     hiveConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9943c0178fcf..07daa2938628 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorF
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, CONFIG3}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
@@ -368,16 +369,16 @@ private[client] class Shim_v2_0 extends Shim with Logging {
           hive.getPartitionsByFilter(table, filter)
         } catch {
           case ex: MetaException if shouldFallback =>
-            logWarning("Caught Hive MetaException attempting to get partition 
metadata by " +
-              "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
-              "degrade performance. Modifying your Hive metastore 
configuration to set " +
-              s"${tryDirectSqlConfVar.varname} to true (if it is not true 
already) may resolve " +
-              "this problem. Or you can enable " +
-              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} 
" +
-              "to alleviate performance downgrade. " +
-              "Otherwise, to avoid degraded performance you can set " +
-              
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
-              " to false and let the query fail instead.", ex)
+            logWarning(log"Caught Hive MetaException attempting to get 
partition metadata by " +
+              log"filter from Hive. Falling back to fetching all partition 
metadata, which will " +
+              log"degrade performance. Modifying your Hive metastore 
configuration to set " +
+              log"${MDC(CONFIG, tryDirectSqlConfVar.varname)} to true " +
+              log"(if it is not true already) may resolve this problem. Or you 
can enable " +
+              log"${MDC(CONFIG2, 
SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key)} " +
+              log"to alleviate performance downgrade. Otherwise, to avoid 
degraded performance " +
+              log"you can set ${MDC(CONFIG3,
+                
SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key)} " +
+              log"to false and let the query fail instead.", ex)
             // HiveShim clients are expected to handle a superset of the requested partitions
             prunePartitionsFastFallback(hive, table, catalogTable, predicates)
           case ex: MetaException =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 5297910cbfa4..e4bab4631ab1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmit
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveUtils
@@ -66,10 +67,12 @@ private[hive] object IsolatedClientLoader extends Logging {
             // If the error message contains hadoop, it is probably because the hadoop
             // version cannot be resolved.
             val fallbackVersion = "3.4.0"
-            logWarning(s"Failed to resolve Hadoop artifacts for the version 
$hadoopVersion. We " +
-              s"will change the hadoop version from $hadoopVersion to 
$fallbackVersion and try " +
-              "again. It is recommended to set jars used by Hive metastore 
client through " +
-              "spark.sql.hive.metastore.jars in the production environment.")
+            logWarning(log"Failed to resolve Hadoop artifacts for the version 
" +
+              log"${MDC(HADOOP_VERSION, hadoopVersion)}. We will change the 
hadoop version from " +
+              log"${MDC(HADOOP_VERSION, hadoopVersion)} to " +
+              log"${MDC(FALLBACK_VERSION, fallbackVersion)} and try again. It 
is recommended to " +
+              log"set jars used by Hive metastore client through 
spark.sql.hive.metastore.jars " +
+              log"in the production environment.")
             (downloadVersion(
               resolvedVersion, fallbackVersion, ivyPath, remoteRepos), fallbackVersion)
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index fa21be0c6514..1b76478a5cf3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -76,10 +77,10 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
-        s"$outputCommitterClass may be an output committer that writes data 
directly to " +
-          "the final location. Because speculation is enabled, this output 
committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please 
use an output " +
-          "committer that does not have this behavior (e.g. 
FileOutputCommitter)."
+        log"${MDC(CLASS_NAME, outputCommitterClass)} may be an output 
committer that writes data " +
+          log"directly to the final location. Because speculation is enabled, 
this output " +
+          log"committer may cause data loss (see the case in SPARK-10063). If 
possible, please " +
+          log"use an output committer that does not have this behavior (e.g. 
FileOutputCommitter)."
       logWarning(warningMessage)
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index eb8482da38e5..a16191b72a8d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -140,7 +141,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
     } catch {
       case NonFatal(e) =>
        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+        logWarning(log"Unable to delete staging directory: ${MDC(PATH, 
stagingDir)}.", e)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3fdd5a9c4cec..3e1bdff8c007 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
@@ -77,7 +78,7 @@ private[hive] object OrcFileOperator extends Logging {
       } catch {
         case e: IOException =>
           if (ignoreCorruptFiles) {
-            logWarning(s"Skipped the footer in the corrupted file: $path", e)
+            logWarning(log"Skipped the footer in the corrupted file: 
${MDC(PATH, path)}", e)
             None
           } else {
             throw QueryExecutionErrors.cannotReadFooterForFileError(path, e)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
index 253c0a9ebafe..13ff721736b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.security.token.Token
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.security.HadoopDelegationTokenProvider
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -43,8 +44,9 @@ private[spark] class HiveDelegationTokenProvider
 
   override def serviceName: String = "hive"
 
-  private val classNotFoundErrorStr = s"You are attempting to use the " +
-    s"${getClass.getCanonicalName}, but your Spark distribution is not built with Hive libraries."
+  private val classNotFoundErrorStr =
+    log"You are attempting to use the ${MDC(CLASS_NAME, getClass.getCanonicalName)}, " +
+      log"but your Spark distribution is not built with Hive libraries."
 
   private def hiveConf(hadoopConf: Configuration): Configuration = {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
