[https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

XiDuo You updated SPARK-47177:
------------------------------
    Fix Version/s: 3.4.3

> Cached SQL plan do not display final AQE plan in explain string
> ---------------------------------------------------------------
>
>                 Key: SPARK-47177
>                 URL: https://issues.apache.org/jira/browse/SPARK-47177
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>            Reporter: Ziqi Liu
>            Assignee: XiDuo You
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0, 3.5.2, 3.4.3
>
>
> After execution, an AQE plan is expected to display its final plan in the explain
> string. This is not true for a cached SQL plan: the explain string shows the
> initial plan instead (in the repro below, the AdaptiveSparkPlan under
> InMemoryRelation still reports isFinalPlan=false after df.collect()). This
> behavior change was introduced in [https://github.com/apache/spark/pull/40812],
> which tried to fix a concurrency issue with cached plans.
> *In short, the plan instance used for execution and the plan instance used for
> explain are not the same, which causes the inconsistency.*
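The root cause can be illustrated outside Spark with a toy model. This is a minimal Python sketch, not Spark code: `ToyPlan` is a hypothetical class whose only state is an `is_final_plan` flag, and `copy.copy` stands in for whatever copying happens when the cached plan is reused. Because explain reads a different instance from the one that ran, it never sees the final flag.

```python
import copy


class ToyPlan:
    """Hypothetical stand-in for an AQE plan node (not Spark's API)."""

    def __init__(self):
        self.is_final_plan = False

    def execute(self):
        # AQE re-optimizes while executing and then marks the plan as final.
        self.is_final_plan = True

    def explain(self):
        return f"AdaptiveSparkPlan isFinalPlan={str(self.is_final_plan).lower()}"


executed = ToyPlan()
explained = copy.copy(executed)  # a separate instance, copied before execution
executed.execute()               # only the executed instance gets finalized

print(executed.explain())   # AdaptiveSparkPlan isFinalPlan=true
print(explained.explain())  # AdaptiveSparkPlan isFinalPlan=false
```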
>  
> I don't have a clear idea of how to fix this yet. Possible directions:
>  * Maybe just take a coarse-grained lock in explain?
>  * Make innerChildren a function: clone the initial plan, and on every call check
> whether the original AQE plan is finalized (making the final flag atomic first,
> of course). If it is not, return the cloned initial plan; if it is, clone the
> final plan and return that one. This still won't reflect the AQE plan in real
> time in a concurrent situation, but at least we have an initial version and a
> final version.
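The second idea can be sketched as a small model. This is a hedged Python illustration, not Spark's implementation: `ToyAdaptivePlan`, `finalize`, and `inner_children` are hypothetical names, plans are plain dicts, and a `threading.Lock` plays the role of the atomic final flag (Python has no atomic boolean in the standard library).

```python
import copy
import threading


class ToyAdaptivePlan:
    """Hypothetical model of 'innerChildren as a function' (not Spark's API)."""

    def __init__(self, initial_plan):
        self._initial_plan = copy.deepcopy(initial_plan)  # cloned once, up front
        self._final_plan = None
        self._is_final = False
        # The lock stands in for making the final flag atomic.
        self._lock = threading.Lock()

    def finalize(self, final_plan):
        # Called by the executing thread once AQE has produced its final plan.
        with self._lock:
            self._final_plan = final_plan
            self._is_final = True

    def inner_children(self):
        # Re-evaluated on every call instead of being captured once.
        with self._lock:
            if self._is_final:
                return [copy.deepcopy(self._final_plan)]  # clone the final plan
            return [self._initial_plan]  # the cloned initial plan


plan = ToyAdaptivePlan({"node": "HashAggregate", "isFinalPlan": False})
before = plan.inner_children()
plan.finalize({"node": "HashAggregate", "isFinalPlan": True})
after = plan.inner_children()
print(before[0]["isFinalPlan"], after[0]["isFinalPlan"])  # False True
```

As the description notes, this only distinguishes "initial" from "final": a caller that asks mid-execution still gets the initial plan, not any intermediate re-optimized stage.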
>  
> A simple repro:
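> {code:python}
> # Run in the pyspark shell, where `spark` (a SparkSession) is predefined.
> from pyspark.sql.functions import expr
>
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> df = cached_d2.filter("key > 10")
> df.collect() {code}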
> {code:java}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- TableCacheQueryStage 0
>       +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>             +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- AdaptiveSparkPlan isFinalPlan=false
>                      +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                         +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                            +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                               +- Project [(id#2L % 100) AS key#4L]
>                                  +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>          +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                +- AdaptiveSparkPlan isFinalPlan=false
>                   +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                      +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                         +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                            +- Project [(id#2L % 100) AS key#4L]
>                               +- Range (0, 1000, step=1, splits=10){code}
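To check for the symptom without reading the tree by eye, the explain text can be scanned for AdaptiveSparkPlan headers. A minimal sketch that assumes the `AdaptiveSparkPlan isFinalPlan=...` header format shown in the output above; `adaptive_plan_flags` is a hypothetical helper, and `sample` is a trimmed copy of the Final Plan section of that output.

```python
import re

# Matches the AdaptiveSparkPlan header format shown in the explain output above.
_ADAPTIVE_NODE = re.compile(r"AdaptiveSparkPlan isFinalPlan=(true|false)")


def adaptive_plan_flags(explain_text):
    """Return the isFinalPlan flag of every AdaptiveSparkPlan node, top to bottom."""
    return [m.group(1) == "true" for m in _ADAPTIVE_NODE.finditer(explain_text)]


sample = """AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
"""

flags = adaptive_plan_flags(sample)
print(flags)  # [True, False]
if flags and flags[0] and not all(flags):
    # Outer plan is final, yet a nested (cached) AQE plan still shows its initial plan.
    print("nested AdaptiveSparkPlan is not displayed as final")
```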



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
