This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 62dd64a5d13d [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework
62dd64a5d13d is described below

commit 62dd64a5d13d14a4e3bce50d9c264f8e494c7863
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Wed Apr 24 13:43:05 2024 -0700

    [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logError calls with variables in the sql/core module to the structured logging framework. This changes call sites from the API
    ```
    def logError(msg: => String): Unit
    ```
    to
    ```
    def logError(entry: LogEntry): Unit
    ```
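    
    For example, a call site that previously interpolated variables into the message string now builds the entry with the `log` string interpolator and tags each variable with an MDC key. Below is a minimal Scala sketch mirroring the BroadcastExchangeExec change in the diff of this commit:
    
    ```
    // Inside a class that extends org.apache.spark.internal.Logging
    import org.apache.spark.internal.MDC
    import org.apache.spark.internal.LogKey._
    
    // Before: plain interpolated string
    logError(s"Could not execute broadcast in $timeout secs.", ex)
    
    // After: structured LogEntry carrying TIMEOUT as an MDC key
    logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex)
    ```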
    
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Spark SQL core logs will contain additional MDC entries.
    
    ### How was this patch tested?
    
    Compiler and Scala style checks, as well as code review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45969 from dtenedor/log-error-sql-core.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  6 +++++
 .../execution/BaseScriptTransformationExec.scala   | 10 ++++---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 31 +++++++++++++---------
 .../command/InsertIntoDataSourceDirCommand.scala   |  4 ++-
 .../execution/command/createDataSourceTables.scala |  5 +++-
 .../apache/spark/sql/execution/command/ddl.scala   | 13 ++++-----
 .../execution/datasources/FileFormatWriter.scala   |  7 ++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 20 ++++++++------
 .../execution/exchange/BroadcastExchangeExec.scala |  4 ++-
 9 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index b9b0e372a2b0..fab5e80dd0e6 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
   val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
+  val BATCH_WRITE = Value
   val BLOCK_ID = Value
   val BLOCK_MANAGER_ID = Value
   val BROADCAST_ID = Value
@@ -116,6 +117,7 @@ object LogKey extends Enumeration {
   val ESTIMATOR_PARAMETER_MAP = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
+  val EXCEPTION = Value
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
   val EXECUTION_PLAN_LEAVES = Value
@@ -162,6 +164,7 @@ object LogKey extends Enumeration {
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val HOST_PORT = Value
+  val IDENTIFIER = Value
   val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
   val INDEX_FILE_NUM = Value
@@ -330,11 +333,13 @@ object LogKey extends Enumeration {
   val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
   val SRC_PATH = Value
+  val STAGE_ATTEMPT = Value
   val STAGE_ID = Value
   val START_INDEX = Value
   val STATEMENT_ID = Value
   val STATE_STORE_PROVIDER = Value
   val STATUS = Value
+  val STDERR = Value
   val STORAGE_LEVEL = Value
   val STORAGE_LEVEL_DESERIALIZED = Value
   val STORAGE_LEVEL_REPLICATION = Value
@@ -402,6 +407,7 @@ object LogKey extends Enumeration {
   val WEIGHTED_NUM = Value
   val WORKER_URL = Value
   val WRITE_AHEAD_LOG_INFO = Value
+  val WRITE_JOB_UUID = Value
   val XSD_PATH = Value
 
   type LogKey = Value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 91042b59677b..6e54bde46942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -27,7 +27,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkFiles, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
@@ -185,7 +186,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     if (!proc.isAlive) {
       val exitCode = proc.exitValue()
       if (exitCode != 0) {
-        logError(stderrBuffer.toString) // log the stderr circular buffer
+        logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the stderr 
circular buffer
         throw QueryExecutionErrors.subprocessExitedError(exitCode, 
stderrBuffer, cause)
       }
     }
@@ -329,12 +330,13 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         // Javadoc this call will not throw an exception:
         _exception = t
         proc.destroy()
-        logError(s"Thread-${this.getClass.getSimpleName}-Feed exit cause by: 
", t)
+        logError(log"Thread-${MDC(CLASS_NAME, 
this.getClass.getSimpleName)}-Feed " +
+          log"exit cause by: ", t)
     } finally {
       try {
         Utils.tryLogNonFatalError(outputStream.close())
         if (proc.waitFor() != 0) {
-          logError(stderrBuffer.toString) // log the stderr circular buffer
+          logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the 
stderr circular buffer
         }
       } catch {
         case NonFatal(exceptionFromFinallyBlock) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index ca4400068250..12ca0652f1b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.broadcast
+import org.apache.spark.internal.{MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,14 +76,14 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val lock = new Object()
 
-  @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match {
-    case "TRACE" => logTrace(_)
-    case "DEBUG" => logDebug(_)
-    case "INFO" => logInfo(_)
-    case "WARN" => logWarning(_)
-    case "ERROR" => logError(_)
-    case _ => logDebug(_)
-  }
+  @transient private val logOnLevel: ( => MessageWithContext) => Unit =
+    conf.adaptiveExecutionLogLevel match {
+      case "TRACE" => logTrace(_)
+      case "INFO" => logInfo(_)
+      case "WARN" => logWarning(_)
+      case "ERROR" => logError(_)
+      case _ => logDebug(_)
+    }
 
   @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
 
@@ -358,8 +360,9 @@ case class AdaptiveSparkPlanExec(
           val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
           if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-            logOnLevel("Plan changed:\n" +
-              sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
+            lazy val plans =
+              sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n")
+            logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
             cleanUpTempTags(newPhysicalPlan)
             currentPhysicalPlan = newPhysicalPlan
             currentLogicalPlan = newLogicalPlan
@@ -389,7 +392,7 @@ case class AdaptiveSparkPlanExec(
     if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
-    logOnLevel(s"Final plan:\n$currentPhysicalPlan")
+    logOnLevel(log"Final plan:\n${MDC(QUERY_PLAN, currentPhysicalPlan)}")
   }
 
   override def executeCollect(): Array[InternalRow] = {
@@ -742,7 +745,8 @@ case class AdaptiveSparkPlanExec(
       Some((finalPlan, optimized))
     } catch {
       case e: InvalidAQEPlanException[_] =>
-        logOnLevel(s"Re-optimize - ${e.getMessage()}:\n${e.plan}")
+        logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
+          log"${MDC(QUERY_PLAN, e.plan)}")
         None
     }
   }
@@ -800,7 +804,8 @@ case class AdaptiveSparkPlanExec(
           s.cancel()
         } catch {
           case NonFatal(t) =>
-            logError(s"Exception in cancelling query stage: ${s.treeString}", 
t)
+            logError(log"Exception in cancelling query stage: " +
+              log"${MDC(QUERY_PLAN, s.treeString)}", t)
         }
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 67d38b28c83e..af894157c1ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -70,7 +72,7 @@ case class InsertIntoDataSourceDirCommand(
       sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)).toRdd
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to directory " + 
storage.locationUri.toString, ex)
+        logError(log"Failed to write to directory ${MDC(URI, 
storage.locationUri.toString)}", ex)
         throw ex
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 1283bd880908..1bf5fb26a1f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
 
 import java.net.URI
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -230,7 +232,8 @@ case class CreateDataSourceTableAsSelectCommand(
       dataSource.writeAndRead(mode, query, outputColumnNames)
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to table 
${table.identifier.unquotedString}", ex)
+        logError(log"Failed to write to table " +
+          log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex)
         throw ex
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5a853a269848..3993cebd5eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ACTUAL_PARTITION_COLUMN, EXPECTED_PARTITION_COLUMN, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -737,10 +737,10 @@ case class RepairTableCommand(
       spark.catalog.refreshTable(tableIdentWithDB)
     } catch {
       case NonFatal(e) =>
-        logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of 
the table " +
-          "might return wrong result if the table was cached. To avoid such 
issue, you should " +
-          "uncache the table manually via the UNCACHE TABLE command after 
table recovering will " +
-          "complete fully.", e)
+        logError(log"Cannot refresh the table '${MDC(IDENTIFIER, 
tableIdentWithDB)}'. " +
+          log"A query of the table might return wrong result if the table was 
cached. " +
+          log"To avoid such issue, you should uncache the table manually via 
the UNCACHE TABLE " +
+          log"command after table recovering will complete fully.", e)
     }
     logInfo(s"Recovered all partitions: added ($addedAmount), dropped 
($droppedAmount).")
     Seq.empty[Row]
@@ -1030,7 +1030,8 @@ object DDLUtils extends Logging {
       DataSource.lookupDataSource(provider, SQLConf.get).getConstructor().newInstance()
     } catch {
       case e: Throwable =>
-        logError(s"Failed to find data source: $provider when check data 
column names.", e)
+        logError(log"Failed to find data source: ${MDC(DATA_SOURCE_PROVIDER, 
provider)} " +
+          log"when check data column names.", e)
         return
     }
     source match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 3bfa3413f679..65a73261e833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -281,7 +282,7 @@ object FileFormatWriter extends Logging {
       // return a set of all the partition paths that were updated during this job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
     } catch { case cause: Throwable =>
-      logError(s"Aborting job ${description.uuid}.", cause)
+      logError(log"Aborting job ${MDC(WRITE_JOB_UUID, description.uuid)}.", 
cause)
       committer.abortJob(job)
       throw cause
     }
@@ -404,7 +405,7 @@ object FileFormatWriter extends Logging {
     })(catchBlock = {
       // If there is an error, abort the task
       dataWriter.abort()
-      logError(s"Job $jobId aborted.")
+      logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
     }, finallyBlock = {
       dataWriter.close()
     })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c65c15fb0ef2..b8aff58c1268 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -402,16 +403,17 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
       case cause: Throwable =>
-        logError(s"Data source write support $batchWrite is aborting.")
+        logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} 
is aborting.")
         try {
           batchWrite.abort(messages)
         } catch {
           case t: Throwable =>
-            logError(s"Data source write support $batchWrite failed to abort.")
+            logError(log"Data source write support ${MDC(BATCH_WRITE, 
batchWrite)} " +
+              log"failed to abort.")
             cause.addSuppressed(t)
             throw QueryExecutionErrors.writingJobFailedError(cause)
         }
-        logError(s"Data source write support $batchWrite aborted.")
+        logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} 
aborted.")
         throw cause
     }
 
@@ -472,11 +474,13 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial
 
     })(catchBlock = {
       // If there is an error, abort this writer
-      logError(s"Aborting commit for partition $partId (task $taskId, attempt 
$attemptId, " +
-            s"stage $stageId.$stageAttempt)")
+      logError(log"Aborting commit for partition ${MDC(PARTITION_ID, partId)} 
" +
+        log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, 
attemptId)}, " +
+        log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, 
stageAttempt)})")
       dataWriter.abort()
-      logError(s"Aborted commit for partition $partId (task $taskId, attempt 
$attemptId, " +
-            s"stage $stageId.$stageAttempt)")
+      logError(log"Aborted commit for partition ${MDC(PARTITION_ID, partId)} " 
+
+        log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, 
attemptId)}, " +
+        log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, 
stageAttempt)})")
     }, finallyBlock = {
       dataWriter.close()
     })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 866a62a3a077..a760632b3fc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -25,6 +25,8 @@ import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
 
 import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -212,7 +214,7 @@ case class BroadcastExchangeExec(
       relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
     } catch {
       case ex: TimeoutException =>
-        logError(s"Could not execute broadcast in $timeout secs.", ex)
+        logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} 
secs.", ex)
         if (!relationFuture.isDone) {
           sparkContext.cancelJobsWithTag(jobTag)
           relationFuture.cancel(true)

