Ammar Chalifah created SPARK-54542:
--------------------------------------
Summary: Catalyst Distribution Inference to Avoid Unnecessary Exchange
Key: SPARK-54542
URL: https://issues.apache.org/jira/browse/SPARK-54542
Project: Spark
Issue Type: Wish
Components: Spark Core
Affects Versions: 3.5.7
Environment: PySpark
Spark 3.5.7
Reporter: Ammar Chalifah
I worked on a pipeline where the explicit distribution of the data changes, but
the implicit distribution does not.
For example, the data is initially distributed by a column `part`, where `part`
is a function of two other columns (`part = f(a, b)`). In the downstream
stages, when an aggregation/join happens on `a`, `b`, or `a` & `b`, a shuffle
is mathematically unnecessary: for instance, rows that agree on `a` and `b`
necessarily share the same `part` value, so they already sit in the same
partition. However, Catalyst requires the child's output partitioning to match
the required distribution exactly, and in this case it sees
`hashpartitioning(part) != hashpartitioning(a, b)`. As a consequence, Catalyst
inserts an extra shuffle, reducing the overall job performance.
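A minimal PySpark sketch of the situation (the column names `a`, `b`, `part`
and the choice of `f` as a plain hash are illustrative, not the actual
pipeline):

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Upstream: data is explicitly distributed by `part`, where `part` is
# derived from `a` and `b` (here f is simply a hash of both columns).
df = (
    spark.range(1_000_000)
    .withColumn("a", F.col("id") % 100)
    .withColumn("b", F.col("id") % 7)
    .withColumn("part", F.hash("a", "b"))  # part = f(a, b)
    .repartition("part")                   # hashpartitioning(part)
)

# Downstream: aggregation on the inputs of f. Rows that agree on (a, b)
# already share the same `part` value and hence the same partition, yet
# the physical plan still contains Exchange hashpartitioning(a, b, ...)
# because hashpartitioning(part) != hashpartitioning(a, b).
agg = df.groupBy("a", "b").agg(F.count("*").alias("cnt"))
agg.explain()
{code}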
The identity `part = f(a, b)` could be explicitly described within the same
SparkPlan, or it could be hidden/abstracted away (e.g. the intermediate data is
already stored, or separate actions run on top of cached data). In either case,
it would be great if there were a supported API that lets the user inform Spark
of the expected distribution and whether a shuffle is necessary.
*Personal Hack*
To solve the problem for my particular case, I created a tiny JAR whose sole
purpose is to surgically remove that specific unnecessary Exchange, driven by a
set of parameters (the columns of a `hashpartitioning(...)` for which the extra
Exchange should not be inserted).
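For completeness, a sketch of how such a JAR is typically attached to a PySpark
session through the standard `spark.sql.extensions` mechanism; the JAR path,
extension class name, and column-list config key below are hypothetical
placeholders for my private hack, not a published artifact:

{code:python}
from pyspark.sql import SparkSession

# Hypothetical wiring only: the JAR and extension class below do not exist
# publicly; they stand in for the "tiny JAR" described above, which injects
# a physical-plan rule that drops the redundant Exchange for the configured
# hashpartitioning columns.
spark = (
    SparkSession.builder
    .config("spark.jars", "/path/to/remove-redundant-exchange.jar")
    .config("spark.sql.extensions", "com.example.RemoveRedundantExchangeExtension")
    # hypothetical parameter read by the rule: columns whose Exchange is unnecessary
    .config("spark.example.removeRedundantExchange.columns", "a,b")
    .getOrCreate()
)
{code}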