[ https://issues.apache.org/jira/browse/SPARK-46992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820705#comment-17820705 ]
Denis Tarima edited comment on SPARK-46992 at 2/26/24 1:44 PM:
---------------------------------------------------------------

I think the root problem is that {{cache}}/{{persist}} changes the result. It might be a necessary performance trade-off, but if it is possible to keep the same result, the problem will disappear.

{{CacheManager}} is shared between sessions, so {{persist}}/{{unpersist}} immediately affects all {{Dataset}} instances, which makes inconsistent results possible. For example, thread 1 calls {{df.count()}}, thread 2 calls {{df.cache().count()}}, and then thread 1 calls {{df.count()}} again: thread 1 may get two different counts.

If fixing the root problem is infeasible, then the secondary problem needs to be addressed: {{queryExecution.executedPlan}} is cached (as a {{lazy val}}) in the {{Dataset}} instance, but not all queries use it in the same way, which causes the inconsistency. Two possible approaches:
- {{df}} and {{dfCached = df.cache()}} could have different logical plans, so that {{df}} would not use the cached data; however, this change would break backward compatibility.
- {{Dataset}} could check whether it is cached in {{CacheManager}} on each access to {{queryExecution}}, and use (and keep) a separate {{queryExecution}} instance while it is in the "cached" state.
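To make the thread interleaving from the comment concrete, here is a minimal plain-Scala sketch with no Spark dependency. {{ToyCacheManager}} and {{ToyDataset}} are hypothetical stand-ins, not Spark classes: the shared object plays the role of the session-shared {{CacheManager}}, and the two functions passed to {{ToyDataset}} simply hard-code the non-cached and cached results from the repro (2 and 3 rows) rather than modelling how Spark computes them.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the session-shared CacheManager:
// plan key -> data materialized when that plan was cached.
object ToyCacheManager {
  private val cached = mutable.Map.empty[String, Seq[Int]]
  def lookup(plan: String): Option[Seq[Int]] = synchronized(cached.get(plan))
  def register(plan: String, data: Seq[Int]): Unit = synchronized { cached(plan) = data }
  def drop(plan: String): Unit = synchronized { cached.remove(plan); () }
}

// Hypothetical Dataset-like handle. `recompute` stands in for running the plan
// without the cache, `materialize` for the run that fills the cache; the report
// shows these can disagree, so the sketch uses two hard-coded functions.
final class ToyDataset(plan: String, recompute: () => Seq[Int], materialize: () => Seq[Int]) {
  def count(): Long = ToyCacheManager.lookup(plan).getOrElse(recompute()).size.toLong
  def cache(): ToyDataset = { ToyCacheManager.register(plan, materialize()); this }
  def unpersist(): ToyDataset = { ToyCacheManager.drop(plan); this }
}

val df = new ToyDataset("sort+sample", () => Seq(1, 4), () => Seq(1, 2, 3))

// The interleaving from the comment, replayed on one thread for determinism:
val first = df.count()   // thread 1: nothing cached yet
df.cache().count()       // thread 2: caches through the shared manager
val second = df.count()  // thread 1 again: now answered from the cache
println(s"thread 1 saw $first, then $second")
df.unpersist()
```

Nothing here is a data race in the memory-model sense; the inconsistency comes purely from shared, mutable cache state being observed at two different moments.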
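The secondary problem can be sketched in the same toy style. The assumption here, suggested by the comment and by the repro but not checked against Spark's source, is that some actions reuse the instance's memoized plan while others derive a new query execution that consults the cache again. {{SharedCache}}, {{ToyQueryExecution}} and {{ToyDataset}} are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical shared cache registry (stand-in for CacheManager).
object SharedCache {
  val entries = mutable.Map.empty[String, Seq[Int]]
}

// Hypothetical stand-in for QueryExecution: the result is resolved once,
// when first forced, by checking the shared cache at that moment.
final class ToyQueryExecution(plan: String, recompute: () => Seq[Int]) {
  lazy val executedResult: Seq[Int] = SharedCache.entries.getOrElse(plan, recompute())
}

final class ToyDataset(plan: String, recompute: () => Seq[Int]) {
  // Created once and reused by this instance, like a memoized executed plan.
  val queryExecution = new ToyQueryExecution(plan, recompute)

  // Reuses this instance's memoized result.
  def collect(): Seq[Int] = queryExecution.executedResult

  // Builds a fresh execution each time, so it sees the current cache state.
  def count(): Long = new ToyQueryExecution(plan, recompute).executedResult.size.toLong
}

val df = new ToyDataset("sort+sample", () => Seq(1, 4))
df.collect()                                      // forces the lazy val while uncached
SharedCache.entries("sort+sample") = Seq(1, 2, 3) // simulates df.cache().count()
println(s"count=${df.count()} collect=${df.collect().mkString(" ")}")
```

After "caching", the toy count reflects the cache while collect still returns the rows resolved before caching, which is the same pattern as the CACHED output in the report.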
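A sketch of the first approach, under the same toy assumptions: {{cache()}} returns a new dataset whose plan key differs from the original, so only the returned instance reads cached data. The {{Cached(...)}} key format and the {{runs}} counter are hypothetical, added only to show which calls execute the uncached computation.

```scala
import scala.collection.mutable

// Hypothetical shared cache registry (stand-in for CacheManager).
object SharedCache {
  val entries = mutable.Map.empty[String, Seq[Int]]
}

final class ToyDataset(val plan: String, recompute: () => Seq[Int]) {
  def collect(): Seq[Int] = SharedCache.entries.getOrElse(plan, recompute())
  def count(): Long = collect().size.toLong

  // First approach: cache() returns a new dataset with a distinct plan key;
  // the receiver keeps its original plan and never reads the cached data.
  def cache(): ToyDataset = {
    val cachedPlan = s"Cached($plan)" // hypothetical marker for a distinct plan
    SharedCache.entries(cachedPlan) = recompute()
    new ToyDataset(cachedPlan, recompute)
  }
}

var runs = 0 // how many times the uncached computation executes
val df = new ToyDataset("sort+sample", () => { runs += 1; Seq(1, 4) })
val dfCached = df.cache() // materializes once
dfCached.count()          // served from the cache, no extra run
df.count()                // df keeps its own plan and recomputes
println(s"uncached runs: $runs")
```

This is where the backward incompatibility comes from: existing code that calls {{df.cache()}} and keeps using {{df}} would silently stop benefiting from the cache.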
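A sketch of the second approach, again with hypothetical toy classes rather than Spark's API: the dataset re-checks the shared cache on every access to its query execution and memoizes one execution per state ("uncached" and "cached"), so all actions issued in the same state agree.

```scala
import scala.collection.mutable

// Hypothetical shared cache registry (stand-in for CacheManager).
object SharedCache {
  val entries = mutable.Map.empty[String, Seq[Int]]
}

// Hypothetical stand-in for QueryExecution: memoizes one resolved result.
final class ToyQueryExecution(resolve: () => Seq[Int]) {
  lazy val executedResult: Seq[Int] = resolve()
}

// Second approach: check the cache state on each access and keep a separate
// memoized execution for the "cached" state.
final class ToyDataset(plan: String, recompute: () => Seq[Int]) {
  private val uncachedQe = new ToyQueryExecution(recompute)
  private var cachedQe: Option[ToyQueryExecution] = None

  def queryExecution: ToyQueryExecution = synchronized {
    if (SharedCache.entries.contains(plan)) {
      // Create the "cached" execution once, then reuse it while cached.
      cachedQe.getOrElse {
        val qe = new ToyQueryExecution(() => SharedCache.entries(plan))
        cachedQe = Some(qe)
        qe
      }
    } else {
      cachedQe = None // a later cache() gets a fresh "cached" execution
      uncachedQe
    }
  }

  // Every action goes through the same state check.
  def collect(): Seq[Int] = queryExecution.executedResult
  def count(): Long = queryExecution.executedResult.size.toLong
}

val df = new ToyDataset("sort+sample", () => Seq(1, 4))
val before = (df.count(), df.collect())
SharedCache.entries("sort+sample") = Seq(1, 2, 3) // simulates df.cache().count()
val after = (df.count(), df.collect())
println(s"before: $before, after: $after")
```

This only addresses the secondary problem: count and collect now agree within each state, but the result still changes once the data is cached, which is the root problem. The sketch also ignores the window between the cache check and the execution.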

> Inconsistent results with 'sort', 'cache', and AQE.
> ---------------------------------------------------
>
>                 Key: SPARK-46992
>                 URL: https://issues.apache.org/jira/browse/SPARK-46992
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Denis Tarima
>            Priority: Critical
>              Labels: correctness, pull-request-available
>
>
> With AQE enabled, having {color:#4c9aff}sort{color} in the plan changes
> {color:#4c9aff}sample{color} results after caching.
> Moreover, when cached, {color:#4c9aff}collect{color} returns records as if
> it's not cached, which is inconsistent with {color:#4c9aff}count{color} and
> {color:#4c9aff}show{color}.
>
> A script to reproduce:
> {code:scala}
> import spark.implicits._
> val df = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
> println("NON CACHED:")
> println(" count: " + df.count())
> println(" collect: " + df.collect().mkString(" "))
> df.show()
> println("CACHED:")
> df.cache().count()
> println(" count: " + df.count())
> println(" collect: " + df.collect().mkString(" "))
> df.show()
> df.unpersist()
> {code}
> output:
> {code:java}
> NON CACHED:
>  count: 2
>  collect: [1] [4]
> +---+
> | id|
> +---+
> |  1|
> |  4|
> +---+
> CACHED:
>  count: 3
>  collect: [1] [4]
> +---+
> | id|
> +---+
> |  1|
> |  2|
> |  3|
> +---+
> {code}
> BTW, disabling AQE
> [{color:#4c9aff}spark.conf.set("spark.databricks.optimizer.adaptive.enabled",
> "false"){color}] helps on Databricks clusters, but locally it has no effect,
> at least on Spark 3.3.2.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)