[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ziqi Liu updated SPARK-47177:
-----------------------------
Description:

An AQE plan is expected to display the final plan after execution. This is not true for a cached SQL plan: it shows the initial plan instead. This behavior change was introduced in [https://github.com/apache/spark/pull/40812], which tried to fix a concurrency issue with cached plans. I don't have a clear fix yet; maybe we can check whether the AQE plan is finalized (after making the final flag atomic, of course): if not, return the cloned one; otherwise it is thread-safe to return the final one, since it is immutable.

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS key#4L]
                                                +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
{code}

(was: the same description, with the collected Row output additionally included in the second code block)


> Cached SQL plan does not display final AQE plan in explain string
> -----------------------------------------------------------------
>
>                 Key: SPARK-47177
>                 URL: https://issues.apache.org/jira/browse/SPARK-47177
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>            Reporter: Ziqi Liu
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
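The "check whether the plan is finalized, clone if not" idea from the description can be sketched as follows. This is a hypothetical, minimal illustration in Python (the actual fix would live in Spark's Scala `AdaptiveSparkPlanExec`); the `AdaptivePlan` class and its method names are invented for this sketch, and a lock stands in for making the final flag atomic:

```python
import copy
import threading

class AdaptivePlan:
    """Hypothetical stand-in for an AQE plan holder; not Spark's real API."""

    def __init__(self, initial_plan):
        self._plan = initial_plan
        self._finalized = False
        self._lock = threading.Lock()  # makes reads/writes of the flag atomic

    def finalize(self, final_plan):
        # Once finalized, the plan is treated as immutable from here on.
        with self._lock:
            self._plan = final_plan
            self._finalized = True

    def plan_for_explain(self):
        with self._lock:
            if self._finalized:
                # Immutable after finalization, so sharing it is thread-safe
                # and explain() sees the final plan.
                return self._plan
            # Still mutating under AQE: hand out a defensive copy so a
            # concurrent reader cannot observe partial updates.
            return copy.deepcopy(self._plan)

# Before finalization, callers get a clone; afterwards, the final plan itself.
p = AdaptivePlan(["initial plan"])
assert p.plan_for_explain() is not p._plan
p.finalize(["final plan"])
assert p.plan_for_explain() is p._plan
```

The point of the sketch is only the control flow: explain would then show the final plan whenever one exists, while the pre-PR-40812 cloning behavior is preserved for plans still being adapted.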