[ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773050#comment-17773050
 ] 

Eren Avsarogullari commented on SPARK-45443:
--------------------------------------------

Hi [~ulysses], 

First, thanks for the reply.

For queries using AQE, if the TableCacheQueryStage flow is disabled, IMR 
materialization is triggered by ShuffleQueryStage (introduced by the Sort's 
Exchange node). Both ShuffleQueryStage nodes also need to materialize the same 
IMR instance in this case, so I believe the same issue may also occur in the 
previous flow. Unlike the previous flow, TableCacheQueryStage materializes the 
IMR eagerly. Can this increase the probability of concurrent materialization 
of the same IMR instance?
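
To make the concern concrete, here is a minimal sketch of the check-then-act 
pattern, written without Spark. FakeCachedRelation and 
ReplicatedMaterializationDemo are hypothetical names for illustration only; 
they are not Spark classes, and the latch only forces the unlucky interleaving 
in which both consumers check the flag before either one has built the cache.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for a shared cached relation; not Spark code.
final class FakeCachedRelation {
  val materialized = new AtomicBoolean(false)
  val buildCount = new AtomicInteger(0)

  // Simulates the expensive materialization of the cached data.
  def build(): Unit = {
    buildCount.incrementAndGet()
    materialized.set(true)
  }
}

object ReplicatedMaterializationDemo {
  // Returns how many times the shared relation was built.
  def run(): Int = {
    val relation = new FakeCachedRelation
    // Forces both consumers to read the flag before either one builds.
    val bothChecked = new CountDownLatch(2)

    val consumer: Runnable = () => {
      val alreadyDone = relation.materialized.get() // the "check"
      bothChecked.countDown()
      bothChecked.await()
      if (!alreadyDone) relation.build()            // the "act"
    }

    val threads = Seq.fill(2)(new Thread(consumer))
    threads.foreach(_.start())
    threads.foreach(_.join())
    relation.buildCount.get()
  }
}
```

In this forced interleaving both consumers see the flag as false, so the 
relation is built twice. In a real query the overlap depends on timing, which 
is why it would show up mostly when materialization is slow.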

I think this behavior is not visible when the IMR cached data size is small. 
However, replicated IMR materialization can be expensive and introduce a 
regression when the IMR cached data size is large (e.g., I observe this 
behavior when the IMR needs to read a large amount of shuffle data). Also, a 
query can reference several IMR instances, each from multiple places in the 
plan, which can further increase the probability of concurrent materialization 
of the same IMR instance.

I am considering the following potential solution, if it makes sense:
For queries using AQE, would introducing TableCacheQueryStage into the physical 
plan only once per unique IMR instance help? AdaptiveSparkPlanExec could check 
whether IMR instances are equivalent before it creates a TableCacheQueryStage.
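
As a rough illustration of the "once per unique IMR instance" idea, the sketch 
below shares a single in-flight materialization per key. It is generic Scala, 
not Spark code: MaterializationRegistry and materializeOnce are hypothetical 
names, and the key is assumed to be whatever identifies equivalent IMR 
instances.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of Spark: one materialization per key.
final class MaterializationRegistry[K, V]()(implicit ec: ExecutionContext) {
  private val inFlight = new ConcurrentHashMap[K, Future[V]]()

  // The first caller for a key runs `compute`; later callers for the same
  // key receive the same Future instead of starting the work again.
  def materializeOnce(key: K)(compute: => V): Future[V] = {
    val promise = Promise[V]()
    val existing = inFlight.putIfAbsent(key, promise.future)
    if (existing != null) {
      existing
    } else {
      promise.completeWith(Future(compute))
      promise.future
    }
  }
}

object MaterializationRegistryDemo {
  // Returns how many times the work ran for two requests on the same key.
  def run(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val runs = new AtomicInteger(0)
    val registry = new MaterializationRegistry[String, Int]()

    val first = registry.materializeOnce("imr-1") {
      runs.incrementAndGet(); Thread.sleep(50); 5
    }
    val second = registry.materializeOnce("imr-1") {
      runs.incrementAndGet(); 5
    }

    Await.result(first, 5.seconds)
    Await.result(second, 5.seconds)
    runs.get()
  }
}
```

The atomic putIfAbsent replaces the separate "is it materialized?" check, so a 
second request that arrives while the first is still running waits on the same 
result. Whether this fits into AdaptiveSparkPlanExec is an open question.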

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats so that AQE optimizations can be applied 
> to the remaining physical plan stages. TableCacheQueryStage materializes 
> InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
> instance. For example, if there are 2 TableCacheQueryStage instances 
> referencing the same IMR instance (cached RDD) and the first 
> InMemoryTableScanExec's materialization takes longer, the following logic will 
> return false (inMemoryTableScan.isMaterialized => false), and this may cause 
> replicated IMR materialization. This behavior can be more visible when the 
> cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i => {
>     val index = i % 5
>     (index, s"Employee_$index", s"Department_$index")
>   }
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!



