[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
[ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580369#comment-17580369 ] Dongjoon Hyun commented on SPARK-37442:
---
Hi, [~irelandbird]. Apache Spark 2.4 and 3.0 are End-Of-Life releases. Please try the latest Apache Spark version, such as 3.3.0.

> In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
>
> Key: SPARK-37442
> URL: https://issues.apache.org/jira/browse/SPARK-37442
> Project: Spark
> Issue Type: Sub-task
> Components: Optimizer, SQL
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Michael Chen
> Assignee: Michael Chen
> Priority: Major
> Fix For: 3.2.1, 3.3.0
>
> There is a period in time where an InMemoryRelation will have the cached buffers loaded, but the statistics will be inaccurate (anywhere between 0 and the size in bytes reported by accumulators). When AQE is enabled, join planning can happen inside this window. In that case, the sizes of join children that include an InMemoryRelation are greatly underestimated, and a broadcast join can be planned when it shouldn't be. We have seen scenarios where a broadcast join is planned with a build side larger than 8GB because, at planning time, the optimizer believes the InMemoryRelation is 0 bytes.
> Here is an example test case where the broadcast threshold is ignored. It can mimic the 8GB error by increasing the size of the tables.
> {code:java}
> withSQLConf(
>   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
>   // Spark estimates a string column as 20 bytes, so with 60k rows these relations
>   // should be estimated at ~1.2m bytes, which is greater than the broadcast join threshold
>   Seq.fill(60000)("a").toDF("key")
>     .createOrReplaceTempView("temp")
>   Seq.fill(60000)("b").toDF("key")
>     .createOrReplaceTempView("temp2")
>   Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
>   spark.sql("SELECT key as newKey FROM temp").persist()
>   val query =
>     s"""
>        |SELECT t3.newKey
>        |FROM
>        |  (SELECT t1.newKey
>        |   FROM (SELECT key as newKey FROM temp) as t1
>        |   JOIN
>        |   (SELECT key FROM smallTemp) as t2
>        |   ON t1.newKey = t2.key
>        |  ) as t3
>        |  JOIN
>        |  (SELECT key FROM temp2) as t4
>        |  ON t3.newKey = t4.key
>        |UNION
>        |SELECT t1.newKey
>        |FROM
>        |(SELECT key as newKey FROM temp) as t1
>        |JOIN
>        |(SELECT key FROM temp2) as t2
>        |ON t1.newKey = t2.key
>        |""".stripMargin
>   val df = spark.sql(query)
>   df.collect()
>   val adaptivePlan = df.queryExecution.executedPlan
>   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
>   assert(bhj.length == 1)
> }
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
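[Editor's note] The failure mode in the report above can be illustrated abstractly: a planner that consults a size statistic picks a broadcast join whenever the reported size is under the threshold, so a transiently-zero statistic flips the decision regardless of the relation's true size. A toy sketch follows (illustrative only, not Spark's actual planner code; all names are hypothetical):

{code:java}
// Toy model of threshold-based join selection (illustrative only).
object JoinSelectionSketch {
  val autoBroadcastThreshold = 1048584L // bytes, mirroring the test config above

  def chooseJoinStrategy(reportedSizeBytes: Long): String =
    if (reportedSizeBytes <= autoBroadcastThreshold) "broadcast-hash-join"
    else "sort-merge-join"

  def main(args: Array[String]): Unit = {
    val trueSize  = 8L * 1024 * 1024 * 1024 // the relation really holds ~8 GB
    val staleStat = 0L                      // InMemoryRelation stats before accumulators load

    // Planned from the stale statistic, broadcast is wrongly chosen;
    // with an accurate statistic the planner would pick sort-merge join.
    assert(chooseJoinStrategy(staleStat) == "broadcast-hash-join")
    assert(chooseJoinStrategy(trueSize) == "sort-merge-join")
  }
}
{code}

Because the decision is a pure function of the statistic, any window in which the statistic lags the cached buffers is enough to produce the oversized broadcast.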
[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
[ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579969#comment-17579969 ] EmmaYang commented on SPARK-37442:
--
Hello, I have exactly this issue, and I am using Spark 2.4, so my broadcast DataFrame is built on top of files. The overall file size is > 12gb, but I only use a sub-DataFrame, and in STORAGE it shows only 2.5 GB; still, I get the 8GB broadcast error. Is there any workaround for it? Thank you.

{code}
:  :  :  +- ResolvedHint (broadcast)
:  :  :     +- Filter isnotnull(invlv_pty_id#5078)
:  :  :        +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
:  :  :              +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
:  :  :                 +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
:  :  +- ResolvedHint (broadcast)
:  :     +- Filter isnotnull(invlv_pty_id#5078)
:  :        +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
:  :              +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
:  :                 +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
:  +- ResolvedHint (broadcast)
:     +- Filter isnotnull(invlv_pty_id#5078)
:        +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
:              +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
:                 +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- ResolvedHint (broadcast)
   +- Filter isnotnull(invlv_pty_id#5078)
      +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
               +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
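[Editor's note] On the workaround question above: the primary fix is upgrading to a version containing the patch (3.2.1/3.3.0). Until then, two mitigations are commonly suggested; this is a sketch under assumptions (the view name is illustrative), not a confirmed fix. Note that with an explicit broadcast() hint in the plan, lowering the auto-broadcast threshold alone does not remove the broadcast.

{code:java}
// Mitigation sketch for affected versions (illustrative, not a confirmed fix).

// 1. Materialize the cache before the join query is planned, so the
//    InMemoryRelation's size statistics are fully populated.
val cached = spark.table("some_view").persist() // "some_view" is illustrative
cached.count()                                  // forces all cached buffers to load

// 2. Avoid the broadcast path: drop the explicit broadcast() hint and/or
//    disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
{code}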
[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
[ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483405#comment-17483405 ] Dongjoon Hyun commented on SPARK-37442:
---
I converted this into SPARK-37063's subtask.
[jira] [Commented] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
[ https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447577#comment-17447577 ] Apache Spark commented on SPARK-37442:
--
User 'ChenMichael' has created a pull request for this issue: https://github.com/apache/spark/pull/34684