James Xu created SPARK-56767:
--------------------------------
Summary: Optimize hot-key skew joins by splitting into a broadcast
branch for hot keys and a sort-merge branch for the rest
Key: SPARK-56767
URL: https://issues.apache.org/jira/browse/SPARK-56767
Project: Spark
Issue Type: Improvement
Components: Optimizer
Affects Versions: 4.1.1
Reporter: James Xu
h2. What is the problem?
Data skew in shuffle joins is one of the most common causes of long-running or
stalled Spark jobs. When a small number of join key values ("hot keys") account
for a disproportionate share of rows on one side of a join, the shuffle
partitions handling those keys become stragglers: they process far more data
than the average partition and dominate overall job latency regardless of
cluster size.
Spark's AQE-based skew join optimization (OptimizeSkewedJoin) detects skewed
partitions at runtime and splits them by reading from multiple map output
ranges. *However, this approach has a fundamental limitation: when skew comes
from a small number of high-frequency key values that dominate entire map-side
output ranges (for example, when the input is pre-sorted or clustered by key,
so all rows for a hot key land in contiguous map output), those rows cannot be
split across tasks regardless of AQE thresholds.*
AQE has no visibility into key-level frequency, only partition-level byte
sizes, so it cannot route hot-key rows to a broadcast join where they could be
processed efficiently.
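The limitation can be illustrated with a toy model in plain Python. This is a simplified sketch, not Spark's actual splitting code: the function name, the greedy grouping rule, and the byte counts are all hypothetical. It assumes only what the paragraph above states, namely that a skewed partition is split along map output boundaries and that a single map output is never subdivided.

```python
def split_by_map_ranges(map_sizes, target_size):
    """Group consecutive map outputs into chunks of roughly target_size bytes.

    map_sizes[i] is the number of bytes mapper i wrote for one skewed reducer
    partition. A single mapper's output is never subdivided (assumption of
    this toy model).
    """
    chunks, current = [], 0
    for size in map_sizes:
        # Close the current chunk if adding this map output would overshoot.
        if current > 0 and current + size > target_size:
            chunks.append(current)
            current = 0
        current += size
    if current > 0:
        chunks.append(current)
    return chunks


# Hot key spread evenly over 8 mappers: the partition splits cleanly.
spread = [100] * 8
# Same 800 bytes, but the input was clustered by key, so one mapper holds
# almost all rows for the hot key.
clustered = [5, 5, 765, 5, 5, 5, 5, 5]

spread_chunks = split_by_map_ranges(spread, target_size=200)
clustered_chunks = split_by_map_ranges(clustered, target_size=200)

print(max(spread_chunks))     # largest task after splitting the spread case
print(max(clustered_chunks))  # largest task after splitting the clustered case
```

In the clustered case the largest chunk is still the single 765-byte map output, so one task remains a straggler no matter how low the target size is set.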
h2. Why does this deserve solving?
Hot-key skew is a well-known and frequently reported production problem. The
standard workaround, salting the join key, requires rewriting the query, adds
complexity, and is error-prone. A hint-driven, transparent optimization that
requires no query rewriting would meaningfully reduce the operational burden
for users working with skewed datasets (e.g. joins involving user IDs, product
IDs, or any high-cardinality column with a power-law distribution).
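For readers unfamiliar with salting, the sketch below shows the idea on plain Python lists of (key, value) tuples. It is illustrative only: `salted_join`, `NUM_SALTS`, and the sample rows are hypothetical, and a real query would express the same rewrite in SQL or the DataFrame API.

```python
import random
from collections import defaultdict

NUM_SALTS = 4  # hypothetical fan-out; must be tuned per query in practice


def salted_join(big, small, num_salts=NUM_SALTS, seed=0):
    """Inner join of (key, value) rows on a salted key (key, salt)."""
    rng = random.Random(seed)
    # Replicate every small-side row once per salt value.
    index = defaultdict(list)
    for key, value in small:
        for salt in range(num_salts):
            index[(key, salt)].append(value)
    # Give each big-side row a random salt, then join on (key, salt).
    out = []
    for key, value in big:
        salt = rng.randrange(num_salts)
        for other in index.get((key, salt), []):
            out.append((key, value, other))
    return out


big = [("a", i) for i in range(6)] + [("b", 100)]
small = [("a", "x"), ("b", "y"), ("c", "z")]
result = salted_join(big, small)
```

The result matches an unsalted inner join, but the query author has to replicate one side, add the salt to both sides, and pick a fan-out by hand, which is the rewriting burden described above.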
The optimization is also naturally bounded in scope: it only fires when the
user explicitly opts in via a hint, so there is no risk of silent regression
for existing queries.
h2. How are we planning to solve this?
We introduce a new SKEW_JOIN(tableName(col1, col2, ...)) hint that triggers a
hot-key broadcast split at query planning time:
1. Hot-key sampling. At planning time, Spark executes a lightweight sampling
subquery against the hinted (skewed) table. The subquery groups by the hinted
columns, counts occurrences per key, and returns the top-N keys whose count
exceeds a configurable threshold (scaled by the sampling fraction). Sampling is
done via TABLESAMPLE to limit overhead on large tables.
2. Plan split. If hot keys are found, the join is split into two branches whose
results are combined with a union:
- Hot branch: A BroadcastHashJoinExec that processes only rows whose join key
matches a hot key. The non-skewed side is filtered to rows that match a hot
key and then broadcast.
- Normal branch: A SortMergeJoinExec that processes all remaining rows
(skewed side filtered to exclude hot keys).
3. Fallback. If sampling finds no hot keys, or if a type mismatch prevents safe
broadcast filtering, the operator transparently falls back to a standard
SortMergeJoinExec with no change to query results.
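The sampling in step 1 can be sketched in plain Python as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: `find_hot_keys` and its parameter names are hypothetical, Bernoulli sampling with `random` stands in for TABLESAMPLE, and skipping NULL keys during sampling is an assumption made here so that they never become hot keys.

```python
import random
from collections import Counter


def find_hot_keys(keys, sample_fraction, threshold, top_n, seed=0):
    """Return up to top_n keys whose sampled count exceeds the scaled threshold."""
    rng = random.Random(seed)
    # Bernoulli sample of the key column; None (NULL) keys are skipped.
    sample = [k for k in keys if k is not None and rng.random() < sample_fraction]
    # The threshold is stated for the full table, so scale it to the sample.
    scaled_threshold = threshold * sample_fraction
    over = [(k, c) for k, c in Counter(sample).items() if c > scaled_threshold]
    over.sort(key=lambda kc: kc[1], reverse=True)
    return [k for k, _ in over[:top_n]]


# One key with 50,000 rows next to 50,000 distinct keys.
keys = ["hot"] * 50_000 + [f"user{i}" for i in range(50_000)]
hot_keys = find_hot_keys(keys, sample_fraction=0.1, threshold=10_000, top_n=5)

# Uniform data: nothing crosses the threshold, so the list is empty.
no_hot_keys = find_hot_keys([f"user{i}" for i in range(1_000)], 0.5, 100, 5)
```

An empty result corresponds to the fallback in step 3: with no hot keys there is nothing to broadcast, and the join runs as an ordinary sort-merge join.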
The feature supports inner, left outer, right outer, left semi, and left anti
joins. It is compatible with AQE. NULL join keys are always routed through the
normal branch.
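The split and union can be checked for equivalence on a small example. The sketch below models a left outer join over plain (key, value) tuples, with the left side as the skewed side. All function names are hypothetical, and Python lists stand in for the broadcast and sort-merge operators; only the routing of rows between branches follows the description above.

```python
from collections import Counter, defaultdict


def left_outer_join(left, right):
    """Reference left outer join; None (NULL) keys never match."""
    index = defaultdict(list)
    for key, value in right:
        if key is not None:
            index[key].append(value)
    out = []
    for key, value in left:
        matches = index.get(key, []) if key is not None else []
        if matches:
            out.extend((key, value, m) for m in matches)
        else:
            out.append((key, value, None))
    return out


def split_left_outer_join(left, right, hot_keys):
    """Hot-key split: hot branch plus normal branch, combined by union."""
    hot = set(hot_keys)
    if not hot:
        # Fallback: no hot keys, run the ordinary join.
        return left_outer_join(left, right)
    # Hot branch: skewed rows with a hot key, joined against the filtered
    # slice of the other side (the part that would be broadcast).
    hot_left = [r for r in left if r[0] is not None and r[0] in hot]
    hot_right = [r for r in right if r[0] is not None and r[0] in hot]
    # Normal branch: all remaining rows, including NULL keys.
    rest_left = [r for r in left if r[0] is None or r[0] not in hot]
    return left_outer_join(hot_left, hot_right) + left_outer_join(rest_left, right)


left = [("a", 1), ("a", 2), ("a", 3), ("b", 4), (None, 5), ("c", 6)]
right = [("a", "x"), ("a", "y"), ("b", "z"), (None, "n")]

split_result = split_left_outer_join(left, right, hot_keys=["a"])
reference = left_outer_join(left, right)
# Row order may differ between the two plans, so compare as multisets.
same_rows = Counter(split_result) == Counter(reference)
```

Every skewed-side row goes to exactly one branch, which is why the union yields neither duplicates nor missing rows in this model.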