This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f10ad3d56896 [SPARK-47804] Add Dataframe cache debug log f10ad3d56896 is described below commit f10ad3d56896f8a0eb9b0c73a6ee628cfc7df3a2 Author: Xinyi Yu <xinyi...@databricks.com> AuthorDate: Mon Apr 15 18:37:36 2024 -0700 [SPARK-47804] Add Dataframe cache debug log ### What changes were proposed in this pull request? This PR adds a debug log for the Dataframe cache that is turned on via a SQL conf. It logs the relevant information on * cache hit during cache application (cache application happens on essentially every query) * cache miss * adding new cache entries * removing cache entries (including clearing all entries) Because the cache is applied on every query, this log can be huge; it should only be turned on while debugging and should not be enabled by default in production. Example: ``` spark.conf.set("spark.sql.dataframeCache.logLevel", "warn") val df = spark.range(1, 10) df.collect() {"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"} {"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"} df.cache() {"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"} {"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"} {"ts":"2024-04-10T16:42:18.662-0700","level":"WARN","msg":"Added Dataframe cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n +- *(1) Range (1, 10, step=1, 
splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"} df.count() {"ts":"2024-04-10T16:43:36.033-0700","level":"WARN","msg":"Dataframe cache hit for input plan:\nRange (1, 10, step=1, splits=Some(10))\nmatched with cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n +- *(1) Range (1, 10, step=1, splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"} {"ts":"2024-04-10T16:43:36.041-0700","level":"WARN","msg":"Dataframe cache hit plan change summary:\n Aggregate [count(1) AS count#13L] Aggregate [count(1) AS count#13L]\n!+- Range (1, 10, step=1, splits=Some(10)) +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n! +- *(1) Range (1, 10, step=1, splits=10)","logger":"org.apache.spark.sql.execution.CacheManager"} df.unpersist() {"ts":"2024-04-10T16:44:15.965-0700","level":"WARN","msg":"Removed 1 Dataframe cache entries, with logical plans being \n[Range (1, 10, step=1, splits=Some(10))\n]","logger":"org.apache.spark.sql.execution.CacheManager"} ``` ### Why are the changes needed? Easier debugging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Run local spark shell. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45990 from anchovYu/SPARK-47804. 
Authored-by: Xinyi Yu <xinyi...@databricks.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../scala/org/apache/spark/internal/LogKey.scala | 2 + .../scala/org/apache/spark/internal/Logging.scala | 32 ++++++++++++ common/utils/src/test/resources/log4j2.properties | 4 +- .../apache/spark/util/StructuredLoggingSuite.scala | 20 ++++++-- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++ .../apache/spark/sql/execution/CacheManager.scala | 59 ++++++++++++++++++++-- 6 files changed, 121 insertions(+), 12 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 2967b7754525..77317782068b 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -58,6 +58,7 @@ object LogKey extends Enumeration { val CSV_SOURCE = Value val DATA = Value val DATABASE_NAME = Value + val DATAFRAME_CACHE_ENTRY = Value val DRIVER_ID = Value val DROPPED_PARTITIONS = Value val END_POINT = Value @@ -131,6 +132,7 @@ object LogKey extends Enumeration { val QUERY_HINT = Value val QUERY_ID = Value val QUERY_PLAN = Value + val QUERY_PLAN_COMPARISON = Value val QUERY_PLAN_LENGTH_ACTUAL = Value val QUERY_PLAN_LENGTH_MAX = Value val RANGE = Value diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 1e0419accb50..607f3637e641 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -157,10 +157,42 @@ trait Logging { if (log.isDebugEnabled) log.debug(msg) } + protected def logDebug(entry: LogEntry): Unit = { + if (log.isDebugEnabled) { + withLogContext(entry.context) { + log.debug(entry.message) + } + } + } + + protected def logDebug(entry: LogEntry, throwable: Throwable): Unit = 
{ + if (log.isDebugEnabled) { + withLogContext(entry.context) { + log.debug(entry.message, throwable) + } + } + } + protected def logTrace(msg: => String): Unit = { if (log.isTraceEnabled) log.trace(msg) } + protected def logTrace(entry: LogEntry): Unit = { + if (log.isTraceEnabled) { + withLogContext(entry.context) { + log.trace(entry.message) + } + } + } + + protected def logTrace(entry: LogEntry, throwable: Throwable): Unit = { + if (log.isTraceEnabled) { + withLogContext(entry.context) { + log.trace(entry.message, throwable) + } + } + } + protected def logWarning(msg: => String): Unit = { if (log.isWarnEnabled) log.warn(msg) } diff --git a/common/utils/src/test/resources/log4j2.properties b/common/utils/src/test/resources/log4j2.properties index 2c7563ec8d3d..e3bd8689993d 100644 --- a/common/utils/src/test/resources/log4j2.properties +++ b/common/utils/src/test/resources/log4j2.properties @@ -40,11 +40,11 @@ appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex # Custom loggers logger.structured.name = org.apache.spark.util.StructuredLoggingSuite -logger.structured.level = info +logger.structured.level = trace logger.structured.appenderRefs = structured logger.structured.appenderRef.structured.ref = structured logger.pattern.name = org.apache.spark.util.PatternLoggingSuite -logger.pattern.level = info +logger.pattern.level = trace logger.pattern.appenderRefs = pattern logger.pattern.appenderRef.pattern.ref = pattern diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index c18b5517f0f2..fe03c190fa85 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -85,7 +85,9 @@ trait LoggingSuiteBase Seq( (Level.ERROR, () => logError(basicMsg)), (Level.WARN, () => logWarning(basicMsg)), - (Level.INFO, () => 
logInfo(basicMsg))).foreach { case (level, logFunc) => + (Level.INFO, () => logInfo(basicMsg)), + (Level.DEBUG, () => logDebug(basicMsg)), + (Level.TRACE, () => logTrace(basicMsg))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) assert(expectedPatternForBasicMsg(level).r.matches(logOutput)) } @@ -96,7 +98,9 @@ trait LoggingSuiteBase Seq( (Level.ERROR, () => logError(basicMsg, exception)), (Level.WARN, () => logWarning(basicMsg, exception)), - (Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) => + (Level.INFO, () => logInfo(basicMsg, exception)), + (Level.DEBUG, () => logDebug(basicMsg, exception)), + (Level.TRACE, () => logTrace(basicMsg, exception))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput)) } @@ -106,7 +110,9 @@ trait LoggingSuiteBase Seq( (Level.ERROR, () => logError(msgWithMDC)), (Level.WARN, () => logWarning(msgWithMDC)), - (Level.INFO, () => logInfo(msgWithMDC))).foreach { + (Level.INFO, () => logInfo(msgWithMDC)), + (Level.DEBUG, () => logDebug(msgWithMDC)), + (Level.TRACE, () => logTrace(msgWithMDC))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) assert(expectedPatternForMsgWithMDC(level).r.matches(logOutput)) @@ -129,7 +135,9 @@ trait LoggingSuiteBase Seq( (Level.ERROR, () => logError(msgWithMDCAndException, exception)), (Level.WARN, () => logWarning(msgWithMDCAndException, exception)), - (Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach { + (Level.INFO, () => logInfo(msgWithMDCAndException, exception)), + (Level.DEBUG, () => logDebug(msgWithMDCAndException, exception)), + (Level.TRACE, () => logTrace(msgWithMDCAndException, exception))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput)) @@ -140,7 +148,9 @@ trait LoggingSuiteBase 
Seq( (Level.ERROR, () => logError(msgWithConcat)), (Level.WARN, () => logWarning(msgWithConcat)), - (Level.INFO, () => logInfo(msgWithConcat))).foreach { + (Level.INFO, () => logInfo(msgWithConcat)), + (Level.DEBUG, () => logDebug(msgWithConcat)), + (Level.TRACE, () => logTrace(msgWithConcat))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) verifyMsgWithConcat(level, logOutput) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c8a5d997da7d..278d4dc8d302 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1615,6 +1615,20 @@ object SQLConf { .checkValues(StorageLevelMapper.values.map(_.name()).toSet) .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name()) + val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel") + .internal() + .doc("Configures the log level of Dataframe cache operations, including adding and removing " + + "entries from Dataframe cache, hit and miss on cache application. The default log " + + "level is 'trace'. This log should only be used for debugging purposes and not in the " + + "production environment, since it generates a large amount of logs.") + .version("4.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel), + "Invalid value for 'spark.sql.dataframeCache.logLevel'. 
Valid values are " + + "'trace', 'debug', 'info', 'warn' and 'error'.") + .createWithDefault("trace") + val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .internal() .doc("When false, we will throw an error if a query contains a cartesian product without " + @@ -5369,6 +5383,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def defaultCacheStorageLevel: StorageLevel = StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL)) + def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL) + def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED) override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 6c5639ef99d4..4f3cecd17894 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -21,13 +21,15 @@ import scala.collection.immutable.IndexedSeq import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogEntry, Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View} import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION +import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.columnar.InMemoryRelation import 
org.apache.spark.sql.execution.command.CommandUtils @@ -38,7 +40,14 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK /** Holds a cached logical plan and its data */ -case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) +case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) { + override def toString: String = + s""" + |CachedData( + |logicalPlan=$plan + |InMemoryRelation=$cachedRepresentation) + |""".stripMargin +} /** * Provides support in a SQLContext for caching query results and automatically using these cached @@ -62,6 +71,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { def clearCache(): Unit = this.synchronized { cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache()) cachedData = IndexedSeq[CachedData]() + CacheManager.logCacheOperation(log"Cleared all Dataframe cache entries") } /** Checks if the cache is empty. */ @@ -119,7 +129,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { if (lookupCachedData(planToCache).nonEmpty) { logWarning("Data has already been cached.") } else { - cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData + val cd = CachedData(planToCache, inMemoryRelation) + cachedData = cd +: cachedData + CacheManager.logCacheOperation(log"Added Dataframe cache entry:" + + log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}") } } } @@ -204,6 +217,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd)) } plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) } + CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " + + log"cache entries, with logical plans being " + + log"\n[${MDC(QUERY_PLAN, plansToUncache.map(_.plan).mkString(",\n"))}]") // Re-compile dependent cached queries after removing the cached query. 
if (!cascade) { @@ -268,6 +284,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { logWarning("While recaching, data was already added to cache.") } else { cachedData = recomputedPlan +: cachedData + CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" + + log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}") } } } @@ -280,7 +298,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { /** Optionally returns cached data for the given [[LogicalPlan]]. */ def lookupCachedData(plan: LogicalPlan): Option[CachedData] = { - cachedData.find(cd => plan.sameResult(cd.plan)) + val result = cachedData.find(cd => plan.sameResult(cd.plan)) + if (result.isDefined) { + CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" + + log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" + + log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}") + } + result } /** Replaces segments of the given logical plan with cached versions where possible. */ @@ -301,9 +325,21 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { }.getOrElse(currentFragment) } - newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + val result = newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan)) } + + if (result.fastEquals(plan)) { + CacheManager.logCacheOperation( + log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, plan)}") + CacheManager.logCacheOperation(log"Last 20 Dataframe cache entry logical plans:\n" + + log"[${MDC(DATAFRAME_CACHE_ENTRY, cachedData.take(20).map(_.plan).mkString(",\n"))}]") + } else { + CacheManager.logCacheOperation(log"Dataframe cache hit plan change summary:\n" + + log"${MDC( + QUERY_PLAN_COMPARISON, sideBySide(plan.treeString, result.treeString).mkString("\n"))}") + } + result } /** @@ -396,3 +432,16 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { 
SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs) } } + +object CacheManager extends Logging { + def logCacheOperation(f: => LogEntry): Unit = { + SQLConf.get.dataframeCacheLogLevel match { + case "TRACE" => logTrace(f) + case "DEBUG" => logDebug(f) + case "INFO" => logInfo(f) + case "WARN" => logWarning(f) + case "ERROR" => logError(f) + case _ => logTrace(f) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org