[ 
https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-41214:
---------------------------------------
    Summary: SubPlan metrics are missed when AQE is enabled under 
InMemoryRelation  (was: Sub-plan metrics under InMemoryRelation are missed when 
AQE Cached DataFrame Support is enabled)

> SubPlan metrics are missed when AQE is enabled under InMemoryRelation
> ---------------------------------------------------------------------
>
>                 Key: SPARK-41214
>                 URL: https://issues.apache.org/jira/browse/SPARK-41214
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, 
> DAG when AQE=ON and AQECachedDFSupport=ON without fix.png
>
>
> *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE 
> optimizations under InMemoryRelation(IMR) nodes. Following sample query has 
> IMR node on both BroadcastHashJoin legs. However, 
> when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, 
> following datas are missed due to lack of final sub-plans (under IMR) 
> submissions (into UI).
> {code:java}
> - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such 
> as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ 
> leg,
> - WSCG blocks are missed on left BHJ leg, 
> - AQEShuffleRead node is missed on left BHJ leg. {code}
> *Sample to reproduce:*
> {code:java}
> val spark = SparkSession
>     .builder()
>     .config("spark.sql.adaptive.enabled", "true")
>     .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", 
> "true")
>     .master("local[*]")
>     .getOrCreate()
> import spark.implicits._
> // Create 1th DF 
> val arr = Seq(
>   (1, "Employee_1", "Department_1"),
>   (2, "Employee_2", "Department_2"))
> val df = arr.toDF("id", "name", "department")
>   .filter('id < 3)
>   .groupBy('name)
>   .count()
> df.cache()
> // Create 2th DF
> val arr2 = Seq((1, "Employee_1", "Department_1"))
> val df2 = arr2.toDF("id", "name", "department")
>   .filter('id > 0)
>   .groupBy('name)
>   .count()
> df2.cache()
> // Trigger query execution
> val df3 = df.join(df2, "name")
> df3.show() {code}
> *DAG before fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON without 
> fix.png|width=33,height=86!*
> *DAG after fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to