[ https://issues.apache.org/jira/browse/SPARK-48290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050887#comment-18050887 ]
subramaniam commented on SPARK-48290:
-------------------------------------
Any fixes for this?
> AQE not working when joining dataframes with more than 2000 partitions
> ----------------------------------------------------------------------
>
> Key: SPARK-48290
> URL: https://issues.apache.org/jira/browse/SPARK-48290
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.3.2, 3.5.1
> Environment: spark-standalone
> spark3.5.1
> Reporter: André F.
> Priority: Major
>
> We are joining two large dataframes with a considerable skew on one specific
> key on the left side (>2000 skew ratio).
> {code:java}
> left side num partitions: 10335
> right side num partitions: 1241
> left side num rows: 20181947343
> right side num rows: 107462219 {code}
> Since we have `spark.sql.adaptive.enabled` set to true, we expect AQE to act
> during the join and deal with the skewed partition automatically.
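> For reference, a minimal sketch of the AQE skew-join settings we rely on (all
> keys are standard Spark SQL configs, the values shown are the documented
> defaults, and `left`/`right` are placeholder names for our two dataframes):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Minimal sketch, not our production job: the AQE skew-join settings we expect to trigger.
> val spark = SparkSession.builder()
>   .appName("aqe-skew-join")
>   .config("spark.sql.adaptive.enabled", "true")
>   .config("spark.sql.adaptive.skewJoin.enabled", "true")                         // default: true
>   .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")              // default: 5
>   .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m") // default: 256MB
>   .getOrCreate()
>
> // `left` and `right` are placeholders for our two dataframes, joined on the skewed key.
> // val joined = left.join(right, Seq("key"))
> {code}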
> During the join, we can see the following log indicating that the skew was
> not detected, since the partition statistics look suspiciously identical for
> the min/median/max sizes:
> {code:java}
> OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
> OptimizeSkewedJoin:
> Optimizing skewed join.
> Left side partitions size info:
> median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
> Right side partitions size info:
> median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
> {code}
> Looking at this log line and the available Spark configuration options, our
> two main hypotheses to work around this behavior and correctly detect the skew
> were (see the sketch after this list):
> # Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark
> doesn't convert the statistics into a `HighlyCompressedMapStatus` and is
> therefore able to identify the skewed partition.
> # Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other
> configurations such as `spark.shuffle.accurateBlockThreshold` and
> `spark.shuffle.accurateBlockSkewedFactor` so that the size of the skewed
> partitions/blocks is still accurately recorded and consequently used in
> the optimization.
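> A minimal sketch of how we set these two hypotheses (the config keys are real
> Spark settings; the values are only examples, not a recommendation, and these
> shuffle-level settings have to be in place before the SparkContext is created):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   // Hypothesis 1: raise the threshold above our partition count (10335) so the map
>   // output statistics stay in CompressedMapStatus instead of HighlyCompressedMapStatus.
>   .config("spark.shuffle.minNumPartitionsToHighlyCompress", "20000")
>   // Hypothesis 2: keep HighlyCompressedMapStatus, but record large blocks accurately.
>   .config("spark.shuffle.accurateBlockThreshold", (100L * 1024 * 1024).toString) // 100MB, example value
>   .config("spark.shuffle.accurateBlockSkewedFactor", "3.0")                      // example value
>   .getOrCreate()
> {code}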
> We tried different values for `spark.shuffle.accurateBlockThreshold` (even
> absurdly low ones like 1MB) and nothing seemed to work. The statistics indicate
> that the min, median, and max are somehow the same, and thus the skew is not
> detected.
> However, when we forcibly reduced `spark.sql.shuffle.partitions` to fewer than
> 2000 partitions, the statistics looked correct and the optimized skew join
> acted as it should:
> {code:java}
> OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
> OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into
> 337 parts.
> OptimizeSkewedJoin:
> Optimizing skewed join.
> Left side partitions size info:
> median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
> Right side partitions size info:
> median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
> {code}
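> For completeness, a sketch of the only change that made skew detection work
> (`spark.sql.shuffle.partitions` is a runtime SQL conf, so it can be set per
> job; `left`/`right` are again placeholder names):
> {code:scala}
> // Force fewer than 2000 shuffle partitions so the map output statistics are not
> // stored in HighlyCompressedMapStatus.
> spark.conf.set("spark.sql.shuffle.partitions", "1999")
> // val joined = left.join(right, Seq("key"))  // skew is now detected and split by AQE
> {code}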
> Should we assume that the statistics become corrupted when Spark uses
> `HighlyCompressedMapStatus`? Should we try another configuration property to
> work around this problem? (Assuming that fine-tuning all dataframes in skewed
> joins in our ETL to have fewer than 2000 partitions is not an option.)
>