[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun closed SPARK-47177.
---------------------------------
> Cached SQL plan do not display final AQE plan in explain string
> ---------------------------------------------------------------
>
> Key: SPARK-47177
> URL: https://issues.apache.org/jira/browse/SPARK-47177
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.2, 3.5.0, 3.5.1, 3.5.2, 4.0.0
> Reporter: Ziqi Liu
> Assignee: XiDuo You
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.5.2, 3.4.3, 4.0.0
>
>
> An AQE plan is expected to display its final plan after execution. This is not
> true for a cached SQL plan: it shows the initial plan instead. This behavior
> change was introduced in [https://github.com/apache/spark/pull/40812], which
> tried to fix a concurrency issue with cached plans.
> *In short, the plan used for execution and the plan used for explain are not
> the same instance, which causes the inconsistency.*
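A toy Python model of the mismatch, not Spark code: {{ToyAdaptivePlan}}, {{execute()}} and {{explain()}} are hypothetical names, and deep-copying the cached plan before execution is only an assumed illustration of "not the same instance", not a description of what the linked PR actually does. It mirrors the symptom in the repro, where the cached relation's inner AdaptiveSparkPlan still reports isFinalPlan=false after collect().

{code:python}
import copy


class ToyAdaptivePlan:
    """Hypothetical stand-in for an AQE plan node; not a Spark API."""

    def __init__(self, initial: str) -> None:
        self.initial = initial
        self.final = None  # filled in once "execution" finishes
        self.is_final_plan = False

    def execute(self) -> None:
        # Pretend AQE re-optimized the plan at runtime.
        self.final = self.initial + " [re-optimized]"
        self.is_final_plan = True

    def explain(self) -> str:
        shown = self.final if self.is_final_plan else self.initial
        return f"AdaptiveSparkPlan isFinalPlan={str(self.is_final_plan).lower()}: {shown}"


cached = ToyAdaptivePlan("HashAggregate")

# Assumption (illustrative): execution runs on a private copy of the cached plan ...
executed = copy.deepcopy(cached)
executed.execute()

# ... while explain still walks the original cached instance.
print(cached.explain())    # AdaptiveSparkPlan isFinalPlan=false: HashAggregate
print(executed.explain())  # AdaptiveSparkPlan isFinalPlan=true: HashAggregate [re-optimized]
{code}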
>
> I don't have a clear idea of how to fix this yet. Some options:
> * Maybe just take a coarse-grained lock in explain?
> * Make innerChildren a function: clone the initial plan, and on every call
> check whether the original AQE plan is finalized (making the final flag
> atomic first, of course). If it is not, return the cloned initial plan; if it
> is, clone the final plan and return that. This still won't reflect the AQE
> plan in real time in a concurrent situation, but at least we would have an
> initial version and a final version.
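A minimal sketch of the second option in Python, assuming a plan can be modelled as a plain list of node names. {{ToyAdaptivePlan}}, {{finish()}} and {{inner_children()}} are hypothetical names, not Spark APIs; {{threading.Event}} stands in for the atomic final flag because Python has no atomic boolean, and the node names in the usage lines are made up.

{code:python}
import copy
import threading


class ToyAdaptivePlan:
    """Hypothetical AQE plan model (not a Spark API); a plan is a list of node names."""

    def __init__(self, initial):
        self._initial = copy.deepcopy(initial)  # clone the initial plan once
        self._final = None
        # Python has no atomic boolean; Event.set()/is_set() are thread-safe.
        self._finalized = threading.Event()

    def finish(self, final):
        # Publish the final plan before flipping the flag, so a reader that
        # sees the flag set also sees the final plan.
        self._final = copy.deepcopy(final)
        self._finalized.set()

    def inner_children(self):
        # Evaluated on every call rather than captured once.
        if self._finalized.is_set():
            return copy.deepcopy(self._final)  # finalized: clone the final plan
        return copy.deepcopy(self._initial)    # not yet: clone the initial plan


# Usage with made-up node names.
plan = ToyAdaptivePlan(["HashAggregate", "Exchange"])
before = plan.inner_children()

worker = threading.Thread(target=plan.finish, args=(["HashAggregate", "ShuffleQueryStage"],))
worker.start()
worker.join()

after = plan.inner_children()
print(before)  # ['HashAggregate', 'Exchange']
print(after)   # ['HashAggregate', 'ShuffleQueryStage']
{code}

Callers always receive a fresh copy, so explain never shares mutable state with the executing plan. As the option itself notes, an explain that runs mid-execution still sees only the initial version, never intermediate AQE state.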
>
> A simple repro (run in the PySpark shell, where {{spark}} is predefined):
> {code:python}
> from pyspark.sql.functions import expr
>
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> df = cached_d2.filter("key > 10")
> df.collect()
> {code}
> {noformat}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- TableCacheQueryStage 0
>       +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>             +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- AdaptiveSparkPlan isFinalPlan=false
>                      +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                         +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                            +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                               +- Project [(id#2L % 100) AS key#4L]
>                                  +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    Filter (isnotnull(key#4L) AND (key#4L > 10))
>    +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
>          +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                +- AdaptiveSparkPlan isFinalPlan=false
>                   +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                      +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>                         +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                            +- Project [(id#2L % 100) AS key#4L]
>                               +- Range (0, 1000, step=1, splits=10)
> {noformat}