[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2024-03-26 Thread Asif (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831116#comment-17831116 ]

Asif edited comment on SPARK-26708 at 3/27/24 12:58 AM:


I believe the current caching logic is suboptimal, and accordingly the bug test for it asserts that suboptimal behaviour.

The bug test in question is:
{quote}test("SPARK-26708 Cache data and cached plan should stay consistent") {
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

df.cache()
// Add df1 to the CacheManager; the buffer is currently empty.
df1.cache()
// After calling collect(), df1's buffer has been loaded.
df1.collect()
// Add df2 to the CacheManager; the buffer is currently empty.
df2.cache()

// Verify that df1 is a InMemoryRelation plan with dependency on another cached 
plan.
assertCacheDependency(df1)
val df1InnerPlan = df1.queryExecution.withCachedData
.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
// Verify that df2 is a InMemoryRelation plan with dependency on another cached 
plan.
assertCacheDependency(df2)

df.unpersist(blocking = true)

// Verify that df1's cache has stayed the same, since df1's cache already has 
data
// before df.unpersist().
val df1Limit = df1.limit(2)
val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}
assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

// Verify that df2's cache has been re-cached, with a new physical plan rid of 
dependency
// on df, since df2's cache had not been loaded before df.unpersist().
val df2Limit = df2.limit(2)
val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst
Unknown macro: \{ case i}
assert(df2LimitInnerPlan.isDefined &&
!df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
 

Optimal caching would have resulted in df2LimitInnerPlan actually containing an InMemoryTableScanExec corresponding to df1.

The reason is that df1 was already materialized, so it rightly remains in the cache even after df.unpersist().

And df2 is derivable from the cached df1: df1 merely carries an extra projected column, but otherwise the cached df1 can serve df2.
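
To illustrate what "derivable" means here, below is an editorial sketch (not part of the original test or comment): it uses a trimmed-down version of the quoted test's setup, assumes a SparkSession named spark with its implicits imported, and the names df2ViaDf1 and usesCache are invented for the illustration.

{code:scala}
// Sketch only: trimmed-down setup from the quoted test (df itself is not cached here);
// assumes a SparkSession `spark` (e.g. in spark-shell).
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import spark.implicits._

val df  = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

df1.cache()
df1.collect()  // df1's InMemoryRelation is now materialized

// df2 is semantically df1 filtered on `a` with the extra column `b` projected away.
val df2ViaDf1 = df1.filter($"a" > 1).select("a")

// Because this rewrite references df1 explicitly, Spark substitutes df1's cache into the
// plan, so the plan with cached data applied contains an InMemoryRelation.
val usesCache = df2ViaDf1.queryExecution.withCachedData.collectFirst {
  case i: InMemoryRelation => i
}.isDefined
assert(usesCache)

// The point made above is that the cache lookup could recognise plain df2 as being
// derivable from df1's cached plan and serve it the same way.
{code}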




[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2024-03-26 Thread Asif (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831117#comment-17831117 ]

Asif edited comment on SPARK-26708 at 3/27/24 12:54 AM:


Towards that, please take a look at the ticket and its associated PR:

https://issues.apache.org/jira/browse/SPARK-45959

Though that PR primarily deals with aggressive collapsing of projects at the end of analysis, as part of the fix it also uses an enhanced cached-plan lookup, and thus results in the behaviour described above.
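
In terms of the test quoted in the previous comment, the described behaviour could be checked along these lines (an editorial sketch mirroring that test's own assertions, not code from the PR; df, df1 and df2 are the dataframes from the quoted test):

{code:scala}
// Sketch only: df, df1, df2 and the cache()/collect() calls are as in the quoted test.
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}

df.unpersist(blocking = true)

val df2InnerPlan = df2.limit(2).queryExecution.withCachedData.collectFirst {
  case i: InMemoryRelation => i.cacheBuilder.cachedPlan
}
// Current behaviour (what the test asserts): the re-cached plan contains no
// InMemoryTableScanExec. With the enhanced cached-plan lookup described above,
// df2 would instead be served from df1's still-materialized cache, making this true.
val reusesDf1Cache = df2InnerPlan.exists(_.exists(_.isInstanceOf[InMemoryTableScanExec]))
{code}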



> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Wei Xue
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> This leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cached RDD has already been persisted, chances 
> are there will be an inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for a specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.
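
To make the hazard concrete, here is a hypothetical query shape along the lines the description talks about (an editorial illustration, not the original reproduction; the names base and child are invented, and a SparkSession named spark with its implicits is assumed):

{code:scala}
// Hypothetical illustration only.
import spark.implicits._

val base  = spark.range(0, 100).repartition($"id").cache()  // cached, hash-partitioned by id
val child = base.filter($"id" % 2 === 0).cache()            // cache entry dependent on base
child.count()                                               // child's cached RDD is materialized

// Non-cascading invalidation: child stays cached, but its physical plan is re-compiled.
base.unpersist(blocking = true)

// A later query that relies on a specific distribution or ordering (for example a join or
// aggregation on id) may now plan against the re-compiled plan's outputPartitioning while
// actually reading child's previously persisted RDD, which was laid out according to the
// old plan; that mismatch is the correctness hazard described above.
val joined = child.join(spark.range(0, 100).toDF("id"), "id")
joined.count()
{code}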






[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-02-05 Thread Bruce Robbins (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761289#comment-16761289 ]

Bruce Robbins edited comment on SPARK-26708 at 2/6/19 12:41 AM:


How does one hit this issue?

Edit: Ah, never mind. I see there is a test.





