[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

2022-08-16 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580369#comment-17580369
 ] 

Dongjoon Hyun commented on SPARK-37442:
---

Hi, [~irelandbird]. Apache Spark 2.4 and 3.0 are End-Of-Life releases. Please 
try the latest Apache Spark version, such as 3.3.0; per the Fix Version field, 
this issue is fixed in 3.2.1 and 3.3.0.

> In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the 
> table that is larger than 8GB: 8 GB" failure
> 
>
> Key: SPARK-37442
> URL: https://issues.apache.org/jira/browse/SPARK-37442
> Project: Spark
>  Issue Type: Sub-task
>  Components: Optimizer, SQL
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Michael Chen
>Assignee: Michael Chen
>Priority: Major
> Fix For: 3.2.1, 3.3.0
>
>
> There is a period of time in which an InMemoryRelation has its cached 
> buffers loaded, but its statistics are inaccurate (anywhere between 0 and 
> the size in bytes reported by the accumulators). When AQE is enabled, join 
> planning can happen inside this window. In that scenario, the sizes of join 
> children that include an InMemoryRelation are greatly underestimated, and a 
> broadcast join can be planned when it shouldn't be. We have seen scenarios 
> where a broadcast join is planned with a build side larger than 8GB because, 
> at planning time, the optimizer believes the InMemoryRelation is 0 bytes.
> Here is an example test case where the broadcast threshold is ignored. It 
> can reproduce the 8GB error if the tables are made larger.
> {code:java}
> withSQLConf(
>   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
>   // Spark estimates a string column at 20 bytes, so with 60k rows these
>   // relations should be estimated at ~1.2 million bytes, which is greater
>   // than the 1048584-byte broadcast join threshold.
>   Seq.fill(60000)("a").toDF("key")
>     .createOrReplaceTempView("temp")
>   Seq.fill(60000)("b").toDF("key")
>     .createOrReplaceTempView("temp2")
>   Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
>   spark.sql("SELECT key as newKey FROM temp").persist()
>   val query =
>     s"""
>        |SELECT t3.newKey
>        |FROM
>        |  (SELECT t1.newKey
>        |   FROM (SELECT key as newKey FROM temp) as t1
>        |   JOIN (SELECT key FROM smallTemp) as t2
>        |   ON t1.newKey = t2.key
>        |  ) as t3
>        |JOIN (SELECT key FROM temp2) as t4
>        |ON t3.newKey = t4.key
>        |UNION
>        |SELECT t1.newKey
>        |FROM (SELECT key as newKey FROM temp) as t1
>        |JOIN (SELECT key FROM temp2) as t2
>        |ON t1.newKey = t2.key
>        |""".stripMargin
>   val df = spark.sql(query)
>   df.collect()
>   val adaptivePlan = df.queryExecution.executedPlan
>   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
>   assert(bhj.length == 1)
> }
> {code}
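A side note for readers reproducing this: the size estimate the planner uses
for the cached relation can be observed directly. A minimal sketch, assuming a
`spark` session and the `temp` view from the snippet above; the exact numbers
depend on timing, which is the point of this issue:

{code:java}
// Sketch: watch the planner-visible size estimate of a cached relation.
val cached = spark.sql("SELECT key AS newKey FROM temp").persist()

// Right after persist(), before any action, the estimate may be far below
// the real size (possibly 0), because the accumulators are not populated yet.
println(spark.sql("SELECT key AS newKey FROM temp")
  .queryExecution.optimizedPlan.stats.sizeInBytes)

cached.count()  // materialize the cached buffers

// After materialization the estimate may reflect the accumulator-reported
// bytes; any join planning that ran before this point can have seen the
// stale, underestimated value.
println(spark.sql("SELECT key AS newKey FROM temp")
  .queryExecution.optimizedPlan.stats.sizeInBytes)
{code}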


[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

2022-08-15 Thread EmmaYang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579969#comment-17579969
 ] 

EmmaYang commented on SPARK-37442:
--

Hello, I have exactly this issue, and I am using Spark 2.4.

My broadcast DataFrame is built on top of files. The overall size of the 
files is more than 12 GB, but I only use a sub-DataFrame of it, and the 
Storage tab shows only 2.5 GB cached. Still, I get the "broadcast hit 8GB" 
error.

Is there any workaround for it?

Thank you. The relevant part of the plan:

: : : +- ResolvedHint (broadcast)
: : :    +- Filter isnotnull(invlv_pty_id#5078)
: : :       +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
: : :             +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
: : :                +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
: : +- ResolvedHint (broadcast)
: :    +- Filter isnotnull(invlv_pty_id#5078)
: :       +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
: :             +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
: :                +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
: +- ResolvedHint (broadcast)
:    +- Filter isnotnull(invlv_pty_id#5078)
:       +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
:             +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
:                +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- ResolvedHint (broadcast)
   +- Filter isnotnull(invlv_pty_id#5078)
      +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
               +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
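Since the fix only landed in 3.2.1/3.3.0, a workaround on the 2.4 line is to
keep the oversized table from being broadcast at all. A minimal sketch,
assuming the explicit broadcast hint can be dropped from the query; the input
paths, the column selection, and the `fact` table are hypothetical stand-ins
for the real ones:

{code:java}
// Sketch of a workaround, not an official fix: avoid the broadcast
// entirely so the 8GB build-side limit is never hit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Disable size-based broadcast planning; Spark plans a sort-merge join
// instead. Note this does NOT override an explicit broadcast(...) hint,
// so the ResolvedHint (broadcast) shown above must also be removed.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical stand-ins for the CSV scan shown in the plan above.
val dim = spark.read.csv("hdfs://gftsdev/.../trl_files")  // hypothetical path
  .select($"_c0".as("invlv_pty_id"))
  .filter($"invlv_pty_id".isNotNull)
val fact = spark.read.parquet("hdfs://gftsdev/.../fact")  // hypothetical path

// Join WITHOUT broadcast(dim); the planner now uses a sort-merge join.
val joined = fact.join(dim, Seq("invlv_pty_id"))
{code}

Alternatively, upgrading to 3.2.1 or later picks up the actual fix.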

[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

2022-01-27 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483405#comment-17483405
 ] 

Dongjoon Hyun commented on SPARK-37442:
---

I converted this into SPARK-37063's subtask.




[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

2021-11-22 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447577#comment-17447577
 ] 

Apache Spark commented on SPARK-37442:
--

User 'ChenMichael' has created a pull request for this issue:
https://github.com/apache/spark/pull/34684




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org