[ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-37442:
------------------------------------

    Assignee: Apache Spark

> In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37442
>                 URL: https://issues.apache.org/jira/browse/SPARK-37442
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1, 3.2.0
>            Reporter: Michael Chen
>            Assignee: Apache Spark
>            Priority: Major
>
> There is a window of time during which an InMemoryRelation has its cached
> buffers loaded but its statistics are inaccurate (anywhere between 0 and the
> size in bytes reported by the accumulators). When AQE is enabled, join
> planning can happen inside this window. In that case, the sizes of join
> children that contain the InMemoryRelation are greatly underestimated, and a
> broadcast join can be planned when it shouldn't be. We have seen cases where
> a broadcast join was planned with a build side larger than 8GB because, at
> planning time, the optimizer believed the InMemoryRelation was 0 bytes.
> Here is an example test case in which the broadcast threshold is ignored.
> Making the tables larger can reproduce the 8GB error.
> {code:scala}
> withSQLConf(
>   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
>   // Spark estimates a string column as 20 bytes, so with 60k rows these relations
>   // should be estimated at ~1.2M bytes, which is greater than the broadcast join threshold
>   Seq.fill(60000)("a").toDF("key")
>     .createOrReplaceTempView("temp")
>   Seq.fill(60000)("b").toDF("key")
>     .createOrReplaceTempView("temp2")
>   Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
>   spark.sql("SELECT key as newKey FROM temp").persist()
>   val query =
>   s"""
>      |SELECT t3.newKey
>      |FROM
>      |  (SELECT t1.newKey
>      |  FROM (SELECT key as newKey FROM temp) as t1
>      |        JOIN
>      |        (SELECT key FROM smallTemp) as t2
>      |        ON t1.newKey = t2.key
>      |  ) as t3
>      |  JOIN
>      |  (SELECT key FROM temp2) as t4
>      |  ON t3.newKey = t4.key
>      |UNION
>      |SELECT t1.newKey
>      |FROM
>      |    (SELECT key as newKey FROM temp) as t1
>      |    JOIN
>      |    (SELECT key FROM temp2) as t2
>      |    ON t1.newKey = t2.key
>      |""".stripMargin
>   val df = spark.sql(query)
>   df.collect()
>   val adaptivePlan = df.queryExecution.executedPlan
>   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
>   assert(bhj.length == 1)
> {code}
>  
>  
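The following is a minimal, dependency-free Scala sketch of the failure mode described above. It rests on stated assumptions and is not Spark's code. Every name in it is hypothetical and none of them is a Spark API: `Stats`, `LocalRel`, `CachedRel`, `canBroadcastBySize`, `buildBroadcast` and `MaxBroadcastBytes`.

The size check only approximates the rule that a join side is broadcast when its estimated `sizeInBytes` is at most `spark.sql.autoBroadcastJoinThreshold`. The 8 GiB cap stands in for the limit behind the "Cannot broadcast the table that is larger than 8GB" error. The sketch also reproduces the estimate from the test comment: 60000 rows at 20 bytes per string value.

```scala
// Hypothetical model of the SPARK-37442 failure mode.
// None of these names are Spark APIs; they only mimic a size-based broadcast decision.

// Size estimate attached to a plan node, in bytes.
final case class Stats(sizeInBytes: BigInt)

// A plan node that exposes a size estimate.
trait Plan { def stats: Stats }

// A local relation estimated statically as rows * bytes-per-row
// (the test comment assumes 20 bytes per string value).
final case class LocalRel(rows: Long, bytesPerRow: Long) extends Plan {
  def stats: Stats = Stats(BigInt(rows) * bytesPerRow)
}

// A cached relation whose estimate comes from an accumulator-like counter.
// The estimate only reaches the true size once every cached partition has reported.
final class CachedRel(val trueSizeInBytes: BigInt) extends Plan {
  private var reportedBytes: BigInt = 0
  def reportPartition(bytes: BigInt): Unit = reportedBytes += bytes
  // Until materialization is fully reported, this is anywhere in [0, trueSizeInBytes].
  def stats: Stats = Stats(reportedBytes)
}

// Assumed size-based check: the estimate is non-negative and at most the threshold.
def canBroadcastBySize(plan: Plan, threshold: Long): Boolean =
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= threshold

// Assumed hard limit checked against the real data when the broadcast is built (8 GiB).
val MaxBroadcastBytes: BigInt = BigInt(8L) << 30

def buildBroadcast(plan: CachedRel): Either[String, BigInt] =
  if (plan.trueSizeInBytes >= MaxBroadcastBytes)
    Left("Cannot broadcast the table that is larger than 8GB")
  else Right(plan.trueSizeInBytes)

val threshold = 1048584L // threshold used in the test case

// Static estimate from the test comment: 60000 rows * 20 bytes = 1,200,000 bytes.
val temp = LocalRel(rows = 60000L, bytesPerRow = 20L)

// A cached relation that is really 10 GiB but whose accumulators have reported nothing yet.
val cached = new CachedRel(trueSizeInBytes = BigInt(10L) << 30)

// Planning inside the stale window: the 0-byte estimate passes the threshold check.
val plannedWhileStale = canBroadcastBySize(cached, threshold)
// Building the broadcast then hits the hard limit on the real size.
val buildResult = buildBroadcast(cached)

// Once all partitions have reported, the same check correctly rejects the broadcast.
cached.reportPartition(cached.trueSizeInBytes)
val plannedAfterReport = canBroadcastBySize(cached, threshold)

println(s"temp estimate = ${temp.stats.sizeInBytes}, broadcastable = ${canBroadcastBySize(temp, threshold)}")
println(s"stale window: broadcastable = $plannedWhileStale, build = $buildResult")
println(s"after report: broadcastable = $plannedAfterReport")
```

In this model, the statically estimated relation (1.2M bytes) is correctly rejected. The cached relation is accepted for broadcast while its statistics still read 0 bytes, and then fails the build-side limit. The same relation is rejected once its full size has been reported. Planning-time estimates are trusted as-is, so a stale estimate alone is enough to pick the wrong join strategy.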



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
