This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f77495909b29 [SPARK-47588][CORE] Hive module: Migrate logInfo with variables to structured logging framework
f77495909b29 is described below

commit f77495909b29fe4883afcfd8fec7be048fe494a3
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue Apr 16 22:32:34 2024 -0700

    [SPARK-47588][CORE] Hive module: Migrate logInfo with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logInfo in Hive module with variables to structured logging framework.
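
    As a rough sketch of the pattern, a migrated call looks like the snippet below. The `TABLE_NAME` key, the `MDC` wrapper, and the `qualifiedTableName` variable are taken from the diff in this commit; the surrounding object and the sample table name are hypothetical and only for illustration.

    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.TABLE_NAME

    // Hypothetical sketch, not part of this commit.
    object MigrationSketch extends Logging {
      def main(args: Array[String]): Unit = {
        val qualifiedTableName = "db.tbl"

        // Before: plain string interpolation; the value is only embedded in free text.
        logInfo(s"Persisting data source table $qualifiedTableName into Hive metastore.")

        // After: the log"..." interpolator plus MDC tags the value with a LogKey,
        // so the structured logging framework can emit it as a queryable field.
        logInfo(log"Persisting data source table ${MDC(TABLE_NAME, qualifiedTableName)} " +
          log"into Hive metastore.")
      }
    }
    ```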
    
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA tests

    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46086 from gengliangwang/hive_loginfo.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  4 +++
 .../spark/sql/hive/HiveExternalCatalog.scala       | 30 ++++++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  9 ++++---
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 27 +++++++++++--------
 .../spark/sql/hive/client/HiveClientImpl.scala     |  5 ++--
 .../sql/hive/client/IsolatedClientLoader.scala     |  4 +--
 .../spark/sql/hive/orc/OrcFileOperator.scala       |  4 +--
 7 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index bfeb733af30a..838ef0355e3a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -95,10 +95,13 @@ object LogKey extends Enumeration {
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
   val HISTORY_DIR = Value
+  val HIVE_CLIENT_VERSION = Value
+  val HIVE_METASTORE_VERSION = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val HOST_PORT = Value
+  val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
   val INFERENCE_MODE = Value
   val INITIAL_CAPACITY = Value
@@ -152,6 +155,7 @@ object LogKey extends Enumeration {
   val POLICY = Value
   val PORT = Value
   val PRODUCER_ID = Value
+  val PROVIDER = Value
   val QUERY_CACHE_VALUE = Value
   val QUERY_HINT = Value
   val QUERY_ID = Value
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8c35e10b383f..60f2d2f3e5fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -34,7 +34,7 @@ import org.apache.thrift.TException
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{DATABASE_NAME, SCHEMA, SCHEMA2, TABLE_NAME}
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, INCOMPATIBLE_TYPES, PROVIDER, SCHEMA, SCHEMA2, TABLE_NAME}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
@@ -338,35 +338,37 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
       case _ if options.skipHiveMetadata =>
         val message =
-          s"Persisting data source table $qualifiedTableName into Hive 
metastore in" +
-            "Spark SQL specific format, which is NOT compatible with Hive."
+          log"Persisting data source table ${MDC(TABLE_NAME, 
qualifiedTableName)} into Hive " +
+            log"metastore in Spark SQL specific format, which is NOT 
compatible with Hive."
         (None, message)
 
       case _ if incompatibleTypes.nonEmpty =>
+        val incompatibleTypesStr = incompatibleTypes.mkString(", ")
         val message =
-          s"Hive incompatible types found: ${incompatibleTypes.mkString(", 
")}. " +
-            s"Persisting data source table $qualifiedTableName into Hive 
metastore in " +
-            "Spark SQL specific format, which is NOT compatible with Hive."
+          log"Hive incompatible types found: ${MDC(INCOMPATIBLE_TYPES, 
incompatibleTypesStr)}. " +
+            log"Persisting data source table ${MDC(TABLE_NAME, 
qualifiedTableName)} into Hive " +
+            log"metastore in Spark SQL specific format, which is NOT 
compatible with Hive."
         (None, message)
       // our bucketing is un-compatible with hive(different hash function)
       case Some(serde) if table.bucketSpec.nonEmpty =>
         val message =
-          s"Persisting bucketed data source table $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT 
compatible with " +
-            "Hive bucketed table. But Hive can read this table as a 
non-bucketed table."
+          log"Persisting bucketed data source table ${MDC(TABLE_NAME, 
qualifiedTableName)} into " +
+            log"Hive metastore in Spark SQL specific format, which is NOT 
compatible with " +
+            log"Hive bucketed table. But Hive can read this table as a 
non-bucketed table."
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case Some(serde) =>
         val message =
-          s"Persisting file based data source table $qualifiedTableName into " 
+
-            s"Hive metastore in Hive compatible format."
+          log"Persisting file based data source table ${MDC(TABLE_NAME, 
qualifiedTableName)} " +
+            log"into Hive metastore in Hive compatible format."
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
         val message =
-          s"Couldn't find corresponding Hive SerDe for data source provider 
$provider. " +
-            s"Persisting data source table $qualifiedTableName into Hive 
metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive."
+          log"Couldn't find corresponding Hive SerDe for data source provider 
" +
+            log"${MDC(PROVIDER, provider)}. Persisting data source table " +
+            log"${MDC(TABLE_NAME, qualifiedTableName)} into Hive metastore in 
" +
+            log"Spark SQL specific format, which is NOT compatible with Hive."
         (None, message)
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5b3160c56304..df52cf712bdd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{FILE_FORMAT, FILE_FORMAT2, INFERENCE_MODE, TABLE_NAME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -339,8 +339,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
     val tableName = relation.tableMeta.identifier.unquotedString
     if (shouldInfer) {
-      logInfo(s"Inferring case-sensitive schema for table $tableName 
(inference mode: " +
-        s"$inferenceMode)")
+      logInfo(log"Inferring case-sensitive schema for table ${MDC(TABLE_NAME, 
tableName)} " +
+        log"(inference mode:  ${MDC(INFERENCE_MODE, inferenceMode)})})")
       val fileIndex = fileIndexOpt.getOrElse {
         val rootPath = new Path(relation.tableMeta.location)
         new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
@@ -372,7 +372,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def updateDataSchema(identifier: TableIdentifier, newDataSchema: StructType): Unit = try {
-    logInfo(s"Saving case-sensitive schema for table 
${identifier.unquotedString}")
+    logInfo(
+      log"Saving case-sensitive schema for table ${MDC(TABLE_NAME, 
identifier.unquotedString)}")
     sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
   } catch {
     case NonFatal(ex) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 68f34bd2beb0..3fc761785acc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -37,7 +37,7 @@ import org.apache.hive.common.util.HiveVersionInfo
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.LogKey
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -287,7 +287,8 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForExecution(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClientImpl = {
-    logInfo(s"Initializing execution hive, version $builtinHiveVersion")
+    logInfo(log"Initializing execution hive, version " +
+      log"${MDC(LogKey.HIVE_METASTORE_VERSION, builtinHiveVersion)}")
     val loader = new IsolatedClientLoader(
       version = IsolatedClientLoader.hiveVersion(builtinHiveVersion),
       sparkConf = conf,
@@ -321,7 +322,7 @@ private[spark] object HiveUtils extends Logging {
       if (file.getName == "*") {
         val files = file.getParentFile.listFiles()
         if (files == null) {
-          logWarning(log"Hive jar path '${MDC(PATH, file.getPath)}' does not 
exist.")
+          logWarning(log"Hive jar path '${MDC(LogKey.PATH, file.getPath)}' 
does not exist.")
           Nil
         } else {
           files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL)
@@ -332,6 +333,12 @@ private[spark] object HiveUtils extends Logging {
       }
     }
 
+    def logInitWithPath(jars: Seq[URL]): Unit = {
+      logInfo(log"Initializing HiveMetastoreConnection version " +
+        log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using 
paths: " +
+        log"${MDC(LogKey.PATH, jars.mkString(", "))}")
+    }
+
     val isolatedLoader = if (hiveMetastoreJars == "builtin") {
       if (builtinHiveVersion != hiveMetastoreVersion) {
         throw new IllegalArgumentException(
@@ -342,7 +349,8 @@ private[spark] object HiveUtils extends Logging {
       }
 
       logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using Spark classes.")
+        log"Initializing HiveMetastoreConnection version " +
+          log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} 
using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
@@ -355,7 +363,8 @@ private[spark] object HiveUtils extends Logging {
     } else if (hiveMetastoreJars == "maven") {
       // TODO: Support for loading the jars from an already downloaded location.
       logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using maven.")
+        log"Initializing HiveMetastoreConnection version " +
+          log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} 
using maven.")
       IsolatedClientLoader.forVersion(
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
@@ -381,9 +390,7 @@ private[spark] object HiveUtils extends Logging {
               ).map(_.toUri.toURL)
           }
 
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " 
+
-          s"using path: ${jars.mkString(";")}")
+      logInitWithPath(jars)
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
@@ -402,9 +409,7 @@ private[spark] object HiveUtils extends Logging {
             addLocalHiveJars(new File(path))
           }
 
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " 
+
-          s"using ${jars.mkString(":")}")
+      logInitWithPath(jars.toSeq)
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 502cec3be9c8..43993f4974db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -161,8 +161,9 @@ private[hive] class HiveClientImpl(
 
   // Log the default warehouse location.
   logInfo(
-    s"Warehouse location for Hive client (version ${version.fullVersion}) is " 
+
-    s"${conf.getVar(HiveConf.getConfVars("hive.metastore.warehouse.dir"))}")
+    log"Warehouse location for Hive client (version " +
+      log"${MDC(HIVE_CLIENT_VERSION, version.fullVersion)}) is " +
+    log"${MDC(PATH, 
conf.getVar(HiveConf.getConfVars("hive.metastore.warehouse.dir")))}")
 
   private def newState(): SessionState = {
     val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e4bab4631ab1..7e7c246abe97 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION}
+import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION, PATH}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveUtils
@@ -149,7 +149,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     // TODO: Remove copy logic.
     val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
     allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
-    logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}")
+    logInfo(log"Downloaded metastore jars to ${MDC(PATH, 
tempDir.getCanonicalPath)}")
     tempDir.listFiles().map(_.toURI.toURL).toImmutableArraySeq
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3e1bdff8c007..d51894542f4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -59,8 +59,8 @@ private[hive] object OrcFileOperator extends Logging {
       reader.getObjectInspector match {
         case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
           logInfo(
-            s"ORC file $path has empty schema, it probably contains no rows. " 
+
-              "Trying to read another ORC file to figure out the schema.")
+            log"ORC file ${MDC(PATH, path)} has empty schema, it probably 
contains no rows. " +
+              log"Trying to read another ORC file to figure out the schema.")
           false
         case _ => true
       }

