Ke Jia created SPARK-56485:
------------------------------

             Summary: Incorrect RowCount Estimation in CBO leads to unintended BroadcastHashJoin
                 Key: SPARK-56485
                 URL: https://issues.apache.org/jira/browse/SPARK-56485
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 4.1.1
            Reporter: Ke Jia


When running TPC-DS Q4 on the database {{wxd_icebergtpcdsdb1000g}} with CBO 
enabled and {{spark.sql.autoBroadcastJoinThreshold}} set to {{10MB}}, the 
query fails with a SparkException:
{{Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
        at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
        at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
        at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
        at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
        at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
        at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)}}
 
The error shows that Spark attempted to broadcast a table of about 10.7 GiB, 
well above the 8 GiB broadcast limit. Notably, the query runs successfully 
when CBO is disabled.

The issue stems from a bug in Spark's CBO filter estimation: when processing 
an equality filter (e.g., {{d_year = 2001}}), the CBO incorrectly estimates 
the rowCount as 0.
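
To make the arithmetic concrete, here is a simplified Scala model of how a filter's output statistics follow from its estimated selectivity. It is a sketch under stated assumptions, not Spark's FilterEstimation code: the object and method names are hypothetical, the filtered row count is assumed to be ceil(childRowCount * selectivity), and the output size is assumed to be floored at 1 byte, which is consistent with the {{sizeInBytes=1.0 B, rowCount=0}} reported for the {{d_year}} filter over {{date_dim}} (rowCount=7.30E+4).

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical, simplified model; not Spark's actual implementation.
object FilterRowCountModel {
  // Assumed: rows surviving the filter = ceil(childRowCount * selectivity).
  def filteredRowCount(childRowCount: BigInt, selectivity: Double): BigInt =
    (BigDecimal(childRowCount) * BigDecimal(selectivity))
      .setScale(0, RoundingMode.CEILING)
      .toBigInt

  // Assumed: output size is floored at 1 byte, so a 0-row estimate
  // never produces sizeInBytes = 0.
  def outputSize(rowCount: BigInt, bytesPerRow: BigInt): BigInt =
    if (rowCount.signum > 0) rowCount * bytesPerRow else BigInt(1)
}
```

With a selectivity of 0, the 73,000-row {{date_dim}} collapses to 0 rows and 1 byte; a hypothetical selectivity of 1/200 (0.005) would instead give 365 rows.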

Because the estimated row count is 0, the optimizer concludes that the join 
result will be very small and chooses a BroadcastHashJoin. However, the actual 
data size is roughly 10.7 GiB, which exceeds the 8 GiB hard limit for 
broadcasting.
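
The mismatch between planning and execution can be sketched the same way. This is a hypothetical model, not Spark's or Gluten's code: it assumes the join strategy compares the estimated size against {{spark.sql.autoBroadcastJoinThreshold}} (10MB, taken here as 10 * 1024 * 1024 bytes), while the 8 GiB hard limit from the error message is only checked against the actual size once the broadcast relation is built.

```scala
// Hypothetical model of the join-strategy decision; names are illustrative only.
object BroadcastDecisionModel {
  val MiB: Long = 1024L * 1024L
  val GiB: Long = 1024L * MiB

  // Assumed hard limit, matching "larger than 8.0 GiB" in the error message.
  val HardLimitBytes: Long = 8L * GiB

  // Planning time: only the estimated size is available.
  def planBroadcast(estimatedBytes: BigInt, thresholdBytes: Long): Boolean =
    estimatedBytes >= BigInt(0) && estimatedBytes <= BigInt(thresholdBytes)

  // Execution time: the materialized relation is checked against the hard limit.
  def buildSucceeds(actualBytes: BigInt): Boolean =
    actualBytes < BigInt(HardLimitBytes)
}
```

An estimate of 1 byte passes the 10MB check at planning time, but a relation of about 10.7 GiB then fails the 8 GiB check at execution time, which matches the reported SparkException.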

The root of this miscalculation is in 
[FilterEstimation.scala|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L347].
Normally, Spark uses column statistics to estimate selectivity. In this case, 
although a column statistics entry is present for {{d_year}}, all of its 
statistical fields (min, max, distinct count, etc.) are None; only the 
version field is populated.
{{Filter (isnotnull(d_year#7208) AND (d_year#7208 = 2001)), Statistics(sizeInBytes=1.0 B, rowCount=0)
+- RelationV2[d_date_sk#7202, d_date_id#7203, d_date#7204, d_month_seq#7205, d_week_seq#7206, d_quarter_seq#7207, d_year#7208, d_dow#7209, d_moy#7210, d_dom#7211, d_qoy#7212, d_fy_year#7213, d_fy_quarter_seq#7214, d_fy_week_seq#7215, d_day_name#7216, d_quarter_name#7217, d_holiday#7218, d_weekend#7219, d_following_holiday#7220, d_first_dom#7221, d_last_dom#7222, d_same_day_ly#7223, d_same_day_lq#7224, d_current_day#7225, ... 4 more fields] spark_catalog.wxd_icebergtpcdsdb1000g.date_dim, Statistics(sizeInBytes=20.1 MiB, rowCount=7.30E+4)}}
 
As a result, Spark never reaches the intended evaluation logic in 
[lines 310-313|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L310-L313]
and falls back to an incorrect estimate.
 {{// Example of the empty stats being passed:
d_year#20690 -> ColumnStat(None,None,None,None,None,None,None,2)}}
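
The following simplified Scala model contrasts the behavior described above with one possible guard. The ColStat fields mirror the ColumnStat printout above (avgLen, maxLen and histogram omitted); ColStat, EqualitySelectivityModel, naiveSelectivity and guardedSelectivity are hypothetical names, and the guard only illustrates the idea that an all-None entry should be treated as "no statistics" rather than as an out-of-range literal. The actual fix in FilterEstimation.scala may look different.

```scala
// Hypothetical stand-in for Spark's ColumnStat; field names follow the
// printout in this report, with avgLen, maxLen and histogram omitted.
final case class ColStat(
    distinctCount: Option[BigInt] = None,
    min: Option[Long] = None,
    max: Option[Long] = None,
    nullCount: Option[BigInt] = None,
    version: Int = 2)

object EqualitySelectivityModel {
  // Selectivity of `col = literal`, only when min, max and distinctCount are known.
  private def fromFullStats(s: ColStat, literal: Long): Option[Double] =
    (s.min, s.max, s.distinctCount) match {
      case (Some(lo), Some(hi), Some(ndv)) if ndv.signum > 0 =>
        Some(if (lo <= literal && literal <= hi) 1.0 / ndv.toDouble else 0.0)
      case _ => None
    }

  // Behavior described in this report: the entry exists, so estimation proceeds,
  // and missing stats end up treated like an out-of-range literal (selectivity 0).
  def naiveSelectivity(stat: Option[ColStat], literal: Long): Option[Double] =
    stat.map(s => fromFullStats(s, literal).getOrElse(0.0))

  // Hypothetical guard: an entry without usable stats counts as "no statistics",
  // so None is returned and the caller can keep its default estimate.
  def guardedSelectivity(stat: Option[ColStat], literal: Long): Option[Double] =
    stat.flatMap(s => fromFullStats(s, literal))
}
```

For the empty {{d_year}} entry, the naive path returns Some(0.0), which turns 7.30E+4 rows into rowCount=0, while the guarded path returns None.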



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
