André F. created SPARK-48290:
--------------------------------

             Summary: AQE not working when joining dataframes with more than 
2000 partitions
                 Key: SPARK-48290
                 URL: https://issues.apache.org/jira/browse/SPARK-48290
             Project: Spark
          Issue Type: Question
          Components: Optimizer, SQL
    Affects Versions: 3.5.1, 3.3.2
         Environment: spark-standalone

spark3.5.1

 
            Reporter: André F.


We are joining 2 large dataframes with a considerable skew on the left side in 
one specific key (>2000 skew ratio). Since we have 
`spark.sql.adaptive.enabled` set, we expect AQE to act during the join, 
dealing with the skewed partition automatically.

During the join, we see the following log, which indicates that the skew was 
not detected: the reported min/median/max partition sizes are exactly equal on 
each side:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
OptimizeSkewedJoin: 
Optimizing skewed join.
Left side partitions size info:
median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
Right side partitions size info:
median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
{code}
Looking at this log line and the spark configuration possibilities, our two 
main hypotheses to work around this behavior and correctly detect the skew were:
 # Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark 
doesn't convert the statistics into a `HighlyCompressedMapStatus` and is 
therefore able to identify the skewed partition.
 # Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other 
configurations such as `spark.shuffle.accurateBlockThreshold` and 
`spark.shuffle.accurateBlockSkewedFactor` so that even then the size of the 
skewed partitions/blocks is accurately registered and consequently used in the 
optimization.
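
To make the second hypothesis concrete, here is a deliberately simplified model of how we understand `HighlyCompressedMapStatus` to record block sizes: non-empty blocks below the accuracy threshold are reported as one shared average, while blocks at or above it keep their own size. The function name and the sample sizes are hypothetical, and the sketch ignores the lossy log-scale encoding Spark applies to the large sizes, so treat it as an illustration rather than Spark's actual code.

```python
from statistics import median


def highly_compressed_view(block_sizes, accurate_block_threshold):
    """Simplified model of the per-block sizes reported for one map task.

    Assumption: non-empty blocks smaller than the threshold are all reported
    as their common average; larger blocks are reported individually.
    """
    small = [s for s in block_sizes if 0 < s < accurate_block_threshold]
    avg_small = sum(small) // len(small) if small else 0
    reported = []
    for size in block_sizes:
        if size == 0:
            reported.append(0)  # empty blocks stay empty
        elif size < accurate_block_threshold:
            reported.append(avg_small)  # detail lost: replaced by the average
        else:
            reported.append(size)  # "huge" block: size kept
    return reported


# Hypothetical map output: 2,999 blocks of 1 KiB plus one 50 MiB block.
sizes = [1024] * 2999 + [50 * 1024 * 1024]

# With a 100 MiB threshold every block counts as "small" and is averaged.
flattened = highly_compressed_view(sizes, 100 * 1024 * 1024)

# With a 1 MiB threshold the 50 MiB block keeps its own size.
preserved = highly_compressed_view(sizes, 1024 * 1024)

print(min(flattened) == max(flattened))
print(median(preserved), max(preserved))
```

In this model, identical min/median/max values appear whenever every block of every map task falls below the threshold, which is the pattern in the log above. It does not explain why lowering `spark.shuffle.accurateBlockThreshold` made no difference in our runs.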

We tried different values for `spark.shuffle.accurateBlockThreshold` (even 
absurd ones like 1MB) and nothing seemed to work. The statistics still report 
identical min, median and max sizes, and thus the skew is not detected.

However, when forcibly reducing `spark.sql.shuffle.partitions` to fewer than 
2000 partitions, the statistics look correct and the skew join optimization 
acts as it should:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
OptimizeSkewedJoin: 
Optimizing skewed join.
Left side partitions size info:
median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
Right side partitions size info:
median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
{code}
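
Both logs are consistent with the documented skew rule for AQE: a partition counts as skewed when its size exceeds the median multiplied by `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and also exceeds `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. The sketch below applies that rule to the left-side numbers from the two logs. The function name is hypothetical, and the defaults (factor 5, threshold 256 MB) are what we believe Spark ships with, not values taken from our cluster.

```python
def is_skewed(size, median_size, factor=5.0, threshold_bytes=256 * 1024 * 1024):
    """Documented AQE skew rule, as we understand it.

    The defaults are assumptions standing in for
    spark.sql.adaptive.skewJoin.skewedPartitionFactor and
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.
    """
    return size > median_size * factor and size > threshold_bytes


# Left side, more than 2000 partitions: max size equals the median.
first_log = is_skewed(780925482, 780925482)

# Left side, fewer than 2000 partitions: max size dwarfs the median.
second_log = is_skewed(282616632301, 862803419)

# Ratio between the largest partition and the median in the second log.
ratio = 282616632301 / 862803419

print(first_log, second_log)  # False True
```

When every partition reports the same size, the largest one can never exceed the median times any factor above 1, so the rule cannot fire regardless of how the skew join settings are tuned.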
Should we assume that the statistics are becoming corrupted when Spark uses 
`HighlyCompressedMapStatus`? Is there another configuration property we should 
try in order to work around this problem? (Assuming that fine-tuning all 
dataframes in skewed joins in our ETL to have fewer than 2000 partitions is not 
an option.)
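
For reference, this is a sketch of how the settings named in this issue could be passed together when testing the first hypothesis. The values are illustrative placeholders, not recommendations, and the helper name is hypothetical. As far as we understand, the `spark.shuffle.*` keys are core settings read from the application's configuration at startup, so the sketch renders them as `spark-submit` arguments instead of setting them on a running session.

```python
# Illustrative values only; every number below is a placeholder for testing.
candidate_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Hypothesis 1: raise the limit above our partition count.
    "spark.shuffle.minNumPartitionsToHighlyCompress": "5000",
    # Hypothesis 2: keep accurate sizes for blocks of 1 MiB and above.
    "spark.shuffle.accurateBlockThreshold": str(1024 * 1024),
    "spark.shuffle.accurateBlockSkewedFactor": "5.0",
}


def to_submit_args(conf):
    """Render a config mapping as a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(candidate_conf)
print(" ".join(submit_args))
```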

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
