[ https://issues.apache.org/jira/browse/SPARK-46992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820705#comment-17820705 ]

Denis Tarima edited comment on SPARK-46992 at 2/26/24 1:44 PM:
---------------------------------------------------------------

I think the root problem is that {{cache}}/{{persist}} changes the result. It
might be a necessary performance trade-off, but if it's possible to keep the
same result, then the problem will disappear.

{{CacheManager}} is shared between sessions, so {{persist}}/{{unpersist}}
affects all {{Dataset}} instances immediately, creating the possibility of
inconsistent results. For example, thread 1 calls {{df.count()}}, thread 2
calls {{df.cache().count()}}, and finally thread 1 calls {{df.count()}} again -
thread 1 may get a different count.

If fixing the root problem is infeasible, then the secondary problem needs to
be addressed: {{queryExecution.executedPlan}} is cached (a {{lazy val}}) in the
{{Dataset}} instance, but not all queries use it in the same way, which causes
the inconsistency.
 - {{df}} and {{dfCached = df.cache()}} could have different logical plans, so
{{df}} wouldn't use the cached data, but this change would introduce a backward
incompatibility
 - {{Dataset}} could check whether it's cached in {{CacheManager}} on each
access to {{queryExecution}} and use/keep another {{queryExecution}} instance
while it's in a "cached" state (see the sketch below)
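
A rough sketch of the second option (the field and method names here, such as
{{resolvedQueryExecution}}, are hypothetical and only meant to illustrate the
idea, not actual {{Dataset}} internals):
{code:scala}
// Hypothetical change inside Dataset: rebuild the QueryExecution whenever the
// "cached in CacheManager" state flips, so count/collect/show see the same plan.
@volatile private var currentQE: QueryExecution = _
@volatile private var lastCachedState: Option[Boolean] = None

private def resolvedQueryExecution: QueryExecution = {
  val isCached =
    sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).isDefined
  if (currentQE == null || !lastCachedState.contains(isCached)) {
    currentQE = sparkSession.sessionState.executePlan(logicalPlan)
    lastCachedState = Some(isCached)
  }
  currentQE
}
{code}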


> Inconsistent results with 'sort', 'cache', and AQE.
> ---------------------------------------------------
>
>                 Key: SPARK-46992
>                 URL: https://issues.apache.org/jira/browse/SPARK-46992
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Denis Tarima
>            Priority: Critical
>              Labels: correctness, pull-request-available
>
>  
> With AQE enabled, having {color:#4c9aff}sort{color} in the plan changes 
> {color:#4c9aff}sample{color} results after caching.
> Moreover, when cached, {color:#4c9aff}collect{color} returns records as if 
> it's not cached, which is inconsistent with {color:#4c9aff}count{color} and 
> {color:#4c9aff}show{color}.
> A script to reproduce:
> {code:scala}
> import spark.implicits._
> val df = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
> println("NON CACHED:")
> println("  count: " + df.count())
> println("  collect: " + df.collect().mkString(" "))
> df.show()
> println("CACHED:")
> df.cache().count()
> println("  count: " + df.count())
> println("  collect: " + df.collect().mkString(" "))
> df.show()
> df.unpersist()
> {code}
> output:
> {code:java}
> NON CACHED:
>   count: 2
>   collect: [1] [4]
> +---+
> | id|
> +---+
> |  1|
> |  4|
> +---+
> CACHED:
>   count: 3
>   collect: [1] [4]
> +---+
> | id|
> +---+
> |  1|
> |  2|
> |  3|
> +---+
> {code}
> BTW, disabling AQE 
> [{color:#4c9aff}spark.conf.set("spark.databricks.optimizer.adaptive.enabled", 
> "false"){color}] helps on Databricks clusters, but locally it has no effect, 
> at least on Spark 3.3.2.
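> For what it's worth, in open-source Spark AQE is controlled by the standard
> {color:#4c9aff}spark.sql.adaptive.enabled{color} flag (the
> {color:#4c9aff}spark.databricks.*{color} key above is Databricks-specific), so
> a local check, under that assumption, might look like:
> {code:scala}
> // Assumes the same `spark` session and `df` as in the script above.
> // spark.sql.adaptive.enabled is the standard AQE switch in open-source Spark;
> // whether turning it off keeps the counts consistent locally still needs verification.
> spark.conf.set("spark.sql.adaptive.enabled", "false")
> println("  count: " + df.count())
> println("  collect: " + df.collect().mkString(" "))
> {code}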


