[ https://issues.apache.org/jira/browse/SPARK-48290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

André F. updated SPARK-48290:
-----------------------------
    Description: 
We are joining two large dataframes with considerable skew on the left side for 
one specific key (>2000 skew ratio).
{code:java}
left side num partitions: 10335
right side num partitions: 1241

left side num rows: 20181947343
right side num rows: 107462219 {code}
Since we have `spark.sql.adaptive.enabled` set, we expect AQE to act during the 
join and deal with the skewed partition automatically.
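
For context, here is a minimal sketch of the session setup we assume is in effect. Only `spark.sql.adaptive.enabled` is set explicitly on our side; the skew-join values shown are Spark's documented defaults, and the application name is a placeholder:
{code:scala}
import org.apache.spark.sql.SparkSession

// Adaptive execution setup we rely on. Only spark.sql.adaptive.enabled is set
// explicitly in our jobs; the skew-join settings below are Spark's defaults,
// spelled out here to make the expectation explicit.
val spark = SparkSession.builder()
  .appName("skewed-join")  // placeholder
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")                          // default
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")               // default
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB") // default
  .getOrCreate()
{code}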

During the join, we see the following log entry indicating that the skew was not 
detected, since the partition statistics are suspiciously identical for the 
min/median/max sizes:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
OptimizeSkewedJoin: 
Optimizing skewed join.
Left side partitions size info:
median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
Right side partitions size info:
median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
{code}
Looking at this log line and the available Spark configuration options, our two 
main hypotheses for working around this behavior and getting the skew detected 
were (see the sketch after this list):
 # Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark does 
not convert the map output statistics into a `HighlyCompressedMapStatus` and is 
therefore able to identify the skewed partition.
 # Letting Spark use a `HighlyCompressedMapStatus`, but changing other 
configurations such as `spark.shuffle.accurateBlockThreshold` and 
`spark.shuffle.accurateBlockSkewedFactor` so that even then the sizes of the 
skewed partitions/blocks are registered accurately and consequently used in the 
optimization.
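
As a reference for what these two attempts look like in practice, here is a minimal sketch expressed as session configuration. The concrete values are illustrative only, `spark.shuffle.minNumPartitionsToHighlyCompress` is an internal setting, and the application name is a placeholder:
{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothesis 1: raise the partition count at which Spark switches from
// CompressedMapStatus to HighlyCompressedMapStatus (default 2000) above the
// number of shuffle partitions used by the join.
// Hypothesis 2: keep HighlyCompressedMapStatus, but ask Spark to register
// large/skewed block sizes accurately instead of summarising them.
// These are core shuffle settings, so they must be in place when the application starts.
val spark = SparkSession.builder()
  .appName("skewed-join-attempt")  // placeholder
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.shuffle.minNumPartitionsToHighlyCompress", "20000")  // hypothesis 1 (illustrative value)
  .config("spark.shuffle.accurateBlockThreshold", "1m")               // hypothesis 2 (the 1MB we tried)
  .config("spark.shuffle.accurateBlockSkewedFactor", "5.0")           // hypothesis 2
  .getOrCreate()
{code}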

We tried different values for `spark.shuffle.accurateBlockThreshold` (even 
absurdly low ones like 1MB) and nothing seemed to work. The statistics still 
report the same value for min/median/max, and so the skew is not detected.

However, when we forcibly reduce `spark.sql.shuffle.partitions` to fewer than 
2000 partitions, the statistics look correct and the skewed-join optimization 
behaves as it should:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
OptimizeSkewedJoin: 
Optimizing skewed join.
Left side partitions size info:
median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
Right side partitions size info:
median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
{code}
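
For completeness, the only change between the failing and the working run is pinning the shuffle partition count below 2000. A minimal sketch, assuming an existing SparkSession `spark` and two dataframes `leftDf`/`rightDf` joined on `join_key` (placeholder names):
{code:scala}
// spark.sql.shuffle.partitions is a runtime SQL conf, so it can be changed on an
// existing session before running the join. Keeping it under 2000 avoids the
// HighlyCompressedMapStatus summarisation of the join's map output statistics.
spark.conf.set("spark.sql.shuffle.partitions", "1999")  // any value below 2000

val joined = leftDf.join(rightDf, Seq("join_key"))  // placeholder dataframes/column
joined.count()                                      // any action that triggers the join
{code}
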
Should we assume that the statistics become corrupted when Spark uses 
`HighlyCompressedMapStatus`? Is there another configuration property we should 
try in order to work around this problem? (Fine-tuning every dataframe involved 
in a skewed join in our ETL to have fewer than 2000 partitions is not an option.)
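
Our reading of the first log is that every reduce-side partition ends up reported with one and the same (presumably averaged) size, which trivially makes min, median and max identical and hides the skew. A toy sketch of that effect in plain Scala (not Spark internals; the 5x-median rule mirrors the default `spark.sql.adaptive.skewJoin.skewedPartitionFactor`, and the real check additionally requires the partition to exceed `skewedPartitionThresholdInBytes`, which a 263 GB partition easily does):
{code:scala}
// Toy illustration only: if per-partition sizes are all replaced by a single
// average, no partition can ever exceed factor * median, so there is nothing
// left for skew detection to see.
val actualSizes = Array(800L, 820L, 790L, 350000L)  // one hugely skewed partition

def medianOf(xs: Array[Long]): Long = xs.sorted.apply(xs.length / 2)
def skewDetected(sizes: Array[Long], factor: Long = 5L): Boolean =
  sizes.max > factor * medianOf(sizes)

val avg = actualSizes.sum / actualSizes.length
println(skewDetected(actualSizes))                            // true  -> skew visible
println(skewDetected(Array.fill(actualSizes.length)(avg)))    // false -> min == median == max
{code}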

 

    Environment: 
spark-standalone

spark3.5.1

> AQE not working when joining dataframes with more than 2000 partitions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-48290
>                 URL: https://issues.apache.org/jira/browse/SPARK-48290
>             Project: Spark
>          Issue Type: Question
>          Components: Optimizer, SQL
>    Affects Versions: 3.3.2, 3.5.1
>         Environment: spark-standalone
> spark3.5.1
>            Reporter: André F.
>            Priority: Major
>


