[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
XiDuo You updated SPARK-47177:
------------------------------
    Fix Version/s: 3.4.3

> Cached SQL plan do not display final AQE plan in explain string
> ---------------------------------------------------------------
>
>                 Key: SPARK-47177
>                 URL: https://issues.apache.org/jira/browse/SPARK-47177
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>            Reporter: Ziqi Liu
>            Assignee: XiDuo You
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0, 3.5.2, 3.4.3
>
>
> An AQE plan is expected to display the final plan after execution. This is
> not true for a cached SQL plan: it shows the initial plan instead. This
> behavior change was introduced in [https://github.com/apache/spark/pull/40812],
> which tried to fix a concurrency issue with cached plans.
> *In short, the plan used for execution and the plan used for explain are not
> the same instance, which causes the inconsistency.*
>
> I don't have a clear idea of how to fix this yet:
> * maybe we just need a coarse-grained lock in explain?
> * make innerChildren a function: clone the initial plan, and on every call
> check whether the original AQE plan is finalized (making the final flag
> atomic first, of course). If it is not, return the cloned initial plan; if it
> is finalized, clone the final plan and return that one. This still won't
> reflect the AQE plan in real time in a concurrent situation, but at least we
> have the initial version and the final version.
>
> A simple repro:
> {code:java}
> # PySpark; assumes an existing SparkSession named `spark` (e.g. the pyspark shell)
> from pyspark.sql.functions import expr
>
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> df = cached_d2.filter("key > 10")
> df.collect() {code}
> {code:java}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- TableCacheQueryStage 0
>       +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>             +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- AdaptiveSparkPlan isFinalPlan=false
>                      +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                         +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                            +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                               +- Project [(id#2L % 100) AS key#4L]
>                                  +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>          +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                +- AdaptiveSparkPlan isFinalPlan=false
>                   +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                      +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                         +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                            +- Project [(id#2L % 100) AS key#4L]
>                               +- Range (0, 1000, step=1, splits=10){code}
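
The second idea in the description (making innerChildren a function that returns a clone of either the initial or the final plan, guarded by an atomic final flag) can be illustrated with a small, Spark-free Python sketch. Everything here is hypothetical: `PlanNode`, `AdaptivePlan`, `CachedRelation`, and their methods are stand-ins invented for illustration, not Spark classes, and this is not the fix that was merged for SPARK-47177. A `threading.Event` plays the role of the atomic flag.

```python
import copy
import threading
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PlanNode:
    """Hypothetical stand-in for a physical plan node (not a Spark class)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)

    def tree_string(self, depth: int = 0) -> str:
        # Render the node and its children roughly like an explain tree.
        prefix = "" if depth == 0 else "   " * (depth - 1) + "+- "
        lines = [prefix + self.name]
        for child in self.children:
            lines.append(child.tree_string(depth + 1))
        return "\n".join(lines)


class AdaptivePlan:
    """Toy model of a plan that is re-optimized while it executes."""

    def __init__(self, initial: PlanNode) -> None:
        # Clone the initial plan once so later mutation cannot leak into explain.
        self._initial_snapshot = copy.deepcopy(initial)
        self._final_plan: Optional[PlanNode] = None
        self._is_final = threading.Event()  # stands in for the atomic final flag

    def finalize(self, final_plan: PlanNode) -> None:
        # Publish the final plan before setting the flag, so a reader that
        # sees the flag set always finds a final plan.
        self._final_plan = final_plan
        self._is_final.set()

    def explain_snapshot(self) -> PlanNode:
        # Return a clone of the final plan if finalized, else of the initial plan.
        if self._is_final.is_set():
            return copy.deepcopy(self._final_plan)
        return copy.deepcopy(self._initial_snapshot)


class CachedRelation:
    """Toy cached relation whose inner children are computed on demand."""

    def __init__(self, cached_plan: AdaptivePlan) -> None:
        self._cached_plan = cached_plan

    def inner_children(self) -> List[PlanNode]:
        # Evaluated on every explain call instead of being captured once.
        return [self._cached_plan.explain_snapshot()]


initial = PlanNode("HashAggregate", [PlanNode("Exchange", [PlanNode("Range")])])
adaptive = AdaptivePlan(initial)
relation = CachedRelation(adaptive)

before = relation.inner_children()[0].tree_string()
adaptive.finalize(
    PlanNode("HashAggregate", [PlanNode("ShuffleQueryStage", [PlanNode("Range")])])
)
after = relation.inner_children()[0].tree_string()
```

As the description notes, a caller still sees only the initial or the final version, never the intermediate re-optimized states. Because every call returns a fresh clone, an explain running concurrently with execution never reads a plan object that is being mutated.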