[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831116#comment-17831116 ]

Asif commented on SPARK-26708:
------------------------------

I believe the current caching logic is suboptimal, and the regression test for 
this bug accordingly asserts that suboptimal behaviour.

The test for this bug is:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  // After calling collect(), df1's buffer has been loaded.
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  // Verify that df1's cache has stayed the same, since df1's cache already has data
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
 

Optimal caching should have resulted in df2LimitInnerPlan actually containing 
an InMemoryTableScanExec corresponding to df1.

The reason is that df1 was already materialized, so its data rightly exists in 
the cache.

And df2 is derivable from the cached df1 (df1 just carries an extra projected 
column, but its cached data can otherwise serve df2). See the sketch below.
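
To illustrate the equivalence I mean, here is a minimal sketch in terms of the 
test's dataframes (assuming the test's SparkSession and implicits are in scope; 
the CacheManager plan rewriting itself is not shown):

{quote}
// df1 was cached and materialized with columns (a, b), where b = a + 1.
// df2 = df.filter($"a" > 1) only needs column "a", so logically it can be
// answered from df1's cached data by filtering and dropping the extra column:
val df2FromDf1 = df1.filter($"a" > 1).select("a")

// This produces the same rows as df2, so after df.unpersist() the re-cached
// plan for df2 could, in principle, keep an InMemoryTableScanExec over df1's
// cache instead of recomputing from the source.
{quote}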

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26708
>                 URL: https://issues.apache.org/jira/browse/SPARK-26708
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Xiao Li
>            Assignee: Wei Xue
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> It leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cache RDD has already been persisted, chances 
> are there will be inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for a specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.
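
For context on the quoted description above, a hedged sketch of the sequence it 
describes (the dataframes and names here are illustrative assumptions, not the 
original reproduction from the report):

{quote}
import spark.implicits._  // assumes a SparkSession named `spark` is in scope

val df = spark.range(0, 100).toDF("a")
df.cache()

// A dependent cache whose physical plan scans df's cache; its
// outputPartitioning comes from the repartition below.
val dep = df.repartition($"a")
dep.cache()
dep.count()  // materializes dep's cached RDD

// Non-cascading invalidation: df's entry is dropped and dep is re-cached,
// i.e. its physical plan is re-compiled while the already-persisted RDD is kept.
df.unpersist(blocking = true)

// The hazard described above: if the re-compiled plan reports an
// outputPartitioning or outputOrdering that the persisted data does not
// actually satisfy, a later query that relies on those properties (e.g. to
// elide an exchange) can read data that does not match the plan.
{quote}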



