[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eren Avsarogullari updated SPARK-41214:
---------------------------------------
Summary: SubPlan metrics are missed when AQE is enabled under InMemoryRelation (was: Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled)

SubPlan metrics are missed when AQE is enabled under InMemoryRelation
---------------------------------------------------------------------

Key: SPARK-41214
URL: https://issues.apache.org/jira/browse/SPARK-41214
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.0
Reporter: Eren Avsarogullari
Priority: Major
Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, DAG when AQE=ON and AQECachedDFSupport=ON without fix.png

*spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation (IMR) nodes. The following sample query has an IMR node on both BroadcastHashJoin (BHJ) legs. However, when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, the following data is missing from the UI because the final sub-plans under IMR are not submitted to it:
{code:java}
- SQLMetrics of the physical operators before AdaptiveSparkPlan are missing, such as Exchange and HashAggregate on the left BHJ leg and HashAggregate on the right BHJ leg.
- WholeStageCodegen (WSCG) blocks are missing on the left BHJ leg.
- The AQEShuffleRead node is missing on the left BHJ leg.
{code}

*Sample to reproduce:*
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Create the first DataFrame and cache it
val arr = Seq(
  (1, "Employee_1", "Department_1"),
  (2, "Employee_2", "Department_2"))
val df = arr.toDF("id", "name", "department")
  .filter($"id" < 3)
  .groupBy($"name")
  .count()
df.cache()

// Create the second DataFrame and cache it
val arr2 = Seq((1, "Employee_1", "Department_1"))
val df2 = arr2.toDF("id", "name", "department")
  .filter($"id" > 0)
  .groupBy($"name")
  .count()
df2.cache()

// Trigger query execution: both BHJ legs read from an IMR node
val df3 = df.join(df2, "name")
df3.show()
{code}

*DAG before fix:*
*!DAG when AQE=ON and AQECachedDFSupport=ON without fix.png|width=33,height=86!*

*DAG after fix:*
*!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*
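One way to observe which plan trees actually reach the SQL UI is a listener that prints every AQE plan update event. The sketch below assumes the Spark 3.4 developer APIs `SparkListenerSQLAdaptiveExecutionUpdate` and `SparkPlanInfo` from `org.apache.spark.sql.execution.ui`. The class name `AdaptivePlanUpdateLogger` is hypothetical. Comparing its output with and without the fix may help show whether the operators below each InMemoryTableScan are reported.

{code:scala}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkPlanInfo}

// Hypothetical diagnostic listener: prints each AQE plan update posted for the SQL UI.
class AdaptivePlanUpdateLogger extends SparkListener {

  // Recursively prints the reported plan tree with the number of metrics per node.
  private def render(info: SparkPlanInfo, depth: Int): Unit = {
    println(("  " * depth) + s"${info.nodeName} (${info.metrics.size} metrics)")
    info.children.foreach(child => render(child, depth + 1))
  }

  // SQL-specific events are delivered through onOtherEvent on the listener bus thread.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case update: SparkListenerSQLAdaptiveExecutionUpdate =>
      println(s"AQE plan update for execution ${update.executionId}:")
      render(update.sparkPlanInfo, 0)
    case _ => // ignore all other events
  }
}
{code}

Register it before running the sample, e.g. spark.sparkContext.addSparkListener(new AdaptivePlanUpdateLogger). Listener events are delivered asynchronously, so the output may appear after df3.show() returns.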
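Independently of the UI, the sub-plan cached under each IMR node can be inspected from the driver. This is a minimal sketch that assumes the internal Spark 3.4 classes `AdaptiveSparkPlanHelper`, `InMemoryTableScanExec` and `InMemoryRelation.cachedPlan`. These are not public APIs and may change between releases. The object name `CachedSubPlanMetrics` is hypothetical. With canChangeCachedPlanOutputPartitioning enabled, the printed root of each cached sub-plan should typically be AdaptiveSparkPlan.

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Hypothetical helper: prints the SQLMetrics of every operator cached under an InMemoryRelation.
object CachedSubPlanMetrics extends AdaptiveSparkPlanHelper {

  def report(df: DataFrame): Unit = {
    // AdaptiveSparkPlanHelper.collect also descends into AQE plans and query stages.
    val scans = collect(df.queryExecution.executedPlan) {
      case scan: InMemoryTableScanExec => scan
    }
    scans.foreach { scan =>
      val cached = scan.relation.cachedPlan
      println(s"Cached sub-plan root: ${cached.nodeName}")
      // Walk the cached sub-plan, unwrapping AdaptiveSparkPlanExec and query stages if present.
      foreach(cached) { node =>
        node.metrics.foreach { case (name, metric) =>
          println(s"  ${node.nodeName}.$name = ${metric.value}")
        }
      }
    }
  }
}
{code}

For example, call CachedSubPlanMetrics.report(df3) after df3.show() in the sample to reproduce.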