James Xu created SPARK-56906:
--------------------------------

             Summary: [SQL] Add SKEW_JOIN hint with SkewKeyJoinExec to parallelize output-skewed joins
                 Key: SPARK-56906
                 URL: https://issues.apache.org/jira/browse/SPARK-56906
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 4.2.0
            Reporter: James Xu


h2. Problem:


In production fact-to-fact joins, a single reducer can become a straggler even
when all shuffle partitions appear balanced. This happens when a hot key appears
with high frequency on *both* sides of the join, producing a Cartesian
explosion of output rows concentrated on one reducer. Adaptive Query Execution
(AQE) sees no oversized shuffle blocks and does nothing, yet one reducer
produces 50M+ rows while others finish in seconds.

Example query pattern:
{code:sql}
SELECT /*+ SKEW_JOIN(orders(user_id)) */
  o.order_id, o.amount, e.event_type, e.ts
FROM orders o
JOIN order_events e ON o.user_id = e.user_id;
-- With data:
--   orders:      5,000 rows for user_id = 42, scattered across ~1,000 mappers
--   order_events: 10,000 rows for user_id = 42, scattered across ~1,000 mappers
--   Output for user_id = 42: 5,000 × 10,000 = 50,000,000 rows on one reducer 
{code}
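The arithmetic in the comments above can be checked with a small standalone sketch. It is plain Python rather than Spark code; the helper name `join_output_rows_per_key` and the 1,000 cold keys are illustrative assumptions, and only the 5,000 and 10,000 row counts come from the example.

```python
from collections import Counter

def join_output_rows_per_key(left_keys, right_keys):
    """Return {key: output row count} for an inner equi-join on the given keys."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Each key contributes (left frequency) x (right frequency) output rows.
    return {k: left_counts[k] * right_counts[k]
            for k in left_counts if k in right_counts}

# Hot key 42 from the example, plus 1,000 assumed cold keys with one row per side.
orders_keys = [42] * 5_000 + list(range(1_000, 2_000))
events_keys = [42] * 10_000 + list(range(1_000, 2_000))

rows = join_output_rows_per_key(orders_keys, events_keys)
hot_rows = rows[42]                        # 5,000 x 10,000
cold_rows = sum(rows.values()) - hot_rows  # one output row per cold key
print(hot_rows, cold_rows)
```

The hot key accounts for only 15,000 of the 17,000 input rows here, yet it produces nearly all of the output.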
This pattern is common in:
- Fact-to-fact joins (orders JOIN page_views ON user_id)
- Event correlation (impressions JOIN conversions ON campaign_id)
- Graph traversals (nodes JOIN edges on hub nodes)
- Sessionization (events JOIN sessions on session_id)

h2. Root Cause:


AQE's OptimizeSkewedJoin rule inspects MapOutputStatistics (shuffle block
sizes) and splits partitions that exceed a size threshold. The rule has no
visibility into key frequencies within blocks. When a hot key's rows are
spread thinly across many mappers, each per-mapper block is small and the
partition size appears normal. AQE never triggers.
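A rough sketch of why a size-only check misses this case. The condition below is modeled on the documented AQE settings `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (default 5) and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (default 256MB); it is not the actual OptimizeSkewedJoin code. The 100-byte row size, the 500,000 cold rows per partition, and the partition index chosen for the hot key are assumptions made for illustration.

```python
from statistics import median

MB = 1024 * 1024

def looks_skewed(size, all_sizes, factor=5.0, threshold=256 * MB):
    """Size-only skew test: larger than factor x median AND larger than threshold."""
    return size > factor * median(all_sizes) and size > threshold

ROW_BYTES = 100        # assumed average row size
COLD_ROWS = 500_000    # assumed cold rows per reducer partition
NUM_REDUCERS = 200

sizes = [COLD_ROWS * ROW_BYTES] * NUM_REDUCERS
hot_partition = 42 % NUM_REDUCERS           # illustrative stand-in for the real hash
sizes[hot_partition] += 5_000 * ROW_BYTES   # the hot key adds only 5,000 rows

flagged = [i for i, s in enumerate(sizes) if looks_skewed(s, sizes)]
growth = sizes[hot_partition] / median(sizes)
print(flagged, round(growth, 3))
```

Under these assumptions the hot partition is about 1% larger than the median, so nothing is flagged, even though that partition is the one that emits 50M output rows.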

Even if AQE did detect the case, its remedy would be a poor fit. AQE splits the
skewed partition along mapper ranges into K pieces and replicates the entire
other-side partition K times, which multiplies read traffic for all cold keys
in the same hash bucket, not just the hot key. The actual problem is a
Cartesian product on a single key, not a large shuffle block.

h2. Solution:


Add a SKEW_JOIN hint that triggers a new physical operator,
SkewKeyJoinExec, as an alternative join strategy. The operator:

1. Samples the skewed side at prepare time (configurable fraction, default 10%)
2. Identifies hot keys by frequency, using an adaptive threshold based on
   estimated row count and shuffle partition count
3. Rewrites the join into two branches:

   Hot branch:  BroadcastHashJoinExec(A_hot, B_hot)
                -- streamed side filtered to hot keys, broadcast side filtered
                -- to matching hot keys; executes on all executors holding
                -- hot-key rows (output parallelization)

   Cold branch: SortMergeJoinExec(A_cold, B_cold)
                -- both sides exclude hot keys; shuffle partitions are now
                -- balanced with no residual skew

   Union:       UnionExec(hot_result, cold_result)
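Steps 1 and 2 can be sketched in plain Python as follows. This is an illustration, not the SkewKeyJoinExec implementation: the function name `find_hot_keys` is hypothetical, and the threshold rule (a key is hot when its estimated frequency exceeds the average rows per shuffle partition) is an assumption, since the proposal only says the threshold depends on estimated row count and shuffle partition count.

```python
import random
from collections import Counter

def find_hot_keys(keys, fraction=0.10, num_partitions=200, seed=0):
    """Estimate hot keys from a Bernoulli sample of the skewed side."""
    rng = random.Random(seed)
    # Step 1: sample roughly `fraction` of the rows; NULL (None) keys are skipped.
    sample = [k for k in keys if k is not None and rng.random() < fraction]
    # Step 2: scale sampled counts back up to estimated full-table frequencies.
    est_rows = len(sample) / fraction
    est_counts = {k: c / fraction for k, c in Counter(sample).items()}
    # ASSUMPTION (not from the proposal): a key is hot when its estimated
    # frequency exceeds the average number of rows per shuffle partition.
    threshold = est_rows / num_partitions
    return {k for k, c in est_counts.items() if c > threshold}

# 5,000 rows for the hot key plus 100,000 assumed cold keys with one row each.
keys = [42] * 5_000 + list(range(100_000, 200_000))
random.Random(1).shuffle(keys)
hot_keys = find_hot_keys(keys)
print(hot_keys)
```

With about 105,000 rows and 200 partitions the assumed threshold is near 525 rows, far above any cold key's estimate and far below the hot key's estimate of roughly 5,000.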

Algebraic identity (inner join, A is skewed side):

  A ⋈ B = (A.filter(k ∈ H) ⋈ B.filter(k ∈ H))
        ∪ (A.filter(k ∉ H) ⋈ B.filter(k ∉ H))
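The identity can be checked on a toy dataset. The sketch below uses a naive in-memory inner join over `(key, payload)` tuples and compares results as multisets; the helper names and sample rows are illustrative only.

```python
from collections import Counter

def inner_join(a_rows, b_rows):
    """Naive inner equi-join on tuple field 0; returns a multiset of output rows."""
    b_by_key = {}
    for b in b_rows:
        b_by_key.setdefault(b[0], []).append(b)
    return Counter((a, b) for a in a_rows for b in b_by_key.get(a[0], []))

def split_join(a_rows, b_rows, hot_keys):
    """Join hot and cold keys separately, then union the two results."""
    a_hot = [r for r in a_rows if r[0] in hot_keys]
    a_cold = [r for r in a_rows if r[0] not in hot_keys]
    b_hot = [r for r in b_rows if r[0] in hot_keys]
    b_cold = [r for r in b_rows if r[0] not in hot_keys]
    return inner_join(a_hot, b_hot) + inner_join(a_cold, b_cold)

A = [(42, "o1"), (42, "o2"), (42, "o3"), (7, "o4"), (8, "o5")]
B = [(42, "e1"), (42, "e2"), (7, "e3"), (9, "e4")]
H = {42}

full = inner_join(A, B)
split = split_join(A, B, H)
print(full == split, sum(full.values()))
```

The two branches never share a key, so the union needs no deduplication, which is consistent with the use of UnionExec.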

The rewrite is correct for inner and outer joins where the skewed side is the
streamed/large side. Safety guards:

- NULL keys are excluded from hot-key sampling and routed through the SMJ branch
- If broadcast-side join key types do not match skew column types (e.g. due to
  an implicit cast), the operator falls back to a full SortMergeJoin with a
  warning
- Non-deterministic expressions on either side cause rejection with
  INVALID_SKEW_JOIN_HINT, since reading the side twice could produce
  different values
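One way the NULL guard might behave, sketched in plain Python for a left outer join where the left side is the skewed side. The helpers `left_outer_join` and `is_hot` are hypothetical and use `None` to stand in for SQL NULL; the sketch only shows that keeping NULL keys in the cold branch preserves the result.

```python
from collections import Counter

def left_outer_join(a_rows, b_rows):
    """Left outer equi-join on field 0; None keys never match (SQL NULL semantics)."""
    b_by_key = {}
    for b in b_rows:
        if b[0] is not None:
            b_by_key.setdefault(b[0], []).append(b)
    out = Counter()
    for a in a_rows:
        matches = b_by_key.get(a[0], []) if a[0] is not None else []
        if matches:
            for b in matches:
                out[(a, b)] += 1
        else:
            out[(a, None)] += 1  # unmatched left row is kept, padded with NULL
    return out

def is_hot(key, hot_keys):
    # NULL keys are never treated as hot, so they stay in the cold branch.
    return key is not None and key in hot_keys

A = [(42, "o1"), (42, "o2"), (None, "o3"), (7, "o4")]
B = [(42, "e1"), (None, "e2"), (8, "e3")]
H = {42}

hot = left_outer_join([r for r in A if is_hot(r[0], H)],
                      [r for r in B if is_hot(r[0], H)])
cold = left_outer_join([r for r in A if not is_hot(r[0], H)],
                       [r for r in B if not is_hot(r[0], H)])
full = left_outer_join(A, B)
print(hot + cold == full)
```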

Hint syntax:

{code:sql}
/*+ SKEW_JOIN(table_name(col1, col2)) */
/*+ SKEW_JOIN(orders(user_id)) */
/*+ SKEW_JOIN(t1(a, b)) */  -- multi-column equi-join
{code}
SKEWJOIN (no underscore) is accepted as an alias.
 
h2. Expected Impact:


In a synthetic workload matching the fact-to-fact join pattern above (5K × 10K
hot-key rows, 1,000 mappers, 200 reducers), the projected metrics are:

- Before: one reducer produces 50M rows; stage wall-clock time is dominated by
  a straggler running ~45–60 minutes; risk of OOM on a single executor
- After: hot-key rows are distributed across all executors via broadcast join;
  the cold-branch SMJ has no skew; stage wall-clock time drops to ~2–3 minutes;
  no single-point bottleneck
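A back-of-the-envelope view of the output parallelization, as a plain Python sketch. It assumes the hot-key rows of the streamed side are spread evenly over about 1,000 tasks (one per mapper partition in the example); that even spread and the helper name `max_output_rows_per_task` are assumptions, and the sketch says nothing about wall-clock time.

```python
def max_output_rows_per_task(hot_left_rows, hot_right_rows, num_tasks):
    """Output rows per task if hot left rows are spread evenly over num_tasks."""
    left_per_task = -(-hot_left_rows // num_tasks)  # ceiling division
    # Every streamed hot row joins against all broadcast hot rows.
    return left_per_task * hot_right_rows

before = max_output_rows_per_task(5_000, 10_000, num_tasks=1)      # one reducer
after = max_output_rows_per_task(5_000, 10_000, num_tasks=1_000)   # hot branch
print(before, after, before // after)
```

Under that assumption the largest task emits 50,000 rows instead of 50,000,000.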

AQE remains the default for partition-size skew. SKEW_JOIN is an opt-in hint
for key-frequency skew that AQE cannot see.


