[ https://issues.apache.org/jira/browse/SPARK-47609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-47609:
-------------------------
    Description: 
This issue became apparent while bringing my PR
[https://github.com/apache/spark/pull/43854]
in sync with the latest master.

That PR is meant to collapse Projects early, in the analyzer phase itself, so that the tree size is kept to a minimum as Projects keep getting added. But as part of that work, the CacheManager lookup also needed to be modified.

One of the newly added tests in master failed. On analyzing the failure, it turns out that the CacheManager is not picking the cached InMemoryRelation for a subplan.

This shows up in the following existing test in org.apache.spark.sql.DatasetCacheSuite:
{quote}test("SPARK-26708 Cache data and cached plan should stay consistent") {
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

df.cache()
// Add df1 to the CacheManager; the buffer is currently empty.
df1.cache()
{color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
df1.collect()
// Add df2 to the CacheManager; the buffer is currently empty.
df2.cache()

// Verify that df1 is a InMemoryRelation plan with dependency on another cached 
plan.
assertCacheDependency(df1)
val df1InnerPlan = df1.queryExecution.withCachedData
.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
// Verify that df2 is a InMemoryRelation plan with dependency on another cached 
plan.
assertCacheDependency(df2)

df.unpersist(blocking = true)

{color:#00875a}// Verify that df1's cache has stayed the same, since df1's 
cache already has data{color}
// before df.unpersist().
val df1Limit = df1.limit(2)
val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}
assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

// Verify that df2's cache has been re-cached, with a new physical plan rid of 
dependency
// on df, since df2's cache had not been loaded before df.unpersist().
val df2Limit = df2.limit(2)
val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}{quote}
{quote}*{color:#de350b}// This assertion is not right{color}*
assert(df2LimitInnerPlan.isDefined &&
!df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
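For background, the CacheManager substitutes cached data by walking the analyzed plan and replacing any fragment that returns the same result as a cached plan. Below is a simplified sketch of that kind of lookup; the entry and class names (CacheEntrySketch, CacheLookupSketch) are illustrative, not the actual Spark internals, while sameResult, transformDown, and withOutput are real Spark APIs. The key point is that the match is exact, so a plan that is merely derivable from a cached plan is never picked up:

{quote}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Illustrative cache entry: the analyzed plan that was cached and the
// InMemoryRelation holding (or lazily building) its columnar buffers.
case class CacheEntrySketch(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

class CacheLookupSketch(cachedData: Seq[CacheEntrySketch]) {

  // The lookup only fires on an exact semantic match: a fragment is
  // replaced only if it returns the same result as a cached plan.
  private def lookup(fragment: LogicalPlan): Option[CacheEntrySketch] =
    cachedData.find(entry => fragment.sameResult(entry.plan))

  // Walk the analyzed plan top-down, swapping in the cached
  // InMemoryRelation wherever a fragment matches a cache entry.
  // A Filter over the base plan (df2) never sameResult-matches a
  // Project over the base plan (df1), so a subplan that is derivable
  // from, but not identical to, a cached plan is missed.
  def useCachedData(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case fragment =>
      lookup(fragment)
        .map(_.cachedRepresentation.withOutput(fragment.output))
        .getOrElse(fragment)
  }
}
{quote}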
 

Since df1 exists in the cache as an InMemoryRelation, and

val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

df2 is derivable from the cached df1: filtering df1 on $"a" > 1 and dropping the derived column b yields exactly df2's result.

So when val df2Limit = df2.limit(2) is created, it should utilize the cached df1.
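To illustrate the expectation, here is a small snippet of my own (not part of the test suite) that assumes the same spark session and the df1 defined above, still cached and materialized. When df2's result is phrased explicitly on top of df1, the existing exact-match lookup does find df1's cache, which is what one would want the CacheManager to do for df2 itself:

{quote}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// df2 === df.filter($"a" > 1); the same result is derivable from df1
// by filtering on a and dropping the derived column b.
val df2ViaDf1 = df1.filter($"a" > 1).select("a")

// Because this plan literally contains df1's plan as a fragment, the
// exact-match lookup fires and df1's InMemoryRelation shows up in the
// plan with cached data substituted.
assert(df2ViaDf1.queryExecution.withCachedData.exists(
  _.isInstanceOf[InMemoryRelation]))
{quote}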


> CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-47609
>                 URL: https://issues.apache.org/jira/browse/SPARK-47609
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.1
>            Reporter: Asif
>            Priority: Major
>