This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f10ad3d56896 [SPARK-47804] Add Dataframe cache debug log
f10ad3d56896 is described below

commit f10ad3d56896f8a0eb9b0c73a6ee628cfc7df3a2
Author: Xinyi Yu <xinyi...@databricks.com>
AuthorDate: Mon Apr 15 18:37:36 2024 -0700

    [SPARK-47804] Add Dataframe cache debug log
    
    ### What changes were proposed in this pull request?
    
    This PR adds a debug log for the Dataframe cache that is turned on via a SQL conf. It logs the necessary information on
    * cache hits during cache application (cache application happens on essentially every query)
    * cache misses
    * adding new cache entries
    * removing cache entries (including clearing all entries)
    
    Because cache application happens on every query, this log can be huge, so it should only be turned on while debugging and should not be enabled by default in production.
    
    Example:
    ```
    spark.conf.set("spark.sql.dataframeCache.logLevel", "warn")
    val df = spark.range(1, 10)
    
    df.collect()
    {"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"}
    {"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"}
    
    df.cache()
    {"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"}
    {"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"}
    {"ts":"2024-04-10T16:42:18.662-0700","level":"WARN","msg":"Added Dataframe cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n   +- *(1) Range (1, 10, step=1, splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"}
    
    df.count()
    {"ts":"2024-04-10T16:43:36.033-0700","level":"WARN","msg":"Dataframe cache hit for input plan:\nRange (1, 10, step=1, splits=Some(10))\nmatched with cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n   +- *(1) Range (1, 10, step=1, splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"}
    {"ts":"2024-04-10T16:43:36.041-0700","level":"WARN","msg":"Dataframe cache hit plan change summary:\n Aggregate [count(1) AS count#13L]           Aggregate [count(1) AS count#13L]\n!+- Range (1, 10, step=1, splits=Some(10))   +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n!                                                  +- *(1) Range (1, 10, step=1, splits=10)","logger":"org.apache.spark.sql.execution.CacheManager"}
    
    df.unpersist()
    {"ts":"2024-04-10T16:44:15.965-0700","level":"WARN","msg":"Removed 1 Dataframe cache entries, with logical plans being \n[Range (1, 10, step=1, splits=Some(10))\n]","logger":"org.apache.spark.sql.execution.CacheManager"}
    ```
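    
    With the conf left at its default 'trace' level, the same output can also be surfaced by lowering the threshold of the CacheManager logger in log4j2.properties instead. A minimal sketch, not part of this change (the logger key `cachemgr` is an arbitrary name):
    ```
    # Hypothetical log4j2.properties snippet: surface the Dataframe cache log at its default 'trace' level
    logger.cachemgr.name = org.apache.spark.sql.execution.CacheManager
    logger.cachemgr.level = trace
    ```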
    
    ### Why are the changes needed?
    Easier debugging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Ran a local Spark shell.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45990 from anchovYu/SPARK-47804.
    
    Authored-by: Xinyi Yu <xinyi...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  2 +
 .../scala/org/apache/spark/internal/Logging.scala  | 32 ++++++++++++
 common/utils/src/test/resources/log4j2.properties  |  4 +-
 .../apache/spark/util/StructuredLoggingSuite.scala | 20 ++++++--
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++++
 .../apache/spark/sql/execution/CacheManager.scala  | 59 ++++++++++++++++++++--
 6 files changed, 121 insertions(+), 12 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 2967b7754525..77317782068b 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -58,6 +58,7 @@ object LogKey extends Enumeration {
   val CSV_SOURCE = Value
   val DATA = Value
   val DATABASE_NAME = Value
+  val DATAFRAME_CACHE_ENTRY = Value
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val END_POINT = Value
@@ -131,6 +132,7 @@ object LogKey extends Enumeration {
   val QUERY_HINT = Value
   val QUERY_ID = Value
   val QUERY_PLAN = Value
+  val QUERY_PLAN_COMPARISON = Value
   val QUERY_PLAN_LENGTH_ACTUAL = Value
   val QUERY_PLAN_LENGTH_MAX = Value
   val RANGE = Value
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 1e0419accb50..607f3637e641 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -157,10 +157,42 @@ trait Logging {
     if (log.isDebugEnabled) log.debug(msg)
   }
 
+  protected def logDebug(entry: LogEntry): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message)
+      }
+    }
+  }
+
+  protected def logDebug(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logTrace(msg: => String): Unit = {
     if (log.isTraceEnabled) log.trace(msg)
   }
 
+  protected def logTrace(entry: LogEntry): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message)
+      }
+    }
+  }
+
+  protected def logTrace(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logWarning(msg: => String): Unit = {
     if (log.isWarnEnabled) log.warn(msg)
   }
diff --git a/common/utils/src/test/resources/log4j2.properties b/common/utils/src/test/resources/log4j2.properties
index 2c7563ec8d3d..e3bd8689993d 100644
--- a/common/utils/src/test/resources/log4j2.properties
+++ b/common/utils/src/test/resources/log4j2.properties
@@ -40,11 +40,11 @@ appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
 
 # Custom loggers
 logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
-logger.structured.level = info
+logger.structured.level = trace
 logger.structured.appenderRefs = structured
 logger.structured.appenderRef.structured.ref = structured
 
 logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
-logger.pattern.level = info
+logger.pattern.level = trace
 logger.pattern.appenderRefs = pattern
 logger.pattern.appenderRef.pattern.ref = pattern
diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
index c18b5517f0f2..fe03c190fa85 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -85,7 +85,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(basicMsg)),
       (Level.WARN, () => logWarning(basicMsg)),
-      (Level.INFO, () => logInfo(basicMsg))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg)),
+      (Level.DEBUG, () => logDebug(basicMsg)),
+      (Level.TRACE, () => logTrace(basicMsg))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsg(level).r.matches(logOutput))
     }
@@ -96,7 +98,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(basicMsg, exception)),
       (Level.WARN, () => logWarning(basicMsg, exception)),
-      (Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg, exception)),
+      (Level.DEBUG, () => logDebug(basicMsg, exception)),
+      (Level.TRACE, () => logTrace(basicMsg, exception))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput))
     }
@@ -106,7 +110,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDC)),
       (Level.WARN, () => logWarning(msgWithMDC)),
-      (Level.INFO, () => logInfo(msgWithMDC))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDC)),
+      (Level.DEBUG, () => logDebug(msgWithMDC)),
+      (Level.TRACE, () => logTrace(msgWithMDC))).foreach {
         case (level, logFunc) =>
           val logOutput = captureLogOutput(logFunc)
           assert(expectedPatternForMsgWithMDC(level).r.matches(logOutput))
@@ -129,7 +135,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDCAndException, exception)),
       (Level.WARN, () => logWarning(msgWithMDCAndException, exception)),
-      (Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDCAndException, exception)),
+      (Level.DEBUG, () => logDebug(msgWithMDCAndException, exception)),
+      (Level.TRACE, () => logTrace(msgWithMDCAndException, exception))).foreach {
         case (level, logFunc) =>
          val logOutput = captureLogOutput(logFunc)
          assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput))
@@ -140,7 +148,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithConcat)),
       (Level.WARN, () => logWarning(msgWithConcat)),
-      (Level.INFO, () => logInfo(msgWithConcat))).foreach {
+      (Level.INFO, () => logInfo(msgWithConcat)),
+      (Level.DEBUG, () => logDebug(msgWithConcat)),
+      (Level.TRACE, () => logTrace(msgWithConcat))).foreach {
         case (level, logFunc) =>
           val logOutput = captureLogOutput(logFunc)
           verifyMsgWithConcat(level, logOutput)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c8a5d997da7d..278d4dc8d302 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1615,6 +1615,20 @@ object SQLConf {
     .checkValues(StorageLevelMapper.values.map(_.name()).toSet)
     .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())
 
+  val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
+    .internal()
+    .doc("Configures the log level of Dataframe cache operations, including adding and removing " +
+      "entries from Dataframe cache, hit and miss on cache application. The default log " +
+      "level is 'trace'. This log should only be used for debugging purposes and not in the " +
+      "production environment, since it generates a large amount of logs.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+      "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -5369,6 +5383,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def defaultCacheStorageLevel: StorageLevel =
     StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL))
 
+  def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL)
+
   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 6c5639ef99d4..4f3cecd17894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -21,13 +21,15 @@ import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -38,7 +40,14 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
-case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) {
+  override def toString: String =
+    s"""
+       |CachedData(
+       |logicalPlan=$plan
+       |InMemoryRelation=$cachedRepresentation)
+       |""".stripMargin
+}
 
 /**
  * Provides support in a SQLContext for caching query results and automatically using these cached
@@ -62,6 +71,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
     cachedData = IndexedSeq[CachedData]()
+    CacheManager.logCacheOperation(log"Cleared all Dataframe cache entries")
   }
 
   /** Checks if the cache is empty. */
@@ -119,7 +129,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         if (lookupCachedData(planToCache).nonEmpty) {
           logWarning("Data has already been cached.")
         } else {
-          cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
+          val cd = CachedData(planToCache, inMemoryRelation)
+          cachedData = cd +: cachedData
+          CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
+            log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
         }
       }
     }
@@ -204,6 +217,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
     plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+    CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " +
+      log"cache entries, with logical plans being " +
+      log"\n[${MDC(QUERY_PLAN, plansToUncache.map(_.plan).mkString(",\n"))}]")
 
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
@@ -268,6 +284,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           logWarning("While recaching, data was already added to cache.")
         } else {
           cachedData = recomputedPlan +: cachedData
+          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
         }
       }
     }
@@ -280,7 +298,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
   def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
-    cachedData.find(cd => plan.sameResult(cd.plan))
+    val result = cachedData.find(cd => plan.sameResult(cd.plan))
+    if (result.isDefined) {
+      CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" +
+        log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" +
+        log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}")
+    }
+    result
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
@@ -301,9 +325,21 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         }.getOrElse(currentFragment)
     }
 
-    newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+    val result = newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
     }
+
+    if (result.fastEquals(plan)) {
+      CacheManager.logCacheOperation(
+        log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, plan)}")
+      CacheManager.logCacheOperation(log"Last 20 Dataframe cache entry logical plans:\n" +
+        log"[${MDC(DATAFRAME_CACHE_ENTRY, cachedData.take(20).map(_.plan).mkString(",\n"))}]")
+    } else {
+      CacheManager.logCacheOperation(log"Dataframe cache hit plan change summary:\n" +
+        log"${MDC(
+          QUERY_PLAN_COMPARISON, sideBySide(plan.treeString, result.treeString).mkString("\n"))}")
+    }
+    result
   }
 
   /**
@@ -396,3 +432,16 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
   }
 }
+
+object CacheManager extends Logging {
+  def logCacheOperation(f: => LogEntry): Unit = {
+    SQLConf.get.dataframeCacheLogLevel match {
+      case "TRACE" => logTrace(f)
+      case "DEBUG" => logDebug(f)
+      case "INFO" => logInfo(f)
+      case "WARN" => logWarning(f)
+      case "ERROR" => logError(f)
+      case _ => logTrace(f)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
