Hi Spark community,

Please review the cleaned-up plan below. It comes from joining a large bucketed table with a smaller DataFrame and then applying a window function. Both the join and the window function use the same column, which is also the table's bucket column ("key_col" in the plan). As expected, the join does not shuffle the bucketed side (only the small side is repartitioned to the bucket count), but the window function then introduces a shuffle, even though the data is already partitioned on key_col.
Can anyone explain why? Using Spark 3.5.0.

Thanks,
Shay

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
                        +- Project ... (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table...