The actual code is not given, so I am going by the plan output and your
explanation:
- You're joining a large, bucketed table with a smaller DataFrame on a
common column (key_col).
- The subsequent window function also uses key_col.
- However, a shuffle occurs for the window function even though the data
is already partitioned by key_col.

One potential cause is data skew. Though the table is bucketed, there
might be significant data skew within the buckets. The resulting uneven
distribution of data can trigger a shuffle for the window function. A
quick way to get a feel for the key distribution:
import pyspark.sql.functions as F

df = spark.table("your_bucketed_table")
# rows per key; a few keys with very large counts would indicate skew
df.groupBy("key_col").agg(F.count("*").alias("cnt")).orderBy(F.desc("cnt")).show(20)
HTH
Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <[email protected]> wrote:
> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
> +- Filter (rn#5441 = 1)
> +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC
> NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(),
> currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
> +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST],
> row_number(), 1, Final
> +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST],
> false, 0
> +- Exchange hashpartitioning(key_col#5394, 80000),
> ENSURE_REQUIREMENTS, [plan_id=592]
> +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS
> FIRST], row_number(), 1, Partial
> +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC
> NULLS FIRST], false, 0
> +- Project ... (key_col stays the same)
> +- Project [coalesce(key_col#0, key_col#5009) AS
> key_col#5394, CASE WHEN ...
> +- SortMergeJoin [key_col#0], [key_col#5009],
> FullOuter
> :- Sort [key_col#0 ASC NULLS FIRST], false, 0
> : +- Project key_
> : +- FileScan parquet bucketed table ...
> +- Sort [key_col#5009 ASC NULLS FIRST],
> false, 0
> +- Exchange
> hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
> +- Project
> +- Filter
> +- Scan small table...
>
>
>