[ 
https://issues.apache.org/jira/browse/SPARK-48290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

André F. updated SPARK-48290:
-----------------------------
    Component/s:     (was: SQL)

> AQE not working when joining dataframes with more than 2000 partitions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-48290
>                 URL: https://issues.apache.org/jira/browse/SPARK-48290
>             Project: Spark
>          Issue Type: Question
>          Components: Optimizer
>    Affects Versions: 3.3.2, 3.5.1
>         Environment: spark-standalone
> spark3.5.1
>            Reporter: André F.
>            Priority: Major
>
> We are joining 2 large dataframes with a considerable skew on the left side 
> in one specific key (>2000 skew ratio).
> {code:java}
> left side num partitions: 10335
> right side num partitions: 1241
> left side num rows: 20181947343
> right side num rows: 107462219 {code}
> Since we have `spark.sql.adaptive.enabled` set, we expect AQE to act 
> during the join, dealing with the skewed partition automatically.
> During the join, we can see the following log, which indicates that the skew 
> was not detected because the statistics look oddly identical for the 
> min/median/max sizes:
> {code:java}
> OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
>  OptimizeSkewedJoin: 
> Optimizing skewed join.
> Left side partitions size info:
> median size: 780925482, max size: 780925482, min size: 780925482, avg size: 
> 780925482
> Right side partitions size info:
> median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
>        {code}
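
One mechanism that would produce identical min/median/max values like those in this log: when a map task reports its shuffle block sizes through `HighlyCompressedMapStatus`, blocks below `spark.shuffle.accurateBlockThreshold` are reported as a single average, so summing the reports per reduce partition gives every partition the same size. The Python sketch below is a simplified model of that behavior, not Spark's implementation; the function names, the 4 x 5 layout, and the block sizes are made up for illustration.

```python
MIB = 1024 * 1024


def report_block_sizes(block_sizes, accurate_threshold):
    """Model one map task's size report under averaging.

    Blocks at or above accurate_threshold keep their exact size; every other
    non-empty block is reported as the average of those smaller blocks.
    """
    small = [s for s in block_sizes if 0 < s < accurate_threshold]
    avg_small = sum(small) // len(small) if small else 0
    return [
        s if s >= accurate_threshold else (avg_small if s > 0 else 0)
        for s in block_sizes
    ]


def partition_sizes(reports):
    """Sum the reported block sizes per reduce partition across map tasks."""
    return [sum(column) for column in zip(*reports)]


# Hypothetical layout: 4 map tasks, 5 reduce partitions, partition 2 is skewed.
map_outputs = [[10 * MIB, 10 * MIB, 60 * MIB, 10 * MIB, 10 * MIB] for _ in range(4)]

# Threshold above every block: all blocks are averaged and the skew disappears.
flattened = partition_sizes([report_block_sizes(m, 100 * MIB) for m in map_outputs])

# Threshold below the skewed block: its exact size survives the report.
preserved = partition_sizes([report_block_sizes(m, 50 * MIB) for m in map_outputs])

print([s // MIB for s in flattened])
print([s // MIB for s in preserved])
```

In this model, the first result has the same size for every partition, which is the pattern the log shows. Whether that is what actually happens in this job is not confirmed.
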
> Looking at this log line and the spark configuration possibilities, our two 
> main hypotheses to work around this behavior and correctly detect the skew 
> were:
>  # Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark 
> doesn’t convert the statistics into a `HighlyCompressedMapStatus` and is 
> therefore able to identify the skewed partition.
>  # Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other 
> configurations such as `spark.shuffle.accurateBlockThreshold` and 
> `spark.shuffle.accurateBlockSkewedFactor` so that the size of the skewed 
> partitions/blocks is still accurately registered and consequently used in 
> the optimization.
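
For reference, the settings behind the two hypotheses above could be passed at submit time roughly as follows. The keys are the Spark configuration names mentioned in this report (the full `spark.shuffle.` prefix is assumed for `minNumPartitionsToHighlyCompress`); the values and the `to_submit_args` helper are illustrative placeholders, not recommendations.

```python
# Illustrative values only; they are placeholders, not tuned settings.
hypothesis_1 = {
    # Raise the partition count above which HighlyCompressedMapStatus is used.
    "spark.shuffle.minNumPartitionsToHighlyCompress": "20000",
}
hypothesis_2 = {
    # Keep HighlyCompressedMapStatus, but record more block sizes exactly.
    "spark.shuffle.accurateBlockThreshold": "1m",
    "spark.shuffle.accurateBlockSkewedFactor": "5.0",
}


def to_submit_args(conf):
    """Render a config dict as a list of spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(hypothesis_1)))
print(" ".join(to_submit_args(hypothesis_2)))
```
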
> We tried different values for `spark.shuffle.accurateBlockThreshold` (even 
> absurd ones like 1MB) and nothing seemed to work. The statistics indicate 
> that the min, median, and max are somehow the same, and thus the skew is not 
> detected.
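
A back-of-the-envelope check can show which threshold values would even be in range. The sketch below assumes, hypothetically, that the shuffle has as many map tasks as the left side has partitions (10335) and that the skewed partition's bytes (the max size from the run with fewer than 2000 partitions, logged further down in this report) are spread evenly across them. Neither assumption is confirmed by the logs, and `recorded_exactly` is an illustrative helper, not a Spark API.

```python
MIB = 1024 * 1024

skewed_partition_bytes = 282_616_632_301  # max size from the run with fewer than 2000 partitions
assumed_map_tasks = 10_335                # assumption: one map task per left-side partition

# Average size of the skewed partition's block within a single map output.
per_map_block = skewed_partition_bytes // assumed_map_tasks


def recorded_exactly(block_bytes, accurate_threshold):
    """True if a block of this size meets the accurate-size threshold."""
    return block_bytes >= accurate_threshold


print(f"per-map block: {per_map_block / MIB:.1f} MiB")
print("exact at 100 MiB threshold:", recorded_exactly(per_map_block, 100 * MIB))
print("exact at 1 MiB threshold:", recorded_exactly(per_map_block, 1 * MIB))
```

Under these assumptions a 100 MiB threshold would miss the block, while a 1 MB threshold would be expected to capture it, which is what makes the observed result surprising.
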
> However, when forcibly reducing `spark.sql.shuffle.partitions` to fewer than 
> 2000 partitions, the statistics looked correct and the skewed join 
> optimization acted as it should:
> {code:java}
> OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
> OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 
> 337 parts.
> OptimizeSkewedJoin: 
> Optimizing skewed join.
> Left side partitions size info:
> median size: 862803419, max size: 282616632301, min size: 842320875, avg 
> size: 1019367139
> Right side partitions size info:
> median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766 
> {code}
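
The two logs can be checked against the skew test that, as far as the Spark documentation describes it, `OptimizeSkewedJoin` applies: a partition counts as skewed when its size exceeds both the median size times `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. The sketch assumes the documented defaults (5 and 256MB), since the report does not say whether they were changed; `is_skewed` is a hypothetical helper, not a Spark API.

```python
SKEW_FACTOR = 5                           # assumed default skewedPartitionFactor
SKEW_THRESHOLD_BYTES = 256 * 1024 * 1024  # assumed default skewedPartitionThresholdInBytes


def is_skewed(size, median, factor=SKEW_FACTOR, threshold=SKEW_THRESHOLD_BYTES):
    """Return True if a partition exceeds both the relative and absolute limits."""
    return size > median * factor and size > threshold


# Left side with more than 2000 partitions: max equals median, so no skew is seen.
left_many = is_skewed(size=780_925_482, median=780_925_482)

# Left side with fewer than 2000 partitions: max is far above 5x the median.
left_few = is_skewed(size=282_616_632_301, median=862_803_419)

# Right side with fewer than 2000 partitions: max stays below the 256MB floor.
right_few = is_skewed(size=4_376_957, median=4_320_067)

print(left_many, left_few, right_few)
```

With the logged values this gives no skew for the first run and one skewed left-side partition for the second, matching the `number of skewed partitions` lines in both logs.
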
> Should we assume that the statistics are becoming corrupted when Spark uses 
> `HighlyCompressedMapStatus`? Should we try another configuration property to 
> work around this problem? (Assuming that fine-tuning all dataframes in 
> skewed joins in our ETL to have fewer than 2000 partitions is not an option.)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
