This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf6b518f67c [SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework
eaf6b518f67c is described below

commit eaf6b518f67c0e3ed04f264c3a89573bd7e74fe7
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 10 22:34:14 2024 -0700

    [SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework

    ### What changes were proposed in this pull request?
    This PR migrates `logWarning` calls with variables in the `hive` module to the structured logging framework.

    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Pass GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #45927 from panbingkun/SPARK-47587.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
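
For reviewers new to the framework, every migration in this diff follows the
same before/after shape. A minimal sketch (the `ExampleCatalog` class and
`warnMissing` method are hypothetical; `Logging`, `MDC`, and `LogKey` are the
real `org.apache.spark.internal` entry points this PR uses):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.TABLE_NAME

    class ExampleCatalog extends Logging {
      def warnMissing(tableName: String): Unit = {
        // Before: plain s-interpolation; the variable is baked into the text.
        logWarning(s"Table $tableName not found in the metastore cache.")
        // After: the log"..." interpolator builds a MessageWithContext, and
        // MDC(TABLE_NAME, ...) tags the variable so structured backends can
        // emit it as a queryable field as well as part of the message text.
        logWarning(log"Table ${MDC(TABLE_NAME, tableName)} not found in the metastore cache.")
      }
    }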
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  9 +++++++
 .../security/HBaseDelegationTokenProvider.scala    |  7 ++---
 .../main/scala/org/apache/spark/util/Utils.scala   | 10 ++++----
 .../spark/sql/hive/HiveExternalCatalog.scala       | 28 ++++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 30 ++++++++++++++--------
 .../org/apache/spark/sql/hive/HiveUtils.scala      |  5 ++--
 .../spark/sql/hive/client/HiveClientImpl.scala     |  8 +++---
 .../apache/spark/sql/hive/client/HiveShim.scala    | 23 +++++++++--------
 .../sql/hive/client/IsolatedClientLoader.scala     | 13 ++++++----
 .../spark/sql/hive/execution/HiveFileFormat.scala  | 11 ++++----
 .../spark/sql/hive/execution/HiveTempPath.scala    |  5 ++--
 .../spark/sql/hive/orc/OrcFileOperator.scala       |  5 ++--
 .../security/HiveDelegationTokenProvider.scala     |  8 +++---
 13 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a9a79de05c27..28b06f448784 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -44,6 +44,7 @@ object LogKey extends Enumeration {
   val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
+  val CONFIG3 = Value
   val CONTAINER = Value
   val CONTAINER_ID = Value
   val COUNT = Value
@@ -58,6 +59,7 @@ object LogKey extends Enumeration {
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val END_POINT = Value
+  val ENGINE = Value
   val ERROR = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
@@ -66,14 +68,19 @@ object LogKey extends Enumeration {
   val EXIT_CODE = Value
   val EXPRESSION_TERMS = Value
   val FAILURES = Value
+  val FALLBACK_VERSION = Value
   val FIELD_NAME = Value
+  val FILE_FORMAT = Value
+  val FILE_FORMAT2 = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
+  val HADOOP_VERSION = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val INDEX = Value
+  val INFERENCE_MODE = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
   val JOIN_CONDITION_SUB_EXPRESSION = Value
@@ -132,6 +139,8 @@ object LogKey extends Enumeration {
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
   val RULE_NUMBER_OF_RUNS = Value
+  val SCHEMA = Value
+  val SCHEMA2 = Value
   val SERVICE_NAME = Value
   val SESSION_ID = Value
   val SHARD_ID = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index d60e5975071d..1b2e41bc0a2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.SERVICE_NAME
 import org.apache.spark.security.HadoopDelegationTokenProvider
 import org.apache.spark.util.Utils
@@ -53,8 +54,8 @@ private[security] class HBaseDelegationTokenProvider
       creds.addToken(token.getService, token)
     } catch {
       case NonFatal(e) =>
-        logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) +
-          s" Retrying to fetch HBase security token with $serviceName connection parameter.")
+        logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) + log" Retrying to fetch " +
+          log"HBase security token with ${MDC(SERVICE_NAME, serviceName)} connection parameter.")
         // Seems to be spark is trying to get the token from HBase 2.x.x version or above where the
         // obtainToken(Configuration conf) API has been removed. Lets try obtaining the token from
         // another compatible API of HBase service.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7022506e5508..af91a4b32c6f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -68,7 +68,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
 import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Streaming._
@@ -2972,10 +2972,10 @@ private[spark] object Utils
   }
 
   /** Returns a string message about delegation token generation failure */
-  def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
-    val message = "Failed to get token from service %s due to %s. " +
-      "If %s is not used, set spark.security.credentials.%s.enabled to false."
-    message.format(serviceName, e, serviceName, serviceName)
+  def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): MessageWithContext = {
+    log"Failed to get token from service ${MDC(SERVICE_NAME, serviceName)} " +
+      log"due to ${MDC(ERROR, e)}. If ${MDC(SERVICE_NAME, serviceName)} is not used, " +
+      log"set spark.security.credentials.${MDC(SERVICE_NAME, serviceName)}.enabled to false."
   }
 
   /**
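
The Utils.scala change above is what keeps call sites composable: because
createFailedToGetTokenMessage now returns a MessageWithContext rather than a
String, callers such as HBaseDelegationTokenProvider can still append further
log"..." fragments with `+`. A sketch of that composition (the object and
method names are illustrative; the package declaration is only there because
Utils is private[spark], so this compiles only inside the Spark source tree):

    package org.apache.spark.deploy.security

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.SERVICE_NAME
    import org.apache.spark.util.Utils

    object TokenFetchExample extends Logging {
      def warnRetry(serviceName: String, e: Throwable): Unit = {
        // MessageWithContext supports +, so the shared helper message and this
        // call-site suffix merge into one warning, with SERVICE_NAME (and ERROR,
        // added inside the helper) carried along as structured fields.
        logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) +
          log" Retrying to fetch HBase security token with " +
          log"${MDC(SERVICE_NAME, serviceName)} connection parameter.")
      }
    }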
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 32fc8a452106..8c35e10b383f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
 import org.apache.thrift.TException
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, SCHEMA, SCHEMA2, TABLE_NAME}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
@@ -175,9 +176,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
     val existingDb = getDatabase(dbDefinition.name)
     if (existingDb.properties == dbDefinition.properties) {
-      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
-        s"the provided database properties are the same as the old ones. Hive does not " +
-        s"currently support altering other database fields.")
+      logWarning(log"Request to alter database ${MDC(DATABASE_NAME, dbDefinition.name)} is a " +
+        log"no-op because the provided database properties are the same as the old ones. Hive " +
+        log"does not currently support altering other database fields.")
     }
     client.alterDatabase(dbDefinition)
   }
@@ -380,8 +381,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     } catch {
       case NonFatal(e) =>
         val warningMessage =
-          s"Could not persist ${table.identifier.quotedString} in a Hive " +
-            "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+          log"Could not persist ${MDC(TABLE_NAME, table.identifier.quotedString)} in a Hive " +
+            log"compatible way. Persisting it into Hive metastore in Spark SQL specific format."
         logWarning(warningMessage, e)
         saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
     }
@@ -676,9 +677,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
     } catch {
       case NonFatal(e) =>
-        val warningMessage =
-          s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " +
-            "compatible way. Updating Hive metastore in Spark SQL specific format."
+        val warningMessage = log"Could not alter schema of table " +
+          log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
+          log"Updating Hive metastore in Spark SQL specific format."
         logWarning(warningMessage, e)
         client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
     }
@@ -800,10 +801,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           // schema we read back is different(ignore case and nullability) from the one in table
           // properties which was written when creating table, we should respect the table schema
           // from hive.
-          logWarning(s"The table schema given by Hive metastore(${table.schema.catalogString}) is " +
-            "different from the schema when this table was created by Spark SQL" +
-            s"(${schemaFromTableProps.catalogString}). We have to fall back to the table schema " +
-            "from Hive metastore which is not case preserving.")
+          logWarning(log"The table schema given by Hive metastore" +
+            log"(${MDC(SCHEMA, table.schema.catalogString)}) is different from the schema when " +
+            log"this table was created by Spark SQL" +
+            log"(${MDC(SCHEMA2, schemaFromTableProps.catalogString)}). We have to fall back to " +
+            log"the table schema from Hive metastore which is not case preserving.")
           hiveTable.copy(schemaPreservesCase = false)
         }
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f1d99d359cde..5b3160c56304 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,7 +25,8 @@ import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FILE_FORMAT, FILE_FORMAT2, INFERENCE_MODE, TABLE_NAME}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -104,21 +105,28 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             None
           }
         case _ =>
-          logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
-            s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
-            "This cached entry will be invalidated.")
+          logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat,
+            relation.fileFormat.toString)
           catalogProxy.invalidateCachedTable(tableIdentifier)
           None
       }
     case other =>
-      logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
-        s"However, we are getting a $other from the metastore cache. " +
-        "This cached entry will be invalidated.")
+      logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat, other.toString)
       catalogProxy.invalidateCachedTable(tableIdentifier)
       None
   }
 }
 
+  private def logWarningUnexpectedFileFormat(
+      tableIdentifier: QualifiedTableName,
+      expectedFileFormat: Class[_ <: FileFormat],
+      actualFileFormat: String): Unit = {
+    logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored as " +
+      log"${MDC(FILE_FORMAT, expectedFileFormat)}. However, we are getting a " +
+      log"${MDC(FILE_FORMAT2, actualFileFormat)} from the metastore cache. " +
+      log"This cached entry will be invalidated.")
+  }
+
   // Return true for Apache ORC and Hive ORC-related configuration names.
   // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
   private def isOrcProperty(key: String) =
@@ -353,8 +361,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
           relation.tableMeta.copy(schema = newSchema)
         case None =>
-          logWarning(s"Unable to infer schema for table $tableName from file format " +
-            s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
+          logWarning(log"Unable to infer schema for table ${MDC(TABLE_NAME, tableName)} from " +
+            log"file format ${MDC(FILE_FORMAT, fileFormat)} (inference mode: " +
+            log"${MDC(INFERENCE_MODE, inferenceMode)}). Using metastore schema.")
           relation.tableMeta
       }
     } else {
@@ -367,7 +376,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
       } catch {
         case NonFatal(ex) =>
-          logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
+          logWarning(log"Unable to save case-sensitive schema for table " +
+            log"${MDC(TABLE_NAME, identifier.unquotedString)}", ex)
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 79d0af0f9a09..68f34bd2beb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,7 +36,8 @@ import org.apache.hive.common.util.HiveVersionInfo
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -320,7 +321,7 @@ private[spark] object HiveUtils extends Logging {
       if (file.getName == "*") {
         val files = file.getParentFile.listFiles()
         if (files == null) {
-          logWarning(s"Hive jar path '${file.getPath}' does not exist.")
+          logWarning(log"Hive jar path '${MDC(PATH, file.getPath)}' does not exist.")
           Nil
         } else {
           files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92561bed1195..502cec3be9c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -230,8 +230,8 @@ private[hive] class HiveClientImpl(
         case e: Exception if causedByThrift(e) =>
           caughtException = e
           logWarning(
-            "HiveClient got thrift exception, destroying client and retrying " +
-              s"(${retryLimit - numTries} tries remaining)", e)
+            log"HiveClient got thrift exception, destroying client and retrying " +
+              log"${MDC(RETRY_COUNT, numTries)} times", e)
           clientLoader.cachedHive = null
           Thread.sleep(retryDelayMillis)
       }
@@ -1335,8 +1335,8 @@ private[hive] object HiveClientImpl extends Logging {
     // initialize spark or tez stuff, which is useless for spark.
     val engine = hiveConf.get("hive.execution.engine")
     if (engine != "mr") {
-      logWarning(s"Detected HiveConf hive.execution.engine is '$engine' and will be reset to 'mr'" +
-        " to disable useless hive logic")
+      logWarning(log"Detected HiveConf hive.execution.engine is '${MDC(ENGINE, engine)}' and " +
+        log"will be reset to 'mr' to disable useless hive logic")
       hiveConf.set("hive.execution.engine", "mr", SOURCE_SPARK)
     }
     hiveConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9943c0178fcf..07daa2938628 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorF
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, CONFIG3}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
@@ -368,16 +369,16 @@ private[client] class Shim_v2_0 extends Shim with Logging {
       hive.getPartitionsByFilter(table, filter)
     } catch {
       case ex: MetaException if shouldFallback =>
-        logWarning("Caught Hive MetaException attempting to get partition metadata by " +
-          "filter from Hive. Falling back to fetching all partition metadata, which will " +
-          "degrade performance. Modifying your Hive metastore configuration to set " +
-          s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
-          "this problem. Or you can enable " +
-          s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
-          "to alleviate performance downgrade. " +
-          "Otherwise, to avoid degraded performance you can set " +
-          s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
-          " to false and let the query fail instead.", ex)
+        logWarning(log"Caught Hive MetaException attempting to get partition metadata by " +
+          log"filter from Hive. Falling back to fetching all partition metadata, which will " +
+          log"degrade performance. Modifying your Hive metastore configuration to set " +
+          log"${MDC(CONFIG, tryDirectSqlConfVar.varname)} to true " +
+          log"(if it is not true already) may resolve this problem. Or you can enable " +
+          log"${MDC(CONFIG2, SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key)} " +
+          log"to alleviate performance downgrade. Otherwise, to avoid degraded performance " +
+          log"you can set ${MDC(CONFIG3,
+            SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key)} " +
+          log"to false and let the query fail instead.", ex)
         // HiveShim clients are expected to handle a superset of the requested partitions
         prunePartitionsFastFallback(hive, table, catalogTable, predicates)
       case ex: MetaException =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 5297910cbfa4..e4bab4631ab1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmit
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveUtils
@@ -66,10 +67,12 @@ private[hive] object IsolatedClientLoader extends Logging {
         // If the error message contains hadoop, it is probably because the hadoop
         // version cannot be resolved.
         val fallbackVersion = "3.4.0"
-        logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. We " +
-          s"will change the hadoop version from $hadoopVersion to $fallbackVersion and try " +
-          "again. It is recommended to set jars used by Hive metastore client through " +
-          "spark.sql.hive.metastore.jars in the production environment.")
+        logWarning(log"Failed to resolve Hadoop artifacts for the version " +
+          log"${MDC(HADOOP_VERSION, hadoopVersion)}. We will change the hadoop version from " +
+          log"${MDC(HADOOP_VERSION, hadoopVersion)} to " +
+          log"${MDC(FALLBACK_VERSION, fallbackVersion)} and try again. It is recommended to " +
+          log"set jars used by Hive metastore client through spark.sql.hive.metastore.jars " +
+          log"in the production environment.")
         (downloadVersion(
           resolvedVersion, fallbackVersion, ivyPath, remoteRepos), fallbackVersion)
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index fa21be0c6514..1b76478a5cf3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -76,10 +77,10 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
-        s"$outputCommitterClass may be an output committer that writes data directly to " +
-          "the final location. Because speculation is enabled, this output committer may " +
-          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
-          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+        log"${MDC(CLASS_NAME, outputCommitterClass)} may be an output committer that writes data " +
+          log"directly to the final location. Because speculation is enabled, this output " +
+          log"committer may cause data loss (see the case in SPARK-10063). If possible, please " +
+          log"use an output committer that does not have this behavior (e.g. FileOutputCommitter)."
       logWarning(warningMessage)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index eb8482da38e5..a16191b72a8d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -140,7 +141,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
     } catch {
       case NonFatal(e) =>
         val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+        logWarning(log"Unable to delete staging directory: ${MDC(PATH, stagingDir)}.", e)
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3fdd5a9c4cec..3e1bdff8c007 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
@@ -77,7 +78,7 @@ private[hive] object OrcFileOperator extends Logging {
       } catch {
         case e: IOException =>
           if (ignoreCorruptFiles) {
-            logWarning(s"Skipped the footer in the corrupted file: $path", e)
+            logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e)
             None
           } else {
             throw QueryExecutionErrors.cannotReadFooterForFileError(path, e)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
index 253c0a9ebafe..13ff721736b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.security.token.Token
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.security.HadoopDelegationTokenProvider
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -43,8 +44,9 @@ private[spark] class HiveDelegationTokenProvider
 
   override def serviceName: String = "hive"
 
-  private val classNotFoundErrorStr = s"You are attempting to use the " +
-    s"${getClass.getCanonicalName}, but your Spark distribution is not built with Hive libraries."
+  private val classNotFoundErrorStr =
+    log"You are attempting to use the ${MDC(CLASS_NAME, getClass.getCanonicalName)}, " +
+    log"but your Spark distribution is not built with Hive libraries."
 
   private def hiveConf(hadoopConf: Configuration): Configuration = {
     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org