[
https://issues.apache.org/jira/browse/SPARK-56485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ke Jia updated SPARK-56485:
---------------------------
Description:
When running TPC-DS Q4 on the 1TB database with CBO enabled and
{{spark.sql.autoBroadcastJoinThreshold}} set to {{10MB}}, the query fails with
a SparkException.
{code:java}
Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840){code}
The error indicates that Spark attempted to broadcast a table of about
10.7 GiB, well above the 8 GiB broadcast limit. The query runs successfully
when CBO is disabled.
The issue stems from a bug in Spark's CBO filter estimation. When processing
equality filters (e.g., d_year = 2001), the CBO incorrectly estimates the
rowCount as 0.
Because the estimated row count is 0, the optimizer concludes that the join
result will be very small and chooses a BroadcastHashJoin. However, the actual
data size is roughly 10.7 GiB, which exceeds Spark's hard limit for
broadcasting.
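The mismatch between the planning-time estimate and the runtime size can be sketched in a few lines of Python. This is a simplified model of the decision described above, not Spark's planner code: the function names are hypothetical, and the 1-byte estimate is an assumed stand-in for what a rowCount of 0 collapses to.

```python
# Simplified model of the decision described in this issue; not Spark's code.
MIB = 1024 ** 2
GIB = 1024 ** 3

# Value configured in the report: spark.sql.autoBroadcastJoinThreshold = 10MB
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * MIB
# The 8.0 GiB limit quoted in the SparkException message
MAX_BROADCAST_BYTES = 8 * GIB


def plans_broadcast(estimated_bytes: int) -> bool:
    """Planning time: a side is broadcast when its estimated size fits the threshold."""
    return 0 <= estimated_bytes <= AUTO_BROADCAST_JOIN_THRESHOLD


def broadcast_fails(actual_bytes: int) -> bool:
    """Run time: building the broadcast relation fails above the hard limit."""
    return actual_bytes > MAX_BROADCAST_BYTES


estimated_bytes = 1                # assumption: size estimate implied by rowCount = 0
actual_bytes = int(10.7 * GIB)     # actual size reported in the error

print("broadcast planned:", plans_broadcast(estimated_bytes))
print("broadcast fails at run time:", broadcast_fails(actual_bytes))
print("broadcast planned with an accurate estimate:", plans_broadcast(actual_bytes))
```

With an accurate estimate the threshold check would have rejected the broadcast at planning time, which is consistent with the query succeeding when CBO is disabled.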
was:
When running TPC-DS Q4 on the database {{wxd_icebergtpcdsdb1000g}} with CBO
enabled and {{spark.sql.autoBroadcastJoinThreshold}} set to {{10MB}}, the
query fails with a SparkException.
{{Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)}}
The error indicates that Spark attempted to broadcast a table of about
10.7 GiB, well above the 8 GiB broadcast limit. The query runs successfully
when CBO is disabled.
The issue stems from a bug in Spark's CBO filter estimation. When processing
equality filters (e.g., d_year = 2001), the CBO incorrectly estimates the
rowCount as 0.
Because the estimated row count is 0, the optimizer concludes that the join
result will be very small and chooses a BroadcastHashJoin. However, the actual
data size is roughly 10.7 GiB, which exceeds Spark's hard limit for
broadcasting.
The root of this miscalculation is in
[FilterEstimation.scala|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L347].
Under normal circumstances, Spark should use column statistics to estimate
selectivity. In this case, although column information is present, most of the
statistical fields (min, max, distinct count, etc.) are null/None, except for
versioning information.
{{Filter (isnotnull(d_year#7208) AND (d_year#7208 = 2001)),
Statistics(sizeInBytes=1.0 B, rowCount=0)
+- RelationV2[d_date_sk#7202, d_date_id#7203, d_date#7204, d_month_seq#7205,
d_week_seq#7206, d_quarter_seq#7207, d_year#7208, d_dow#7209, d_moy#7210,
d_dom#7211, d_qoy#7212, d_fy_year#7213, d_fy_quarter_seq#7214,
d_fy_week_seq#7215, d_day_name#7216, d_quarter_name#7217, d_holiday#7218,
d_weekend#7219, d_following_holiday#7220, d_first_dom#7221, d_last_dom#7222,
d_same_day_ly#7223, d_same_day_lq#7224, d_current_day#7225, ... 4 more fields]
spark_catalog.wxd_icebergtpcdsdb1000g.date_dim, Statistics(sizeInBytes=20.1
MiB, rowCount=7.30E+4)}}
As a result, Spark fails to enter the correct evaluation logic in [lines
310-313|https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L310-L313]
and defaults to an incorrect estimation.
{{// Example of the empty stats being passed:
d_year#20690 -> ColumnStat(None,None,None,None,None,None,None,2)}}
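To make the failure mode concrete, here is a toy Python model of equality-filter estimation with empty column statistics. It is an illustration under stated assumptions, not the code in FilterEstimation.scala: the class and function names are hypothetical, the "naive" variant assumes that missing min/max is treated as "literal out of range", and the populated statistics (min 1900, max 2100, 200 distinct values) are made-up values for comparison. Only the 73,000-row count and the all-None ColumnStat come from the output above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ColumnStat:
    """Hypothetical, trimmed-down stand-in for a column statistics record."""
    distinct_count: Optional[int] = None
    min: Optional[int] = None
    max: Optional[int] = None
    version: int = 2


def naive_equality_selectivity(stat: ColumnStat, literal: int) -> Optional[float]:
    """Assumed faulty behavior: missing min/max counts as 'out of range'."""
    in_range = (
        stat.min is not None
        and stat.max is not None
        and stat.min <= literal <= stat.max
    )
    if not in_range:
        return 0.0
    if not stat.distinct_count:
        return None
    return 1.0 / stat.distinct_count


def guarded_equality_selectivity(stat: ColumnStat, literal: int) -> Optional[float]:
    """One possible guard: report 'no estimate' when the statistics are empty."""
    if stat.min is None or stat.max is None or not stat.distinct_count:
        return None
    if not (stat.min <= literal <= stat.max):
        return 0.0
    return 1.0 / stat.distinct_count


def estimate_rows(child_rows: int, selectivity: Optional[float]) -> int:
    """Keep the child's row count when no selectivity could be estimated."""
    if selectivity is None:
        return child_rows
    return round(child_rows * selectivity)


child_rows = 73_000                      # rowCount=7.30E+4 from the plan above
empty = ColumnStat()                     # all fields None except version
populated = ColumnStat(distinct_count=200, min=1900, max=2100)  # made-up values

print(estimate_rows(child_rows, naive_equality_selectivity(empty, 2001)))
print(estimate_rows(child_rows, guarded_equality_selectivity(empty, 2001)))
print(estimate_rows(child_rows, guarded_equality_selectivity(populated, 2001)))
```

In this model the naive path yields a row count of 0 for the empty statistics, while the guarded path falls back to the child's row count, which would keep the size estimate above the broadcast threshold. Whether a guard of this shape is the right fix for Spark is left open here.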
> Incorrect RowCount Estimation in CBO leads to unintended BroadcastHashJoin
> --------------------------------------------------------------------------
>
> Key: SPARK-56485
> URL: https://issues.apache.org/jira/browse/SPARK-56485
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Ke Jia
> Priority: Major