This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f77495909b29 [SPARK-47588][CORE] Hive module: Migrate logInfo with variables to structured logging framework
f77495909b29 is described below

commit f77495909b29fe4883afcfd8fec7be048fe494a3
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue Apr 16 22:32:34 2024 -0700

    [SPARK-47588][CORE] Hive module: Migrate logInfo with variables to structured logging framework

    ### What changes were proposed in this pull request?

    Migrate logInfo calls with variables in the Hive module to the structured logging framework.

    ### Why are the changes needed?

    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    GA tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46086 from gengliangwang/hive_loginfo.

    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  4 +++
 .../spark/sql/hive/HiveExternalCatalog.scala       | 30 ++++++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  9 ++++---
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 27 +++++++++++--------
 .../spark/sql/hive/client/HiveClientImpl.scala     |  5 ++--
 .../sql/hive/client/IsolatedClientLoader.scala     |  4 +--
 .../spark/sql/hive/orc/OrcFileOperator.scala       |  4 +--
 7 files changed, 48 insertions(+), 35 deletions(-)
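Note: every change below follows one pattern: Scala string interpolation (s"...") becomes the log"..." interpolator from Spark's internal Logging trait, and each interpolated variable is wrapped in an MDC (Mapped Diagnostic Context) tag so it can be emitted as a queryable field rather than baked into plain text. A minimal before/after sketch of that pattern, assuming the internal Logging API as of this commit (the object name MigrationSketch is hypothetical):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.TABLE_NAME

    object MigrationSketch extends Logging {
      // Before: the table name is embedded in an opaque string.
      def logBefore(qualifiedTableName: String): Unit =
        logInfo(s"Persisting data source table $qualifiedTableName into Hive metastore.")

      // After: TABLE_NAME travels with the message as a structured MDC field.
      def logAfter(qualifiedTableName: String): Unit =
        logInfo(log"Persisting data source table ${MDC(TABLE_NAME, qualifiedTableName)} " +
          log"into Hive metastore.")
    }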
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index bfeb733af30a..838ef0355e3a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -95,10 +95,13 @@ object LogKey extends Enumeration {
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
   val HISTORY_DIR = Value
+  val HIVE_CLIENT_VERSION = Value
+  val HIVE_METASTORE_VERSION = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val HOST_PORT = Value
+  val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
   val INFERENCE_MODE = Value
   val INITIAL_CAPACITY = Value
@@ -152,6 +155,7 @@ object LogKey extends Enumeration {
   val POLICY = Value
   val PORT = Value
   val PRODUCER_ID = Value
+  val PROVIDER = Value
   val QUERY_CACHE_VALUE = Value
   val QUERY_HINT = Value
   val QUERY_ID = Value
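Note: the four new keys are declared once in the shared LogKey enumeration and then referenced from the Hive module, which keeps field names consistent across modules. A hedged sketch of how one of them is consumed (hypothetical object; with a JSON log4j2 layout the key typically surfaces as a lowercased field such as incompatible_types, though the exact output shape depends on the configured layout):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.INCOMPATIBLE_TYPES

    object LogKeySketch extends Logging {
      def logIncompatibleTypes(types: Seq[String]): Unit =
        // MDC pairs the enumeration value with a runtime value; the mkString
        // mirrors how HiveExternalCatalog flattens the type list below.
        logInfo(log"Hive incompatible types found: " +
          log"${MDC(INCOMPATIBLE_TYPES, types.mkString(", "))}.")
    }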
message = - s"Persisting data source table $qualifiedTableName into Hive metastore in" + - "Spark SQL specific format, which is NOT compatible with Hive." + log"Persisting data source table ${MDC(TABLE_NAME, qualifiedTableName)} into Hive " + + log"metastore in Spark SQL specific format, which is NOT compatible with Hive." (None, message) case _ if incompatibleTypes.nonEmpty => + val incompatibleTypesStr = incompatibleTypes.mkString(", ") val message = - s"Hive incompatible types found: ${incompatibleTypes.mkString(", ")}. " + - s"Persisting data source table $qualifiedTableName into Hive metastore in " + - "Spark SQL specific format, which is NOT compatible with Hive." + log"Hive incompatible types found: ${MDC(INCOMPATIBLE_TYPES, incompatibleTypesStr)}. " + + log"Persisting data source table ${MDC(TABLE_NAME, qualifiedTableName)} into Hive " + + log"metastore in Spark SQL specific format, which is NOT compatible with Hive." (None, message) // our bucketing is un-compatible with hive(different hash function) case Some(serde) if table.bucketSpec.nonEmpty => val message = - s"Persisting bucketed data source table $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with " + - "Hive bucketed table. But Hive can read this table as a non-bucketed table." + log"Persisting bucketed data source table ${MDC(TABLE_NAME, qualifiedTableName)} into " + + log"Hive metastore in Spark SQL specific format, which is NOT compatible with " + + log"Hive bucketed table. But Hive can read this table as a non-bucketed table." (Some(newHiveCompatibleMetastoreTable(serde)), message) case Some(serde) => val message = - s"Persisting file based data source table $qualifiedTableName into " + - s"Hive metastore in Hive compatible format." + log"Persisting file based data source table ${MDC(TABLE_NAME, qualifiedTableName)} " + + log"into Hive metastore in Hive compatible format." (Some(newHiveCompatibleMetastoreTable(serde)), message) case _ => val message = - s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + - s"Persisting data source table $qualifiedTableName into Hive metastore in " + - s"Spark SQL specific format, which is NOT compatible with Hive." + log"Couldn't find corresponding Hive SerDe for data source provider " + + log"${MDC(PROVIDER, provider)}. Persisting data source table " + + log"${MDC(TABLE_NAME, qualifiedTableName)} into Hive metastore in " + + log"Spark SQL specific format, which is NOT compatible with Hive." 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5b3160c56304..df52cf712bdd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{FILE_FORMAT, FILE_FORMAT2, INFERENCE_MODE, TABLE_NAME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -339,8 +339,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
     val tableName = relation.tableMeta.identifier.unquotedString
     if (shouldInfer) {
-      logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " +
-        s"$inferenceMode)")
+      logInfo(log"Inferring case-sensitive schema for table ${MDC(TABLE_NAME, tableName)} " +
+        log"(inference mode: ${MDC(INFERENCE_MODE, inferenceMode)})")
       val fileIndex = fileIndexOpt.getOrElse {
         val rootPath = new Path(relation.tableMeta.location)
         new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
@@ -372,7 +372,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def updateDataSchema(identifier: TableIdentifier, newDataSchema: StructType): Unit = try {
-    logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}")
+    logInfo(
+      log"Saving case-sensitive schema for table ${MDC(TABLE_NAME, identifier.unquotedString)}")
     sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
   } catch {
     case NonFatal(ex) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 68f34bd2beb0..3fc761785acc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -37,7 +37,7 @@ import org.apache.hive.common.util.HiveVersionInfo
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.LogKey
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -287,7 +287,8 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForExecution(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClientImpl = {
-    logInfo(s"Initializing execution hive, version $builtinHiveVersion")
+    logInfo(log"Initializing execution hive, version " +
+      log"${MDC(LogKey.HIVE_METASTORE_VERSION, builtinHiveVersion)}")
     val loader = new IsolatedClientLoader(
       version = IsolatedClientLoader.hiveVersion(builtinHiveVersion),
       sparkConf = conf,
@@ -321,7 +322,7 @@ private[spark] object HiveUtils extends Logging {
         if (file.getName == "*") {
           val files = file.getParentFile.listFiles()
           if (files == null) {
-            logWarning(log"Hive jar path '${MDC(PATH, file.getPath)}' does not exist.")
+            logWarning(log"Hive jar path '${MDC(LogKey.PATH, file.getPath)}' does not exist.")
             Nil
           } else {
             files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL)
@@ -332,6 +333,12 @@ private[spark] object HiveUtils extends Logging {
       }
     }
 
+    def logInitWithPath(jars: Seq[URL]): Unit = {
+      logInfo(log"Initializing HiveMetastoreConnection version " +
+        log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using paths: " +
+        log"${MDC(LogKey.PATH, jars.mkString(", "))}")
+    }
+
     val isolatedLoader = if (hiveMetastoreJars == "builtin") {
       if (builtinHiveVersion != hiveMetastoreVersion) {
         throw new IllegalArgumentException(
@@ -342,7 +349,8 @@ private[spark] object HiveUtils extends Logging {
       }
 
       logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
+        log"Initializing HiveMetastoreConnection version " +
+        log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
@@ -355,7 +363,8 @@ private[spark] object HiveUtils extends Logging {
     } else if (hiveMetastoreJars == "maven") {
       // TODO: Support for loading the jars from an already downloaded location.
       logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
+        log"Initializing HiveMetastoreConnection version " +
+        log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using maven.")
       IsolatedClientLoader.forVersion(
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
@@ -381,9 +390,7 @@ private[spark] object HiveUtils extends Logging {
         ).map(_.toUri.toURL)
       }
 
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
-          s"using path: ${jars.mkString(";")}")
+      logInitWithPath(jars)
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
@@ -402,9 +409,7 @@ private[spark] object HiveUtils extends Logging {
         addLocalHiveJars(new File(path))
       }
 
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
-          s"using ${jars.mkString(":")}")
+      logInitWithPath(jars.toSeq)
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
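Note: the new logInitWithPath helper above replaces two near-duplicate logInfo calls that had drifted apart (one joined jar paths with ";", the other with ":"); both branches now emit the same message shape, joined with ", ", under the same structured keys. A standalone sketch of the same refactor (hypothetical object; the version is passed in rather than read from an enclosing scope):

    import java.net.URL
    import org.apache.spark.internal.{Logging, LogKey, MDC}

    object InitLogSketch extends Logging {
      // One helper, one separator, one pair of MDC keys for both call sites.
      def logInitWithPath(version: String, jars: Seq[URL]): Unit =
        logInfo(log"Initializing HiveMetastoreConnection version " +
          log"${MDC(LogKey.HIVE_METASTORE_VERSION, version)} using paths: " +
          log"${MDC(LogKey.PATH, jars.mkString(", "))}")
    }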
logWarning(log"Hive jar path '${MDC(LogKey.PATH, file.getPath)}' does not exist.") Nil } else { files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL) @@ -332,6 +333,12 @@ private[spark] object HiveUtils extends Logging { } } + def logInitWithPath(jars: Seq[URL]): Unit = { + logInfo(log"Initializing HiveMetastoreConnection version " + + log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using paths: " + + log"${MDC(LogKey.PATH, jars.mkString(", "))}") + } + val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (builtinHiveVersion != hiveMetastoreVersion) { throw new IllegalArgumentException( @@ -342,7 +349,8 @@ private[spark] object HiveUtils extends Logging { } logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") + log"Initializing HiveMetastoreConnection version " + + log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using Spark classes.") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, @@ -355,7 +363,8 @@ private[spark] object HiveUtils extends Logging { } else if (hiveMetastoreJars == "maven") { // TODO: Support for loading the jars from an already downloaded location. logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") + log"Initializing HiveMetastoreConnection version " + + log"${MDC(LogKey.HIVE_METASTORE_VERSION, hiveMetastoreVersion)} using maven.") IsolatedClientLoader.forVersion( hiveMetastoreVersion = hiveMetastoreVersion, hadoopVersion = VersionInfo.getVersion, @@ -381,9 +390,7 @@ private[spark] object HiveUtils extends Logging { ).map(_.toUri.toURL) } - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using path: ${jars.mkString(";")}") + logInitWithPath(jars) new IsolatedClientLoader( version = metaVersion, sparkConf = conf, @@ -402,9 +409,7 @@ private[spark] object HiveUtils extends Logging { addLocalHiveJars(new File(path)) } - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using ${jars.mkString(":")}") + logInitWithPath(jars.toSeq) new IsolatedClientLoader( version = metaVersion, sparkConf = conf, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 502cec3be9c8..43993f4974db 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -161,8 +161,9 @@ private[hive] class HiveClientImpl( // Log the default warehouse location. 
logInfo( - s"Warehouse location for Hive client (version ${version.fullVersion}) is " + - s"${conf.getVar(HiveConf.getConfVars("hive.metastore.warehouse.dir"))}") + log"Warehouse location for Hive client (version " + + log"${MDC(HIVE_CLIENT_VERSION, version.fullVersion)}) is " + + log"${MDC(PATH, conf.getVar(HiveConf.getConfVars("hive.metastore.warehouse.dir")))}") private def newState(): SessionState = { val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index e4bab4631ab1..7e7c246abe97 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmit import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION} +import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION, PATH} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.hive.HiveUtils @@ -149,7 +149,7 @@ private[hive] object IsolatedClientLoader extends Logging { // TODO: Remove copy logic. val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}") + logInfo(log"Downloaded metastore jars to ${MDC(PATH, tempDir.getCanonicalPath)}") tempDir.listFiles().map(_.toURI.toURL).toImmutableArraySeq } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 3e1bdff8c007..d51894542f4f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -59,8 +59,8 @@ private[hive] object OrcFileOperator extends Logging { reader.getObjectInspector match { case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 => logInfo( - s"ORC file $path has empty schema, it probably contains no rows. " + - "Trying to read another ORC file to figure out the schema.") + log"ORC file ${MDC(PATH, path)} has empty schema, it probably contains no rows. " + + log"Trying to read another ORC file to figure out the schema.") false case _ => true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org