This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 62dd64a5d13d [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework
62dd64a5d13d is described below

commit 62dd64a5d13d14a4e3bce50d9c264f8e494c7863
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Wed Apr 24 13:43:05 2024 -0700

    [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework

    ### What changes were proposed in this pull request?

    Migrate logError calls with variables in the sql/core module to the structured logging framework. This transforms logError entries from the following API

    ```
    def logError(msg: => String): Unit
    ```

    to

    ```
    def logError(entry: LogEntry): Unit
    ```

    ### Why are the changes needed?

    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?

    Yes, Spark core logs will contain additional MDC.

    ### How was this patch tested?

    Compiler and Scala style checks, as well as code review.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45969 from dtenedor/log-error-sql-core.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  6 +++++
 .../execution/BaseScriptTransformationExec.scala   | 10 ++++---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 31 +++++++++++++---------
 .../command/InsertIntoDataSourceDirCommand.scala   |  4 ++-
 .../execution/command/createDataSourceTables.scala |  5 +++-
 .../apache/spark/sql/execution/command/ddl.scala   | 13 ++++-----
 .../execution/datasources/FileFormatWriter.scala   |  7 ++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 20 ++++++++------
 .../execution/exchange/BroadcastExchangeExec.scala |  4 ++-
 9 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index b9b0e372a2b0..fab5e80dd0e6 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
   val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
+  val BATCH_WRITE = Value
   val BLOCK_ID = Value
   val BLOCK_MANAGER_ID = Value
   val BROADCAST_ID = Value
@@ -116,6 +117,7 @@ object LogKey extends Enumeration {
   val ESTIMATOR_PARAMETER_MAP = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
+  val EXCEPTION = Value
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
   val EXECUTION_PLAN_LEAVES = Value
@@ -162,6 +164,7 @@ object LogKey extends Enumeration {
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val HOST_PORT = Value
+  val IDENTIFIER = Value
   val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
   val INDEX_FILE_NUM = Value
@@ -330,11 +333,13 @@ object LogKey extends Enumeration {
   val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
   val SRC_PATH = Value
+  val STAGE_ATTEMPT = Value
   val STAGE_ID = Value
   val START_INDEX = Value
   val STATEMENT_ID = Value
   val STATE_STORE_PROVIDER = Value
   val STATUS = Value
+  val STDERR = Value
   val STORAGE_LEVEL = Value
   val STORAGE_LEVEL_DESERIALIZED = Value
   val STORAGE_LEVEL_REPLICATION = Value
@@ -402,6 +407,7 @@ object LogKey extends Enumeration {
   val WEIGHTED_NUM = Value
   val WORKER_URL = Value
   val WRITE_AHEAD_LOG_INFO = Value
+  val WRITE_JOB_UUID = Value
   val XSD_PATH = Value

   type LogKey = Value
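To make the migrated call-site pattern concrete before the sql/core hunks below, here is a hedged, self-contained sketch compiled against the org.apache.spark.internal classes touched in this patch (Logging, MDC, LogKey). The ExampleWriter class, its method, and its parameters are hypothetical illustrations, not code from this commit.

```
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._

// Hypothetical class for illustration only; not part of this commit.
class ExampleWriter extends Logging {
  def onJobFailure(jobId: String, cause: Throwable): Unit = {
    // Before: plain string interpolation, so the job id is flattened into the message text.
    // logError(s"Job $jobId aborted.", cause)

    // After: the log"..." interpolator plus MDC tags the value with a LogKey,
    // letting structured backends emit it as a separate field; longer messages
    // are built by concatenating log"..." fragments, as in the hunks below.
    logError(log"Job ${MDC(JOB_ID, jobId)} aborted.", cause)
  }
}
```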
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 91042b59677b..6e54bde46942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -27,7 +27,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.{SparkFiles, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
@@ -185,7 +186,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       if (!proc.isAlive) {
         val exitCode = proc.exitValue()
         if (exitCode != 0) {
-          logError(stderrBuffer.toString) // log the stderr circular buffer
+          logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the stderr circular buffer
           throw QueryExecutionErrors.subprocessExitedError(exitCode, stderrBuffer, cause)
         }
       }
@@ -329,12 +330,13 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
        // Javadoc this call will not throw an exception:
        _exception = t
        proc.destroy()
-       logError(s"Thread-${this.getClass.getSimpleName}-Feed exit cause by: ", t)
+       logError(log"Thread-${MDC(CLASS_NAME, this.getClass.getSimpleName)}-Feed " +
+         log"exit cause by: ", t)
     } finally {
       try {
         Utils.tryLogNonFatalError(outputStream.close())
         if (proc.waitFor() != 0) {
-          logError(stderrBuffer.toString) // log the stderr circular buffer
+          logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the stderr circular buffer
         }
       } catch {
         case NonFatal(exceptionFromFinallyBlock) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index ca4400068250..12ca0652f1b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal

 import org.apache.spark.SparkException
 import org.apache.spark.broadcast
+import org.apache.spark.internal.{MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,14 +76,14 @@ case class AdaptiveSparkPlanExec(

   @transient private val lock = new Object()

-  @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match {
-    case "TRACE" => logTrace(_)
-    case "DEBUG" => logDebug(_)
-    case "INFO" => logInfo(_)
-    case "WARN" => logWarning(_)
-    case "ERROR" => logError(_)
-    case _ => logDebug(_)
-  }
+  @transient private val logOnLevel: ( => MessageWithContext) => Unit =
+    conf.adaptiveExecutionLogLevel match {
+      case "TRACE" => logTrace(_)
+      case "INFO" => logInfo(_)
+      case "WARN" => logWarning(_)
+      case "ERROR" => logError(_)
+      case _ => logDebug(_)
+    }

   @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
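The logOnLevel change above pairs a by-name `=> MessageWithContext` parameter with a `lazy val` at the call site in the next hunk, so the expensive side-by-side plan dump is only rendered if the configured level actually logs it. A minimal, plain-Scala sketch of that evaluation behavior (illustrative names only, not Spark's Logging API):

```
object ByNameLoggingSketch {
  private var debugEnabled = false // stand-in for the configured AQE log level

  // Stand-in for logDebug/logInfo/...: `msg` is passed by name, so it is only
  // evaluated when the level is enabled.
  private def logOnLevel(msg: => String): Unit = {
    if (debugEnabled) println(msg)
  }

  private def expensivePlanDiff(): String = {
    println("building expensive plan diff") // visible only when actually forced
    "old plan | new plan"
  }

  def main(args: Array[String]): Unit = {
    lazy val plans = expensivePlanDiff()  // mirrors `lazy val plans = sideBySide(...)`
    logOnLevel(s"Plan changed:\n$plans")  // level disabled: expensivePlanDiff() never runs
    debugEnabled = true
    logOnLevel(s"Plan changed:\n$plans")  // level enabled: the lazy val is forced exactly once
  }
}
```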
@@ -358,8 +360,9 @@ case class AdaptiveSparkPlanExec(
       val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
       if (newCost < origCost ||
           (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-        logOnLevel("Plan changed:\n" +
-          sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
+        lazy val plans =
+          sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n")
+        logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
         cleanUpTempTags(newPhysicalPlan)
         currentPhysicalPlan = newPhysicalPlan
         currentLogicalPlan = newLogicalPlan
@@ -389,7 +392,7 @@ case class AdaptiveSparkPlanExec(
     if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
-    logOnLevel(s"Final plan:\n$currentPhysicalPlan")
+    logOnLevel(log"Final plan:\n${MDC(QUERY_PLAN, currentPhysicalPlan)}")
   }

   override def executeCollect(): Array[InternalRow] = {
@@ -742,7 +745,8 @@ case class AdaptiveSparkPlanExec(
       Some((finalPlan, optimized))
     } catch {
       case e: InvalidAQEPlanException[_] =>
-        logOnLevel(s"Re-optimize - ${e.getMessage()}:\n${e.plan}")
+        logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
+          log"${MDC(QUERY_PLAN, e.plan)}")
         None
     }
   }
@@ -800,7 +804,8 @@ case class AdaptiveSparkPlanExec(
           s.cancel()
         } catch {
           case NonFatal(t) =>
-            logError(s"Exception in cancelling query stage: ${s.treeString}", t)
+            logError(log"Exception in cancelling query stage: " +
+              log"${MDC(QUERY_PLAN, s.treeString)}", t)
         }
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 67d38b28c83e..af894157c1ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command

+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -70,7 +72,7 @@ case class InsertIntoDataSourceDirCommand(
       sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)).toRdd
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
+        logError(log"Failed to write to directory ${MDC(URI, storage.locationUri.toString)}", ex)
         throw ex
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 1283bd880908..1bf5fb26a1f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command

 import java.net.URI

+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -230,7 +232,8 @@ case class CreateDataSourceTableAsSelectCommand(
       dataSource.writeAndRead(mode, query, outputColumnNames)
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
+        logError(log"Failed to write to table " +
+          log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex)
         throw ex
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5a853a269848..3993cebd5eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ACTUAL_PARTITION_COLUMN, EXPECTED_PARTITION_COLUMN, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -737,10 +737,10 @@ case class RepairTableCommand(
       spark.catalog.refreshTable(tableIdentWithDB)
     } catch {
       case NonFatal(e) =>
-        logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
-          "might return wrong result if the table was cached. To avoid such issue, you should " +
-          "uncache the table manually via the UNCACHE TABLE command after table recovering will " +
-          "complete fully.", e)
+        logError(log"Cannot refresh the table '${MDC(IDENTIFIER, tableIdentWithDB)}'. " +
+          log"A query of the table might return wrong result if the table was cached. " +
+          log"To avoid such issue, you should uncache the table manually via the UNCACHE TABLE " +
+          log"command after table recovering will complete fully.", e)
     }
     logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
     Seq.empty[Row]
@@ -1030,7 +1030,8 @@ object DDLUtils extends Logging {
       DataSource.lookupDataSource(provider, SQLConf.get).getConstructor().newInstance()
     } catch {
       case e: Throwable =>
-        logError(s"Failed to find data source: $provider when check data column names.", e)
+        logError(log"Failed to find data source: ${MDC(DATA_SOURCE_PROVIDER, provider)} " +
+          log"when check data column names.", e)
         return
     }
     source match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 3bfa3413f679..65a73261e833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -281,7 +282,7 @@ object FileFormatWriter extends Logging {
       // return a set of all the partition paths that were updated during this job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
     } catch { case cause: Throwable =>
${description.uuid}.", cause) + logError(log"Aborting job ${MDC(WRITE_JOB_UUID, description.uuid)}.", cause) committer.abortJob(job) throw cause } @@ -404,7 +405,7 @@ object FileFormatWriter extends Logging { })(catchBlock = { // If there is an error, abort the task dataWriter.abort() - logError(s"Job $jobId aborted.") + logError(log"Job ${MDC(JOB_ID, jobId)} aborted.") }, finallyBlock = { dataWriter.close() }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c65c15fb0ef2..b8aff58c1268 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkEnv, SparkException, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException @@ -402,16 +403,17 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode { commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => - logError(s"Data source write support $batchWrite is aborting.") + logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} is aborting.") try { batchWrite.abort(messages) } catch { case t: Throwable => - logError(s"Data source write support $batchWrite failed to abort.") + logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} " + + log"failed to abort.") cause.addSuppressed(t) throw QueryExecutionErrors.writingJobFailedError(cause) } - logError(s"Data source write support $batchWrite aborted.") + logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} aborted.") throw cause } @@ -472,11 +474,13 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial })(catchBlock = { // If there is an error, abort this writer - logError(s"Aborting commit for partition $partId (task $taskId, attempt $attemptId, " + - s"stage $stageId.$stageAttempt)") + logError(log"Aborting commit for partition ${MDC(PARTITION_ID, partId)} " + + log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})") dataWriter.abort() - logError(s"Aborted commit for partition $partId (task $taskId, attempt $attemptId, " + - s"stage $stageId.$stageAttempt)") + logError(log"Aborted commit for partition ${MDC(PARTITION_ID, partId)} " + + log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})") }, finallyBlock = { dataWriter.close() }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 866a62a3a077..a760632b3fc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ 
@@ -25,6 +25,8 @@ import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal

 import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -212,7 +214,7 @@ case class BroadcastExchangeExec(
       relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
     } catch {
       case ex: TimeoutException =>
-        logError(s"Could not execute broadcast in $timeout secs.", ex)
+        logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex)
         if (!relationFuture.isDone) {
           sparkContext.cancelJobsWithTag(jobTag)
           relationFuture.cancel(true)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org