[
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831116#comment-17831116
]
Asif edited comment on SPARK-26708 at 3/27/24 12:58 AM:
I believe the current caching logic is suboptimal, and the regression test for it accordingly asserts suboptimal behaviour.
The test in question is:
{quote}test("SPARK-26708 Cache data and cached plan should stay consistent") {
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)
df.cache()
// Add df1 to the CacheManager; the buffer is currently empty.
df1.cache()
// After calling collect(), df1's buffer has been loaded.
df1.collect()
// Add df2 to the CacheManager; the buffer is currently empty.
df2.cache()
// Verify that df1 is a InMemoryRelation plan with dependency on another cached
plan.
assertCacheDependency(df1)
val df1InnerPlan = df1.queryExecution.withCachedData
.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
// Verify that df2 is a InMemoryRelation plan with dependency on another cached
plan.
assertCacheDependency(df2)
df.unpersist(blocking = true)
// Verify that df1's cache has stayed the same, since df1's cache already has
data
// before df.unpersist().
val df1Limit = df1.limit(2)
val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}
assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
// Verify that df2's cache has been re-cached, with a new physical plan rid of
dependency
// on df, since df2's cache had not been loaded before df.unpersist().
val df2Limit = df2.limit(2)
val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}
assert(df2LimitInnerPlan.isDefined &&
!df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Optimal caching should have resulted in df2LimitInnerPlan still containing an InMemoryTableScanExec, corresponding to df1.
The reason is that df1 was already materialized, so it rightly remains in the cache after df.unpersist(), and df2 is derivable from the cached df1 (df1 only carries the extra projected column "b", but its cached data can otherwise serve df2).
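As a minimal sketch of that derivability claim (reusing the dataframe names from the quoted test, and assuming a SparkSession named spark with its implicits in scope, as in the suite):
{quote}
import spark.implicits._

// Same definitions as in the quoted test.
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

// df2 can be answered from df1: apply the same filter and drop the extra "b" column.
val df2FromDf1 = df1.filter($"a" > 1).select($"a")

// Both produce the rows (2), (3), (4), so a smarter cache lookup could, in principle,
// keep serving df2 from df1's already-materialized buffer after df.unpersist().
// Under that behaviour, df2.limit(2)'s physical plan would still contain an
// InMemoryTableScanExec over df1's cache (the opposite of the test's final assert).
assert(df2FromDf1.collect().toSeq == df2.collect().toSeq)
{quote}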
> Incorrect result caused by inconsistency between a SQL cache's cached RDD and
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Xiao Li
> Assignee: Wei Xue
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.1, 3.0.0
>