[ https://issues.apache.org/jira/browse/SPARK-48290 ]
André F. updated SPARK-48290:
-----------------------------
    Issue Type: Bug  (was: Question)

AQE not working when joining dataframes with more than 2000 partitions
-----------------------------------------------------------------------

                 Key: SPARK-48290
                 URL: https://issues.apache.org/jira/browse/SPARK-48290
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
   Affects Versions: 3.3.2, 3.5.1
        Environment: spark-standalone, spark 3.5.1
           Reporter: André F.
           Priority: Major

We are joining two large dataframes, with considerable skew on the left side in one specific key (>2000 skew ratio).

{code:java}
left side num partitions: 10335
right side num partitions: 1241
left side num rows: 20181947343
right side num rows: 107462219
{code}

Since `spark.sql.adaptive.enabled` is set, we expect AQE to act during the join and deal with the skewed partition automatically.

During the join, we see the following log indicating that the skew was not detected; the statistics look suspiciously equal for min/median/max sizes:

{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
Right side partitions size info:
median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
{code}

Looking at this log line and the available Spark configuration options, our two main hypotheses for working around this behavior and correctly detecting the skew were:
# Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark does not convert the statistics into a `HighlyCompressedMapStatus` and is therefore able to identify the skewed partition.
# Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other configurations such as `spark.shuffle.accurateBlockThreshold` and `spark.shuffle.accurateBlockSkewedFactor` so that even then the sizes of the skewed partitions/blocks are registered accurately and consequently used in the optimization.

We tried different values for `spark.shuffle.accurateBlockThreshold` (even absurd ones like 1 MB) and nothing seemed to work. The statistics indicate that min, median and max are somehow the same, and thus the skew is not detected.

However, when we forcibly reduce `spark.sql.shuffle.partitions` to fewer than 2000, the statistics look correct and the skewed-join optimization acts as it should:

{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
Right side partitions size info:
median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
{code}

Should we assume that the statistics become corrupted when Spark uses `HighlyCompressedMapStatus`? Is there another configuration property we should try to work around this problem?
(Assuming that fine-tuning all dataframes involved in skewed joins in our ETL to have fewer than 2000 partitions is not an option.)
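The identical min/median/max figures are consistent with the suspicion about `HighlyCompressedMapStatus` raised above. As a rough illustration only (a toy Scala model, not Spark's actual MapStatus code; the map/reduce counts and block sizes are invented), if each map task reports a single average size for all of its shuffle blocks, the per-reduce-partition totals computed from those statistics all collapse to the same number, which would produce exactly the kind of log shown in the description:

{code:scala}
// Toy model (not Spark code) of how average-only block statistics can make
// every reduce partition look the same size to a skew-join optimizer.
// All sizes are in bytes and invented for illustration.
object AveragedStatsDemo extends App {
  val numMaps    = 4
  val numReduces = 5

  // Exact per-map shuffle block sizes: reduce partition 0 is heavily skewed.
  val exact: Array[Array[Long]] = Array.fill(numMaps) {
    Array[Long](500L * 1024 * 1024, 1L * 1024 * 1024, 1L * 1024 * 1024,
                1L * 1024 * 1024, 1L * 1024 * 1024)
  }

  // What the reduce side would see if each map task kept only one average
  // size for all of its blocks (a HighlyCompressedMapStatus-style summary).
  val averaged: Array[Array[Long]] = exact.map { blocks =>
    val avg = blocks.sum / blocks.length
    Array.fill(blocks.length)(avg)
  }

  // Per-reduce-partition totals, aggregated across map tasks.
  def reduceSizes(stats: Array[Array[Long]]): Seq[Long] =
    (0 until numReduces).map(r => stats.map(_(r)).sum)

  println(s"exact    per-reduce sizes: ${reduceSizes(exact)}")    // skew visible
  println(s"averaged per-reduce sizes: ${reduceSizes(averaged)}") // min == median == max
}
{code}

This would also line up with the 2000-partition cutoff (`spark.shuffle.minNumPartitionsToHighlyCompress`) above which Spark switches to the highly compressed map statistics, and with the observation that forcing `spark.sql.shuffle.partitions` below 2000 makes the skew visible again.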
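For reference, a minimal Scala sketch of the configuration combinations described in the report. Only the configuration keys come from the description above; the session builder, application name, paths, join key and literal values are illustrative assumptions, and the two hypotheses were tried as alternatives but are shown together here only to document the knobs:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("skewed-join-aqe")                              // name is illustrative
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  // Hypothesis 1: raise the partition-count cutoff (default 2000) so map output
  // statistics are not converted into a HighlyCompressedMapStatus.
  .config("spark.shuffle.minNumPartitionsToHighlyCompress", "20000")
  // Hypothesis 2: keep HighlyCompressedMapStatus, but record exact sizes for
  // large blocks; 1 MB is the "absurd" low threshold mentioned in the report.
  .config("spark.shuffle.accurateBlockThreshold", "1048576")
  .config("spark.shuffle.accurateBlockSkewedFactor", "1.0")
  .getOrCreate()

// The workaround that did make OptimizeSkewedJoin detect the skew:
// keep the number of shuffle partitions below the 2000 cutoff.
spark.conf.set("spark.sql.shuffle.partitions", "1999")

// Placeholder inputs and join key; the real dataframes come from the ETL.
val leftDf  = spark.read.parquet("/path/to/left")
val rightDf = spark.read.parquet("/path/to/right")
val joined  = leftDf.join(rightDf, Seq("key"))
joined.write.mode("overwrite").parquet("/path/to/output")
{code}

Note that the `spark.shuffle.*` settings are core (non-SQL) configurations read when map statuses are produced, so they generally need to be in place when the SparkSession/SparkContext is created rather than changed via `spark.conf.set` mid-job, whereas `spark.sql.shuffle.partitions` is a runtime SQL conf.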