This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 149ac0f8893b [SPARK-47581][CORE] SQL catalyst: Migrate logWarning with variables to structured logging framework
149ac0f8893b is described below

commit 149ac0f8893b5be8b8b0556ef47a2384aaad1850
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Mon Apr 8 22:56:10 2024 -0700

    [SPARK-47581][CORE] SQL catalyst: Migrate logWarning with variables to structured logging framework

    ### What changes were proposed in this pull request?

    Migrate the logWarning calls with variables in the Catalyst module to the structured logging framework. This transforms logWarning calls of the following API

    ```
    def logWarning(msg: => String): Unit
    ```

    to

    ```
    def logWarning(entry: LogEntry): Unit
    ```

    ### Why are the changes needed?

    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?

    Yes, Spark logs will contain additional MDC entries.

    ### How was this patch tested?

    Compiler and Scala style checks, as well as code review.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45904 from dtenedor/log-warn-catalyst.

    Lead-authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
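[Editor's note] The migration pattern repeats across all seventeen files below, so it is easiest to see in isolation first. The following is a minimal sketch, not code from this commit: `Example` and `register` are made-up names, and it mirrors the FunctionRegistry hunk; it assumes only the `Logging` trait, the `MDC` helper, and the `LogKey` enumeration that the diff itself uses.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.FUNCTION_NAME

object Example extends Logging {
  def register(name: String): Unit = {
    // Before: a plain interpolated String, so the variable is lost in free text.
    // logWarning(s"The function $name replaced a previously registered function.")

    // After: the log"..." interpolator builds a structured message, and MDC pairs
    // the value with a key from the LogKey enumeration (new keys are added to that
    // enumeration alphabetically, as the first hunk below shows).
    logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
      log"previously registered function.")
  }
}
```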
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 26 ++++++++++++++++++++++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  6 +++--
 .../sql/catalyst/analysis/HintErrorLogger.scala    | 19 ++++++++++------
 .../catalyst/analysis/StreamingJoinHelper.scala    | 19 ++++++++++------
 .../analysis/UnsupportedOperationChecker.scala     |  6 +++--
 .../spark/sql/catalyst/catalog/interface.scala     |  6 +++--
 .../spark/sql/catalyst/csv/CSVHeaderChecker.scala  | 25 ++++++++++++---------
 .../catalyst/expressions/V2ExpressionUtils.scala   | 10 +++++----
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 ++--
 .../ReplaceNullWithFalseInPredicate.scala          |  7 ++++--
 .../spark/sql/catalyst/optimizer/joins.scala       |  7 ++++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  8 ++++---
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 14 +++++++-----
 .../spark/sql/catalyst/util/CharVarcharUtils.scala | 11 ++++-----
 .../apache/spark/sql/catalyst/util/ParseMode.scala |  9 +++++---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 10 ++++++---
 .../spark/sql/catalyst/util/StringUtils.scala      | 11 +++++----
 17 files changed, 133 insertions(+), 65 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a0e99f1edc34..7fa0331515cb 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -22,6 +22,7 @@ package org.apache.spark.internal
  */
 object LogKey extends Enumeration {
   val ACCUMULATOR_ID = Value
+  val ANALYSIS_ERROR = Value
   val APP_DESC = Value
   val APP_ID = Value
   val APP_STATE = Value
@@ -33,6 +34,10 @@ object LogKey extends Enumeration {
   val CATEGORICAL_FEATURES = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
+  val COLUMN_DATA_TYPE_SOURCE = Value
+  val COLUMN_DATA_TYPE_TARGET = Value
+  val COLUMN_DEFAULT_VALUE = Value
+  val COLUMN_NAME = Value
   val COMMAND = Value
   val COMMAND_OUTPUT = Value
   val COMPONENT = Value
@@ -40,6 +45,12 @@ object LogKey extends Enumeration {
   val CONFIG2 = Value
   val CONTAINER_ID = Value
   val COUNT = Value
+  val CSV_HEADER_COLUMN_NAME = Value
+  val CSV_HEADER_COLUMN_NAMES = Value
+  val CSV_HEADER_LENGTH = Value
+  val CSV_SCHEMA_FIELD_NAME = Value
+  val CSV_SCHEMA_FIELD_NAMES = Value
+  val CSV_SOURCE = Value
   val DRIVER_ID = Value
   val END_POINT = Value
   val ERROR = Value
@@ -48,13 +59,17 @@ object LogKey extends Enumeration {
   val EXECUTOR_ID = Value
   val EXECUTOR_STATE = Value
   val EXIT_CODE = Value
+  val EXPRESSION_TERMS = Value
   val FAILURES = Value
+  val FUNCTION_NAME = Value
+  val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
+  val JOIN_CONDITION_SUB_EXPRESSION = Value
   val LEARNING_RATE = Value
   val LINE = Value
   val LINE_NUM = Value
@@ -68,21 +83,28 @@ object LogKey extends Enumeration {
   val MERGE_DIR_NAME = Value
   val METHOD_NAME = Value
   val MIN_SIZE = Value
+  val NUM_COLUMNS = Value
   val NUM_ITERATIONS = Value
   val OBJECT_ID = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OPTIMIZER_CLASS_NAME = Value
   val OP_TYPE = Value
+  val PARSE_MODE = Value
   val PARTITION_ID = Value
+  val PARTITION_SPECIFICATION = Value
   val PATH = Value
   val PATHS = Value
   val POD_ID = Value
   val PORT = Value
+  val QUERY_HINT = Value
   val QUERY_PLAN = Value
+  val QUERY_PLAN_LENGTH_ACTUAL = Value
+  val QUERY_PLAN_LENGTH_MAX = Value
   val RANGE = Value
   val RDD_ID = Value
   val REASON = Value
   val REDUCE_ID = Value
+  val RELATION_NAME = Value
   val REMOTE_ADDRESS = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
@@ -97,6 +119,7 @@ object LogKey extends Enumeration {
   val SHUFFLE_MERGE_ID = Value
   val SIZE = Value
   val SLEEP_TIME = Value
+  val SQL_TEXT = Value
   val STAGE_ID = Value
   val STATEMENT_ID = Value
   val SUBMISSION_ID = Value
@@ -110,8 +133,11 @@ object LogKey extends Enumeration {
   val THREAD_NAME = Value
   val TID = Value
   val TIMEOUT = Value
+  val TIME_UNITS = Value
   val TOTAL_EFFECTIVE_TIME = Value
   val TOTAL_TIME = Value
+  val UNSUPPORTED_EXPRESSION = Value
+  val UNSUPPORTED_HINT_REASON = Value
   val URI = Value
   val USER_ID = Value
   val USER_NAME = Value

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index bbc063c32103..99ae3adde44f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.FUNCTION_NAME
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions._
@@ -223,7 +224,8 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
     val newFunction = (info, builder)
     functionBuilders.put(name, newFunction) match {
       case Some(previousFunction) if previousFunction != newFunction =>
-        logWarning(s"The function $name replaced a previously registered function.")
+        logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
+          log"previously registered function.")
       case _ =>
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
index 0287bb3d819f..7338ef21a713 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{QUERY_HINT, RELATION_NAME, UNSUPPORTED_HINT_REASON}
 import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
 
 /**
@@ -27,27 +28,31 @@ object HintErrorLogger extends HintErrorHandler with Logging {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   override def hintNotRecognized(name: String, parameters: Seq[Any]): Unit = {
-    logWarning(s"Unrecognized hint: ${hintToPrettyString(name, parameters)}")
+    logWarning(log"Unrecognized hint: " +
+      log"${MDC(QUERY_HINT, hintToPrettyString(name, parameters))}")
   }
 
   override def hintRelationsNotFound(
       name: String, parameters: Seq[Any], invalidRelations: Set[Seq[String]]): Unit = {
     invalidRelations.foreach { ident =>
-      logWarning(s"Count not find relation '${ident.quoted}' specified in hint " +
-        s"'${hintToPrettyString(name, parameters)}'.")
+      logWarning(log"Could not find relation '${MDC(RELATION_NAME, ident.quoted)}' " +
+        log"specified in hint '${MDC(QUERY_HINT, hintToPrettyString(name, parameters))}'.")
     }
   }
 
   override def joinNotFoundForJoinHint(hint: HintInfo): Unit = {
-    logWarning(s"A join hint $hint is specified but it is not part of a join relation.")
+    logWarning(log"A join hint ${MDC(QUERY_HINT, hint)} is specified " +
+      log"but it is not part of a join relation.")
   }
 
   override def joinHintNotSupported(hint: HintInfo, reason: String): Unit = {
-    logWarning(s"Hint $hint is not supported in the query: $reason.")
+    logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is not supported in the query: " +
+      log"${MDC(UNSUPPORTED_HINT_REASON, reason)}.")
   }
 
   override def hintOverridden(hint: HintInfo): Unit = {
-    logWarning(s"Hint $hint is overridden by another hint and will not take effect.")
+    logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is overridden by another hint " +
+      log"and will not take effect.")
   }
 
   private def hintToPrettyString(name: String, parameters: Seq[Any]): String = {

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
index cb836550359c..e9c4dd0be7d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
@@ -87,7 +87,8 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
         l, r, attributesToFindStateWatermarkFor, attributesWithEventWatermark, eventWatermark)
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error trying to extract state constraint from condition $joinCondition", e)
+        logWarning(log"Error trying to extract state constraint from condition " +
+          log"${MDC(JOIN_CONDITION, joinCondition)}", e)
         None
     }
   }
@@ -165,8 +166,9 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
 
     // Verify there is only one correct constraint term and of the correct type
     if (constraintTerms.size > 1) {
-      logWarning("Failed to extract state constraint terms: multiple time terms in condition\n\t" +
-        terms.mkString("\n\t"))
+      logWarning(
+        log"Failed to extract state constraint terms: multiple time terms in condition\n\t" +
+        log"${MDC(EXPRESSION_TERMS, terms.mkString("\n\t"))}")
       return None
     }
     if (constraintTerms.isEmpty) {
@@ -263,9 +265,10 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
           if (calendarInterval.months != 0) {
             invalid = true
             logWarning(
-              s"Failed to extract state value watermark from condition $exprToCollectFrom " +
-                s"as imprecise intervals like months and years cannot be used for" +
-                s"watermark calculation. Use interval in terms of day instead.")
+              log"Failed to extract state value watermark from condition " +
+                log"${MDC(JOIN_CONDITION, exprToCollectFrom)} " +
+                log"as imprecise intervals like months and years cannot be used for " +
+                log"watermark calculation. Use interval in terms of day instead.")
             Literal(0.0)
           } else {
             Literal(calendarInterval.days * MICROS_PER_DAY.toDouble +
@@ -284,7 +287,9 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
           Seq(negateIfNeeded(castedLit, negate))
         case a @ _ =>
           logWarning(
-            s"Failed to extract state value watermark from condition $exprToCollectFrom due to $a")
+            log"Failed to extract state value watermark from condition " +
+              log"${MDC(JOIN_CONDITION, exprToCollectFrom)} due to " +
+              log"${MDC(JOIN_CONDITION_SUB_EXPRESSION, a)}")
           invalid = true
           Seq.empty
       }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d57464fcefc0..e39ec267fa61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ANALYSIS_ERROR, QUERY_PLAN}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, CurrentDate, CurrentTimestampLike, Expression, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
@@ -133,7 +134,8 @@ object UnsupportedOperationChecker extends Logging {
         }
       }
     } catch {
-      case e: AnalysisException if !failWhenDetected => logWarning(s"${e.message};\n$plan")
+      case e: AnalysisException if !failWhenDetected =>
+        logWarning(log"${MDC(ANALYSIS_ERROR, e.message)};\n${MDC(QUERY_PLAN, plan)}", e)
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 10428877ba8d..4807d886c9f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -32,7 +32,8 @@ import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Resolver, UnresolvedLeafNode}
@@ -832,7 +833,8 @@ object CatalogColumnStat extends Logging {
       ))
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
+        logWarning(log"Failed to parse column statistics for column " +
+          log"${MDC(COLUMN_NAME, colName)} in table ${MDC(RELATION_NAME, table)}", e)
         None
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
index 1fd264f2c3df..4e0985af6b60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
@@ -21,7 +21,8 @@ import com.univocity.parsers.common.AbstractParser
 import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
 
 import org.apache.spark.SparkIllegalArgumentException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{CSV_HEADER_COLUMN_NAME, CSV_HEADER_COLUMN_NAMES, CSV_HEADER_LENGTH, CSV_SCHEMA_FIELD_NAME, CSV_SCHEMA_FIELD_NAMES, CSV_SOURCE, NUM_COLUMNS}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -61,7 +62,7 @@ class CSVHeaderChecker(
     if (columnNames != null) {
       val fieldNames = schema.map(_.name).toIndexedSeq
       val (headerLen, schemaSize) = (columnNames.length, fieldNames.length)
-      var errorMessage: Option[String] = None
+      var errorMessage: Option[MessageWithContext] = None
 
       if (headerLen == schemaSize) {
         var i = 0
@@ -75,19 +76,21 @@ class CSVHeaderChecker(
           }
           if (nameInHeader != nameInSchema) {
             errorMessage = Some(
-              s"""|CSV header does not conform to the schema.
-                  | Header: ${columnNames.mkString(", ")}
-                  | Schema: ${fieldNames.mkString(", ")}
-                  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
-                  |$source""".stripMargin)
+              log"""|CSV header does not conform to the schema.
+                    | Header: ${MDC(CSV_HEADER_COLUMN_NAMES, columnNames.mkString(", "))}
+                    | Schema: ${MDC(CSV_SCHEMA_FIELD_NAMES, fieldNames.mkString(", "))}
+                    |Expected: ${MDC(CSV_SCHEMA_FIELD_NAME, fieldNames(i))}
+                    |but found: ${MDC(CSV_HEADER_COLUMN_NAME, columnNames(i))}
+                    |${MDC(CSV_SOURCE, source)}""".stripMargin)
           }
           i += 1
         }
       } else {
         errorMessage = Some(
-          s"""|Number of column in CSV header is not equal to number of fields in the schema:
-              | Header length: $headerLen, schema size: $schemaSize
-              |$source""".stripMargin)
+          log"""|Number of columns in CSV header is not equal to number of fields in the schema:
+              | Header length: ${MDC(CSV_HEADER_LENGTH, headerLen)},
+              | schema size: ${MDC(NUM_COLUMNS, schemaSize)}
+              |${MDC(CSV_SOURCE, source)}""".stripMargin)
       }
 
       errorMessage.foreach { msg =>
@@ -96,7 +99,7 @@ class CSVHeaderChecker(
         } else {
           throw new SparkIllegalArgumentException(
             errorClass = "_LEGACY_ERROR_TEMP_3241",
-            messageParameters = Map("msg" -> msg))
+            messageParameters = Map("msg" -> msg.message))
         }
       }
     }
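[Editor's note] One wrinkle in the CSVHeaderChecker change above is worth calling out: `errorMessage` now holds a `MessageWithContext` instead of a `String`, so one structured message serves both the warning path and the exception path (where it is flattened to plain text via `.message`). A minimal sketch of that pattern, with made-up names, an assumed warn-when-enforcing branch, and a plain `IllegalArgumentException` standing in for the `SparkIllegalArgumentException` the real code throws:

```scala
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKey.CSV_SOURCE

object HeaderCheckSketch extends Logging {
  def check(source: String, enforceSchema: Boolean): Unit = {
    // Build the structured message once, regardless of how it will be surfaced.
    val errorMessage: Option[MessageWithContext] =
      Some(log"CSV header does not conform to the schema: ${MDC(CSV_SOURCE, source)}")
    errorMessage.foreach { msg =>
      if (enforceSchema) {
        logWarning(msg) // structured path: the MDC pair is preserved
      } else {
        // Exception path: .message renders the entry down to a plain String.
        throw new IllegalArgumentException(msg.message)
      }
    }
  }
}
```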
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 621e01eedea7..c6cfccb74c16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.lang.reflect.{Method, Modifier}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FUNCTION_NAME, FUNCTION_PARAMETER}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
@@ -134,9 +135,10 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     } catch {
       case _: NoSuchFunctionException =>
         val parameterString = args.map(_.dataType.typeName).mkString("(", ", ", ")")
-        logWarning(s"V2 function $name with parameter types $parameterString is used in " +
-          "partition transforms, but its definition couldn't be found in the function catalog " +
-          "provided")
+        logWarning(log"V2 function ${MDC(FUNCTION_NAME, name)} " +
+          log"with parameter types ${MDC(FUNCTION_PARAMETER, parameterString)} is used in " +
+          log"partition transforms, but its definition couldn't be found in the function catalog " +
+          log"provided")
         None
       case _: UnsupportedOperationException =>
         None

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0e31595f919d..3a4002127df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -445,8 +445,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
     val excludedRules = excludedRulesConf.filter { ruleName =>
       val nonExcludable = nonExcludableRules.contains(ruleName)
       if (nonExcludable) {
-        logWarning(s"Optimization rule '${ruleName}' was not excluded from the optimizer " +
-          s"because this rule is a non-excludable rule.")
+        logWarning(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' " +
+          log"was not excluded from the optimizer because this rule is a non-excludable rule.")
       }
       !nonExcludable
     }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 73972cae82de..772382f5f1e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPRESSION}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, UpdateTable, WriteDelta}
@@ -138,8 +140,9 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
           "dataType" -> e.dataType.catalogString,
           "expr" -> e.sql))
     } else {
-      val message = "Expected a Boolean type expression in replaceNullWithFalse, " +
-        s"but got the type `${e.dataType.catalogString}` in `${e.sql}`."
+      val message = log"Expected a Boolean type expression in replaceNullWithFalse, " +
+        log"but got the type `${MDC(UNSUPPORTED_EXPRESSION, e.dataType.catalogString)}` " +
+        log"in `${MDC(SQL_TEXT, e.sql)}`."
       logWarning(message)
       e
     }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 8f03b93dce70..655b7c3455b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.util.control.NonFatal
 
+import org.apache.spark.internal.LogKey.JOIN_CONDITION
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
@@ -262,8 +264,9 @@ object ExtractPythonUDFFromJoinCondition extends Rule[LogicalPlan] with Predicat
       // the new join conditions.
       val (udf, rest) = splitConjunctivePredicates(cond).partition(hasUnevaluablePythonUDF(_, j))
       val newCondition = if (rest.isEmpty) {
-        logWarning(s"The join condition:$cond of the join plan contains PythonUDF only," +
-          s" it will be moved out and the join plan will be turned to cross join.")
+        logWarning(log"The join condition: ${MDC(JOIN_CONDITION, cond)} " +
+          log"of the join plan contains PythonUDF only," +
+          log" it will be moved out and the join plan will be turned to cross join.")
         None
       } else {
         Some(rest.reduceLeft(And))

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b2ae8c0d757f..b4ba2c1caa22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -31,7 +31,8 @@ import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkThrowable}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARTITION_SPECIFICATION
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, ClusterBySpec}
@@ -4599,8 +4600,9 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
     def checkPartitionSpec(): Unit = {
       if (ctx.partitionSpec != null) {
-        logWarning("Partition specification is ignored when collecting column statistics: " +
-          ctx.partitionSpec.getText)
+        logWarning(
+          log"Partition specification is ignored when collecting column statistics: " +
+          log"${MDC(PARTITION_SPECIFICATION, ctx.partitionSpec.getText)}")
       }
     }
     if (ctx.identifier != null &&

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 566c48ff41b6..476ace2662f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -257,16 +257,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
             val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
-              "."
+              log"."
             } else {
-              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
+              log", please set '${MDC(NUM_ITERATIONS, batch.strategy.maxIterationsSetting)}' " +
+                log"to a larger value."
             }
-            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
-              s"$endingMsg"
+            val log = log"Max iterations (${MDC(NUM_ITERATIONS, iteration - 1)}) " +
+              log"reached for batch ${MDC(RULE_BATCH_NAME, batch.name)}" +
+              endingMsg
             if (Utils.isTesting || batch.strategy.errorOnExceed) {
-              throw new RuntimeException(message)
+              throw new RuntimeException(log.message)
             } else {
-              logWarning(message)
+              logWarning(log)
             }
           }
           // Check idempotence for Once batches.
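[Editor's note] The RuleExecutor hunk just above shows the same dual use in miniature: a single structured entry is composed with `+`, then either thrown as flattened text or logged with its MDC intact. A condensed sketch (hypothetical object and parameters, mirroring the hunk rather than reproducing it):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.NUM_ITERATIONS

object MaxIterationsSketch extends Logging {
  def report(iteration: Int, failFast: Boolean): Unit = {
    // Compose the entry once; MessageWithContext values concatenate with +.
    val entry = log"Max iterations (${MDC(NUM_ITERATIONS, iteration - 1)}) " +
      log"reached for this batch."
    if (failFast) {
      throw new RuntimeException(entry.message) // plain-text rendering
    } else {
      logWarning(entry) // structured rendering keeps the MDC pair
    }
  }
}
```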
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 0e7bffcc3f95..06a88b5d7b51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -74,10 +75,10 @@ object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
     if (SQLConf.get.charVarcharAsString) {
       replaceCharVarcharWithString(dt)
     } else if (hasCharVarchar(dt)) {
-      logWarning("The Spark cast operator does not support char/varchar type and simply treats" +
-        " them as string type. Please use string type directly to avoid confusion. Otherwise," +
-        s" you can set ${SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key} to true, so that Spark treat" +
-        s" them as string type as same as Spark 3.0 and earlier")
+      logWarning(log"The Spark cast operator does not support char/varchar type and simply treats" +
+        log" them as string type. Please use string type directly to avoid confusion. Otherwise," +
+        log" you can set ${MDC(CONFIG, SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key)} " +
+        log"to true, so that Spark treats them as string type, the same as Spark 3.0 and earlier")
       replaceCharVarcharWithString(dt)
     } else {
       dt

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
index b35da8e2c80f..dd1e466d1b38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.Locale
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARSE_MODE
 
 sealed trait ParseMode {
   /**
@@ -53,11 +54,13 @@ object ParseMode extends Logging {
       case DropMalformedMode.name => DropMalformedMode
       case FailFastMode.name => FailFastMode
       case _ =>
-        logWarning(s"$v is not a valid parse mode. Using ${PermissiveMode.name}.")
+        logWarning(log"${MDC(PARSE_MODE, v)} is not a valid parse mode. " +
+          log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
         PermissiveMode
     }
   }.getOrElse {
-    logWarning(s"mode is null and not a valid parse mode. Using ${PermissiveMode.name}.")
+    logWarning(log"mode is null and not a valid parse mode. " +
+      log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
     PermissiveMode
   }
 }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 84bbf97886c8..db9adef8ef3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
@@ -321,8 +322,11 @@ object ResolveDefaultColumns extends QueryErrorsBase
         Option(casted.eval(EmptyRow)).map(Literal(_, targetType))
       } catch {
         case e @ ( _: SparkThrowable | _: RuntimeException) =>
-          logWarning(s"Failed to cast default value '$l' for column $colName from " +
-            s"${l.dataType} to $targetType due to ${e.getMessage}")
+          logWarning(log"Failed to cast default value '${MDC(COLUMN_DEFAULT_VALUE, l)}' " +
+            log"for column ${MDC(COLUMN_NAME, colName)} " +
+            log"from ${MDC(COLUMN_DATA_TYPE_SOURCE, l.dataType)} " +
+            log"to ${MDC(COLUMN_DATA_TYPE_TARGET, targetType)} " +
+            log"due to ${MDC(ERROR, e.getMessage)}", e)
           None
       case _ => None

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index f94a0650cce4..04df3635d475 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -21,7 +21,8 @@ import java.util.regex.{Pattern, PatternSyntaxException}
 
 import org.apache.commons.text.similarity.LevenshteinDistance
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -136,9 +137,11 @@ object StringUtils extends Logging {
     override def toString: String = {
       if (atLimit) {
         logWarning(
-          "Truncated the string representation of a plan since it was too long. The " +
-          s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
-          s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+          log"Truncated the string representation of a plan since it was too long. The " +
+          log"plan had length ${MDC(QUERY_PLAN_LENGTH_ACTUAL, length)} " +
+          log"and the maximum is ${MDC(QUERY_PLAN_LENGTH_MAX, maxLength)}. This behavior " +
+          log"can be adjusted by setting " +
+          log"'${MDC(CONFIG, SQLConf.MAX_PLAN_STRING_LENGTH.key)}'.")
         val truncateMsg = if (maxLength == 0) {
           s"Truncated plan of $length characters"
         } else {