James Xu created SPARK-56767:
--------------------------------

             Summary: Optimize hot-key skew joins by splitting into a broadcast 
branch for hot keys and a sort-merge branch for the rest
                 Key: SPARK-56767
                 URL: https://issues.apache.org/jira/browse/SPARK-56767
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 4.1.1
            Reporter: James Xu


h2. What is the problem?

Data skew in shuffle joins is one of the most common causes of long-running or 
stalled Spark jobs. When a small number of join key values ("hot keys") account 
for a disproportionate share of rows on one side of a join, the shuffle 
partitions handling those keys become stragglers: they process far more data 
than the average partition and dominate overall job latency regardless of 
cluster size.
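
To make the straggler effect concrete, here is a toy model in plain Python (not Spark code). The data set, the partition count, and the use of `key % num_partitions` as a stand-in for Spark's hash partitioner are all assumptions made for illustration.

```python
from collections import Counter

def rows_per_partition(keys, num_partitions):
    """Count rows per shuffle partition for a toy modulo partitioner."""
    # Assumption: key % num_partitions stands in for Spark's real hash partitioner.
    counts = Counter(k % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

# Hypothetical data: key 7 is hot (90,000 rows); 1,000 other keys have 10 rows each.
keys = [7] * 90_000 + [k for k in range(1000, 2000) for _ in range(10)]

sizes = rows_per_partition(keys, num_partitions=8)
mean_size = sum(sizes) / len(sizes)

print("rows per partition:", sizes)
print("largest / mean:", max(sizes) / mean_size)
```

Every row for key 7 lands in the same partition, so adding executors does not shorten the task that owns it.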

Spark's AQE-based skew join optimization (OptimizeSkewedJoin) detects skewed 
partitions at runtime and splits them by reading from multiple map output 
ranges. *However, this approach has a fundamental limitation: if skew 
originates from a small number of high-frequency key values that dominate 
entire map-side output ranges — for example when the input data is pre-sorted 
or clustered by key, causing all rows for a hot key to land in contiguous map 
output — those rows cannot be split across tasks regardless of AQE thresholds.* 
AQE has no visibility into key-level frequency, only partition-level byte 
sizes, so it cannot route hot-key rows to a broadcast join where they could be 
processed efficiently.
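
As a rough illustration of what "partition-level byte sizes" means, the sketch below restates the documented AQE detection rule in plain Python: a partition is treated as skewed when it is larger than a factor times the median partition size and also larger than an absolute threshold (`spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`). The function name and sample sizes are hypothetical, and the defaults shown should be checked against the Spark version in use.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(sizes_in_bytes, factor=5.0, threshold=256 * MB):
    """Return indices of partitions that the size-based rule would flag."""
    med = median(sizes_in_bytes)
    return [
        i for i, size in enumerate(sizes_in_bytes)
        if size > factor * med and size > threshold
    ]

# Hypothetical shuffle partition sizes; the last one holds a hot key.
sizes = [64 * MB, 70 * MB, 60 * MB, 2048 * MB]
flagged = skewed_partitions(sizes)
print(flagged)
```

The only input is a list of sizes. Nothing in the rule says which key values make partition 3 large, which is the gap this proposal targets.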

h2. Why does this deserve solving?

Hot-key skew is a well-known and frequently reported production problem. The 
standard workaround — salting the join key — requires rewriting the query, adds 
complexity, and is error-prone. A hint-driven, transparent optimization that 
requires no query rewriting would meaningfully reduce the operational burden 
for users working with skewed datasets (e.g. joins involving user IDs, product 
IDs, or any high-cardinality column with a power-law distribution).
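
For readers unfamiliar with salting, the following plain-Python sketch shows the shape of the manual rewrite. All names, the salt count, and the data are hypothetical; the point is only that both inputs and the join condition must change.

```python
import random

NUM_SALTS = 4  # hypothetical fan-out chosen by the query author

def join_on(left, right, cols):
    """Inner equi-join of two lists of dicts on the given columns."""
    index = {}
    for r in right:
        index.setdefault(tuple(r[c] for c in cols), []).append(r)
    return [
        {**l, **r}
        for l in left
        for r in index.get(tuple(l[c] for c in cols), [])
    ]

rng = random.Random(0)
events = [{"user_id": 7, "event": i} for i in range(1000)]  # skewed side
users = [{"user_id": 7, "name": "u7"}]                      # other side

# Rewrite 1: add a random salt to every row on the skewed side.
salted_events = [{**e, "salt": rng.randrange(NUM_SALTS)} for e in events]
# Rewrite 2: replicate every row on the other side once per salt value.
exploded_users = [{**u, "salt": s} for u in users for s in range(NUM_SALTS)]
# Rewrite 3: join on (key, salt) instead of key alone.
salted = join_on(salted_events, exploded_users, ["user_id", "salt"])

plain = join_on(events, users, ["user_id"])
print(len(plain), len(salted))
```

The salted join returns the same matches but carries an extra `salt` column that has to be dropped afterwards, and forgetting any one of the three rewrites silently changes the result.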

The optimization is also naturally bounded in scope: it only fires when the 
user explicitly opts in via a hint, so there is no risk of silent regression 
for existing queries.

h2. How are we planning to solve this?

We introduce a new SKEW_JOIN(tableName(col1, col2, ...)) hint that triggers a 
hot-key broadcast split at query planning time:

1. Hot-key sampling. At planning time, Spark executes a lightweight sampling 
subquery against the hinted (skewed) table. The subquery groups by the hinted 
columns, counts occurrences per key, and returns the top-N keys whose count 
exceeds a configurable threshold (scaled by the sampling fraction). Sampling is 
done via TABLESAMPLE to limit overhead on large tables.
2. Plan split. If hot keys are found, the join is split into two branches:
  - Hot branch: A BroadcastHashJoinExec that processes only rows whose join key 
matches a hot key. The non-skewed side is broadcast after being filtered to 
matching hot-key rows.
  - Normal branch: A SortMergeJoinExec that processes all remaining rows 
(skewed side filtered to exclude hot keys).
  - The results of both branches are combined with a union.
3. Fallback. If sampling finds no hot keys, or if a type mismatch prevents safe 
broadcast filtering, the operator transparently falls back to a standard 
SortMergeJoinExec with no change to query results.
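
The three steps above can be sketched in plain Python for an inner join. This is a toy model under stated assumptions, not the proposed implementation: the function names are hypothetical, the threshold is assumed to scale linearly with the sampling fraction, and both branches use the same in-memory join, so only the routing of rows is modeled (not the BroadcastHashJoinExec and SortMergeJoinExec operators).

```python
import random
from collections import Counter

def find_hot_keys(rows, key, fraction, threshold, top_n, seed=0):
    """Step 1: sample rows, count per key, keep top-N keys above the scaled threshold."""
    rng = random.Random(seed)
    counts = Counter(
        r[key] for r in rows
        if r[key] is not None and rng.random() < fraction
    )
    scaled = threshold * fraction  # assumption: linear scaling with the fraction
    return [k for k, c in counts.most_common(top_n) if c > scaled]

def equi_join(left, right, key):
    """Plain inner equi-join; NULL (None) keys never match."""
    index = {}
    for r in right:
        if r[key] is not None:
            index.setdefault(r[key], []).append(r)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]

def skew_split_join(skewed, other, key, hot_keys):
    """Steps 2 and 3: split into hot and normal branches, or fall back."""
    if not hot_keys:
        return equi_join(skewed, other, key)  # fallback: a single ordinary join
    hot = set(hot_keys)
    # Hot branch: both sides restricted to hot keys.
    hot_branch = equi_join(
        [r for r in skewed if r[key] in hot],
        [r for r in other if r[key] in hot],
        key,
    )
    # Normal branch: skewed side minus hot keys (None keys stay here).
    normal_branch = equi_join([r for r in skewed if r[key] not in hot], other, key)
    return hot_branch + normal_branch  # union of the two branches

def canonical(rows):
    """Order-independent form for comparing join results."""
    return sorted(repr(sorted(r.items())) for r in rows)

# Hypothetical data: user 7 is hot; one order has a NULL key.
orders = [{"user_id": 7, "order_id": i} for i in range(9_000)]
orders += [{"user_id": k, "order_id": -k} for k in range(1000, 2000)]
orders.append({"user_id": None, "order_id": -1})
users = [{"user_id": k, "name": f"u{k}"} for k in [7] + list(range(1000, 2000))]

hot = find_hot_keys(orders, "user_id", fraction=0.1, threshold=1_000, top_n=10)
split = skew_split_join(orders, users, "user_id", hot)
plain = equi_join(orders, users, "user_id")
print("hot keys:", hot)
print("same result:", canonical(split) == canonical(plain))
```

Because every row on the skewed side goes to exactly one branch, the union contains each match once, which is why the split does not change query results.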

The feature supports inner, left outer, right outer, left semi, and left anti 
joins. It is compatible with AQE. NULL join keys are always routed through the 
normal branch.
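
A small plain-Python sketch of the left outer case, assuming the skewed table is the left (preserved) side. Names and data are hypothetical. It shows that rows with NULL keys or with no match are emitted exactly once, by the normal branch.

```python
def left_outer_join(left, right, key):
    """Left outer equi-join producing (id, key, val) tuples; None keys never match."""
    index = {}
    for r in right:
        if r[key] is not None:
            index.setdefault(r[key], []).append(r)
    out = []
    for l in left:
        matches = index.get(l[key], []) if l[key] is not None else []
        if matches:
            out.extend((l["id"], l[key], r["val"]) for r in matches)
        else:
            out.append((l["id"], l[key], None))  # preserved row, NULL right side
    return out

left = [
    {"id": 1, "k": 7}, {"id": 2, "k": 7},  # hot key
    {"id": 3, "k": 8},                     # normal key with a match
    {"id": 4, "k": None},                  # NULL key
    {"id": 5, "k": 9},                     # normal key without a match
]
right = [{"k": 7, "val": "a"}, {"k": 7, "val": "b"}, {"k": 8, "val": "c"}]
hot = {7}

# Hot branch: left rows with a hot key, right side filtered to hot keys.
hot_branch = left_outer_join(
    [l for l in left if l["k"] in hot],
    [r for r in right if r["k"] in hot],
    "k",
)
# Normal branch: everything else on the left, including the NULL-key row.
normal_branch = left_outer_join([l for l in left if l["k"] not in hot], right, "k")

split = sorted(hot_branch + normal_branch, key=repr)
plain = sorted(left_outer_join(left, right, "k"), key=repr)
print(split == plain)
```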
