[ https://issues.apache.org/jira/browse/SPARK-56485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ke Jia updated SPARK-56485:
---------------------------
    Description: 
When running TPC-DS Q4 against a 1 TB database with CBO enabled and {{spark.sql.autoBroadcastJoinThreshold}} set to {{10MB}}, the query fails with a {{SparkException}}.
 
{code:java}
Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
    at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
    at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
    at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840){code}

 
The error shows that Spark attempted to broadcast a table far larger than the 8 GiB limit (actual size: ~10.7 GiB). Notably, the query runs successfully when CBO is disabled.

The issue stems from a bug in Spark's CBO filter estimation. When processing an equality filter (e.g., {{d_year = 2001}}), the CBO incorrectly estimates the filter's {{rowCount}} as 0.

Because the estimated row count is 0, the optimizer concludes that the join result will be tiny and chooses a {{BroadcastHashJoin}}. However, the actual data is roughly 10.7 GiB, which exceeds Spark's 8 GiB hard limit for broadcasting.
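A minimal sketch of why missing statistics should not turn into a zero estimate, assuming a uniform-distribution model of equality selectivity (1 / distinct count when the literal lies within [min, max]). This is not Spark's {{FilterEstimation}} code: {{ColumnStatSketch}}, {{estimate_equality_selectivity}} and {{estimate_filter_row_count}} are hypothetical names, and the field order only mirrors the {{ColumnStat(None,None,None,None,None,None,None,2)}} printout in this issue. With every statistic absent, the sketch reports the selectivity as unknown and keeps the child's row count instead of producing 0.

```python
import math
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ColumnStatSketch:
    """Hypothetical stand-in for per-column statistics.

    Field order mirrors the ColumnStat(...) printout in this issue; every
    statistic defaults to None (missing) and only the version is set.
    """
    distinct_count: Optional[int] = None
    min_value: Optional[Any] = None
    max_value: Optional[Any] = None
    null_count: Optional[int] = None
    avg_len: Optional[int] = None
    max_len: Optional[int] = None
    histogram: Optional[Any] = None
    version: int = 2


def estimate_equality_selectivity(stat: ColumnStatSketch, literal: Any) -> Optional[float]:
    """Selectivity of `col = literal` under a uniform-distribution assumption.

    Returns None when the needed statistics are missing (unknown estimate).
    """
    if stat.distinct_count is None or stat.min_value is None or stat.max_value is None:
        # No distinct count or value range: the estimate is unknown, not zero.
        return None
    if stat.distinct_count == 0 or not (stat.min_value <= literal <= stat.max_value):
        # Literal outside the observed [min, max] range: no row can match.
        return 0.0
    # Each distinct value is assumed to hold an equal share of the rows.
    return 1.0 / stat.distinct_count


def estimate_filter_row_count(child_rows: int, selectivity: Optional[float]) -> int:
    """Apply a selectivity to the child's row count.

    An unknown selectivity leaves the child's row count unchanged rather
    than collapsing it to 0.
    """
    if selectivity is None:
        return child_rows
    return math.ceil(child_rows * selectivity)
```

For example, with the empty statistics from this report and roughly 73,000 rows in {{date_dim}} (the plan shows {{rowCount=7.30E+4}}), the sketch keeps the filter estimate at 73,000 rows. Only with made-up statistics such as {{distinct_count=200}}, {{min_value=1900}}, {{max_value=2100}} does it drop to 365 rows.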
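The planning decision and the later failure can be sketched as follows. This is a simplified model, not Spark's or Gluten's code: {{estimated_size_bytes}}, {{choose_join}} and {{build_broadcast}} are hypothetical, the row width is an assumed input, and the 1-byte floor only mirrors the {{sizeInBytes=1.0 B}} shown for the zero-row filter in the plan output. The join strategy is picked from the estimated size against {{spark.sql.autoBroadcastJoinThreshold}} (10 MB here), while the 8 GiB limit is checked against the real size only when the broadcast relation is built at runtime.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

AUTO_BROADCAST_JOIN_THRESHOLD = 10 * MIB  # spark.sql.autoBroadcastJoinThreshold=10MB in this report
MAX_BROADCAST_BYTES = 8 * GIB             # limit quoted in the error message


def estimated_size_bytes(row_count: int, bytes_per_row: int) -> int:
    """Hypothetical size estimate; floored at 1 byte so a 0-row estimate is never 0."""
    return max(1, row_count * bytes_per_row)


def choose_join(estimated_bytes: int, threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> str:
    """Pick a join strategy from the *estimated* size only.

    Any negative threshold disables size-based broadcasting, because the
    range check below can then never succeed.
    """
    if 0 <= estimated_bytes <= threshold:
        return "BroadcastHashJoin"
    # Otherwise fall back to a shuffle-based join (SortMergeJoin in this sketch).
    return "SortMergeJoin"


def build_broadcast(actual_bytes: int) -> None:
    """Runtime check against the hard limit, using the *actual* size."""
    if actual_bytes > MAX_BROADCAST_BYTES:
        raise RuntimeError(
            "Cannot broadcast the table that is larger than 8.0 GiB: "
            f"{actual_bytes / GIB:.1f} GiB"
        )
```

With a 0-row estimate, {{choose_join(estimated_size_bytes(0, 100))}} returns {{"BroadcastHashJoin"}}, and {{build_broadcast}} then raises for a 10.7 GiB relation. Had the estimate reflected the real size, the same size check would have chosen the shuffle-based join instead.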

 

  was:
When running TPC-DS Q4 on the database {{wxd_icebergtpcdsdb1000g}} with CBO enabled and {{spark.sql.autoBroadcastJoinThreshold}} set to {{10MB}}, the query fails with a SparkException.
{code:java}
Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
    at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
    at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
    at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840){code}
 
The error shows that Spark attempted to broadcast a table far larger than the 8 GiB limit (actual size: ~10.7 GiB). Notably, the query runs successfully when CBO is disabled.

The issue stems from a bug in Spark's CBO filter estimation. When processing an equality filter (e.g., {{d_year = 2001}}), the CBO incorrectly estimates the filter's {{rowCount}} as 0.

Because the estimated row count is 0, the optimizer concludes that the join result will be tiny and chooses a {{BroadcastHashJoin}}. However, the actual data is roughly 10.7 GiB, which exceeds Spark's 8 GiB hard limit for broadcasting.

The root of this miscalculation is in [FilterEstimation.scala|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L347]. Normally, Spark uses column statistics to estimate selectivity. In this case, although a column-statistics entry is present for {{d_year}}, its statistical fields (min, max, distinct count, etc.) are all None; only the version field is set.
{code:java}
Filter (isnotnull(d_year#7208) AND (d_year#7208 = 2001)), Statistics(sizeInBytes=1.0 B, rowCount=0)
+- RelationV2[d_date_sk#7202, d_date_id#7203, d_date#7204, d_month_seq#7205, d_week_seq#7206, d_quarter_seq#7207, d_year#7208, d_dow#7209, d_moy#7210, d_dom#7211, d_qoy#7212, d_fy_year#7213, d_fy_quarter_seq#7214, d_fy_week_seq#7215, d_day_name#7216, d_quarter_name#7217, d_holiday#7218, d_weekend#7219, d_following_holiday#7220, d_first_dom#7221, d_last_dom#7222, d_same_day_ly#7223, d_same_day_lq#7224, d_current_day#7225, ... 4 more fields] spark_catalog.wxd_icebergtpcdsdb1000g.date_dim, Statistics(sizeInBytes=20.1 MiB, rowCount=7.30E+4){code}
 
As a result, Spark does not reach the correct evaluation logic in [lines 310-313|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L310-L313] and falls back to an incorrect estimate.
{code:java}
// Example of the empty stats being passed:
d_year#20690 -> ColumnStat(None,None,None,None,None,None,None,2){code}


> Incorrect RowCount Estimation in CBO leads to unintended BroadcastHashJoin
> --------------------------------------------------------------------------
>
>                 Key: SPARK-56485
>                 URL: https://issues.apache.org/jira/browse/SPARK-56485
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.1.1
>            Reporter: Ke Jia
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
