[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-03-05 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-47177:
--
Fix Version/s: 3.4.3

> Cached SQL plan do not display final AQE plan in explain string
> ---
>
> Key: SPARK-47177
> URL: https://issues.apache.org/jira/browse/SPARK-47177
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Ziqi Liu
>Assignee: XiDuo You
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.3
>
>
> AQE plan is expected to display the final plan after execution. This is not
> true for a cached SQL plan: it shows the initial plan instead. This behavior
> change was introduced in [https://github.com/apache/spark/pull/40812], which
> tried to fix a concurrency issue with cached plans.
> *In short, the plan used to execute and the plan used to explain are not the
> same instance, thus causing the inconsistency.*
>  
> I don't have a clear idea how to fix this yet:
>  * maybe we just add a coarse-granularity lock in explain?
>  * make innerChildren a function: clone the initial plan, and on every call
> check whether the original AQE plan is finalized (making the final flag
> atomic first, of course); if not, return the cloned initial plan; if it is
> finalized, clone the final plan and return that one (see the sketch after
> this list). This still won't reflect the AQE plan in real time in a
> concurrent situation, but at least we have an initial version and a final
> version.
>  
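> A minimal, self-contained sketch of that second idea (Scala; the names
> CachedPlanHolder, markFinalized and clonePlan are illustrative only, not the
> actual Spark internals):
> {code:scala}
> import java.util.concurrent.atomic.AtomicBoolean
> 
> // Toy model of "innerChildren as a function": keep a clone of the initial
> // plan, flip an atomic flag once AQE finalizes, and clone the final plan
> // on demand, so explain never sees the live mutating instance.
> final class CachedPlanHolder[Plan <: AnyRef](initial: Plan, clonePlan: Plan => Plan) {
>   private val finalized = new AtomicBoolean(false)
>   @volatile private var finalPlan: Plan = _
>   private val initialClone = clonePlan(initial)
> 
>   // Called by AQE once the plan is finalized (write happens before the flag flips).
>   def markFinalized(p: Plan): Unit = { finalPlan = p; finalized.set(true) }
> 
>   // What innerChildren would return for explain(): either the initial clone
>   // or a fresh clone of the (immutable) final plan.
>   def planForExplain(): Plan =
>     if (finalized.get()) clonePlan(finalPlan) else initialClone
> }
> {code}
>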
> A simple repro:
> {code:python}
> # Assumes a PySpark shell/session where `spark` is already defined.
> from pyspark.sql.functions import expr
> 
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> df = cached_d2.filter("key > 10")
> df.collect()
> {code}
> {code}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- TableCacheQueryStage 0
>       +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>             +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- AdaptiveSparkPlan isFinalPlan=false
>                      +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                         +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                            +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                               +- Project [(id#2L % 100) AS key#4L]
>                                  +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>          +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                +- AdaptiveSparkPlan isFinalPlan=false
>                   +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                      +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                         +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                            +- Project [(id#2L % 100) AS key#4L]
>                               +- Range (0, 1000, step=1, splits=10)
> {code}





[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every call
check whether the original AQE plan is finalized (making the final flag
atomic first, of course); if not, return the cloned initial plan; if it is
finalized, clone the final plan and return that one. This still won't
reflect the AQE plan in real time in a concurrent situation, but at least we
have an initial version and a final version.

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.filter("key > 10")
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                        +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                           +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                              +- Project [(id#2L % 100) AS key#4L]
                                 +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
         +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- AdaptiveSparkPlan isFinalPlan=false
                  +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                     +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                        +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                           +- Project [(id#2L % 100) AS key#4L]
                              +- Range (0, 1000, step=1, splits=10)
{code}

  was:
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every call
check whether the original AQE plan is finalized (making the final flag
atomic first, of course); if not, return the cloned initial plan; if it is
finalized, clone the final plan and return that one. This still won't
reflect the AQE plan in real time in a concurrent situation, but at least we
have an initial version and a final version.

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every call
check whether the original AQE plan is finalized (making the final flag
atomic first, of course); if not, return the cloned initial plan; if it is
finalized, clone the final plan and return that one. This still won't
reflect the AQE plan in real time in a concurrent situation, but at least we
have an initial version and a final version.

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

  was:
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet; maybe we just add a
coarse-granularity lock in explain?

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet; maybe we just add a
coarse-granularity lock in explain?
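
A rough sketch of the coarse-lock idea (AdaptivePlanModel is a hypothetical
stand-in, not Spark's AdaptiveSparkPlanExec):
{code:scala}
// Toy model: explain() and AQE re-optimization serialize on the same lock,
// so explain always observes a consistent plan, never a half-mutated one.
final class AdaptivePlanModel(initialPlan: String) {
  private val lock = new Object
  private var plan: String = initialPlan

  // AQE mutates the current physical plan under the lock.
  def reOptimize(newPlan: String): Unit = lock.synchronized { plan = newPlan }

  // explain reads the plan under the same lock.
  def explainString(): String = lock.synchronized { plan }
}
{code}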

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

  was:
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if not, we can return the
cloned one; otherwise it's thread-safe to return the final one, since it's
immutable.
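
A rough sketch of this idea (ExplainablePlan and clonePlan are hypothetical
stand-ins, not Spark internals):
{code:scala}
import java.util.concurrent.atomic.AtomicBoolean

// Toy model: once the (atomic) final flag is set, the final plan is
// immutable and safe to return directly; before that, return a clone.
final class ExplainablePlan[Plan <: AnyRef](initial: Plan, clonePlan: Plan => Plan) {
  private val isFinal = new AtomicBoolean(false)
  @volatile private var plan: Plan = initial

  // Called by AQE when the final plan is installed.
  def markFinal(finalPlan: Plan): Unit = { plan = finalPlan; isFinal.set(true) }

  def planForExplain(): Plan =
    if (isFinal.get()) plan   // immutable once final: share as-is
    else clonePlan(plan)      // still mutating: explain a defensive clone
}
{code}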

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-47177:
---
Labels: pull-request-available  (was: )

> Cached SQL plan do not display final AQE plan in explain string
> ---
>
> Key: SPARK-47177
> URL: https://issues.apache.org/jira/browse/SPARK-47177
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Ziqi Liu
>Priority: Major
>  Labels: pull-request-available
>
> AQE plan is expected to display the final plan after execution. This is not
> true for a cached SQL plan: it shows the initial plan instead. This behavior
> change was introduced in [https://github.com/apache/spark/pull/40812], which
> tried to fix a concurrency issue with cached plans. I don't have a clear idea
> how to fix this yet; maybe we can check whether the AQE plan is finalized
> (making the final flag atomic first, of course): if not, we can return the
> cloned one; otherwise it's thread-safe to return the final one, since it's
> immutable.
>  
> A simple repro:
> {code:python}
> # Assumes a PySpark shell/session where `spark` is already defined.
> from pyspark.sql.functions import expr
> 
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
> df.collect()
> {code}
> {code}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 1
>          +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
>             +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
>                +- *(1) Project [(key#27L % 10) AS key2#36L]
>                   +- TableCacheQueryStage 0
>                      +- InMemoryTableScan [key#27L]
>                            +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                                  +- AdaptiveSparkPlan isFinalPlan=false
>                                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                                        +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
>                                           +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                                              +- Project [(id#2L % 100) AS key#4L]
>                                                 +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
>    +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
>       +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
>          +- Project [(key#27L % 10) AS key2#36L]
>             +- InMemoryTableScan [key#27L]
>                   +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                         +- AdaptiveSparkPlan isFinalPlan=false
>                            +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                               +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
>                                  +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                                     +- Project [(id#2L % 100) AS key#4L]
>                                        +- Range (0, 1000, step=1, splits=10)
> {code}





[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-26 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if not, we can return the
cloned one; otherwise it's thread-safe to return the final one, since it's
immutable.

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

  was:
AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if not, we can return the
cloned one; otherwise it's thread-safe to return the final one, since it's
immutable.

 

A simple repro:
{code:python}
# Assumes a PySpark shell/session where `spark` is already defined.
from pyspark.sql.functions import expr

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}
{code}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}