[ 
https://issues.apache.org/jira/browse/SPARK-27739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Zhuge updated SPARK-27739:
-------------------------------
    Description: 
CacheManager.cacheQuery passes the stats for `planToCache` to InMemoryRelation. 
Since the plan has not been optimized, the stats are inaccurate because project 
and filter have not yet been applied. I'd suggest passing the stats from the 
optimized plan instead.
{code:java}
class CacheManager extends Logging {
...
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      val inMemoryRelation = InMemoryRelation(
        sparkSession.sessionState.conf.useCompression,
        sparkSession.sessionState.conf.columnBatchSize, storageLevel,
        sparkSession.sessionState.executePlan(planToCache).executedPlan,
        tableName,
        planToCache)                  <<<<<==
...
}

object InMemoryRelation {

  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      logicalPlan: LogicalPlan): InMemoryRelation = {
    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, 
storageLevel, child, tableName)
    val relation = new InMemoryRelation(child.output, cacheBuilder, 
logicalPlan.outputOrdering)
    relation.statsOfPlanToCache = logicalPlan.stats           <<<<<==
    relation
  }
{code}

  was:
CacheManager.cacheQuery passes the stats for `planToCache` to InMemoryRelation. 
Since the plan has not been optimized, the stats is inaccurate because project 
and filter have not been applied.

{code:java}
class CacheManager extends Logging {
...
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      val inMemoryRelation = InMemoryRelation(
        sparkSession.sessionState.conf.useCompression,
        sparkSession.sessionState.conf.columnBatchSize, storageLevel,
        sparkSession.sessionState.executePlan(planToCache).executedPlan,
        tableName,
        planToCache)                  <<<<<==
...
}

object InMemoryRelation {

  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      logicalPlan: LogicalPlan): InMemoryRelation = {
    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, 
storageLevel, child, tableName)
    val relation = new InMemoryRelation(child.output, cacheBuilder, 
logicalPlan.outputOrdering)
    relation.statsOfPlanToCache = logicalPlan.stats           <<<<<==
    relation
  }
{code}



> Persist should use stats from optimized plan
> --------------------------------------------
>
>                 Key: SPARK-27739
>                 URL: https://issues.apache.org/jira/browse/SPARK-27739
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: John Zhuge
>            Priority: Minor
>
> CacheManager.cacheQuery passes the stats for `planToCache` to 
> InMemoryRelation. Since the plan has not been optimized, the stats are 
> inaccurate because project and filter have not yet been applied. I'd suggest 
> passing the stats from the optimized plan instead.
> {code:java}
> class CacheManager extends Logging {
> ...
>   def cacheQuery(
>       query: Dataset[_],
>       tableName: Option[String] = None,
>       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
>     val planToCache = query.logicalPlan
>     if (lookupCachedData(planToCache).nonEmpty) {
>       logWarning("Asked to cache already cached data.")
>     } else {
>       val sparkSession = query.sparkSession
>       val inMemoryRelation = InMemoryRelation(
>         sparkSession.sessionState.conf.useCompression,
>         sparkSession.sessionState.conf.columnBatchSize, storageLevel,
>         sparkSession.sessionState.executePlan(planToCache).executedPlan,
>         tableName,
>         planToCache)                  <<<<<==
> ...
> }
> object InMemoryRelation {
>   def apply(
>       useCompression: Boolean,
>       batchSize: Int,
>       storageLevel: StorageLevel,
>       child: SparkPlan,
>       tableName: Option[String],
>       logicalPlan: LogicalPlan): InMemoryRelation = {
>     val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, 
> storageLevel, child, tableName)
>     val relation = new InMemoryRelation(child.output, cacheBuilder, 
> logicalPlan.outputOrdering)
>     relation.statsOfPlanToCache = logicalPlan.stats           <<<<<==
>     relation
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to