Ammar Chalifah created SPARK-54542:
--------------------------------------

             Summary: Catalyst Distribution Inference to Avoid Unnecessary 
Exchange
                 Key: SPARK-54542
                 URL: https://issues.apache.org/jira/browse/SPARK-54542
             Project: Spark
          Issue Type: Wish
          Components: Spark Core
    Affects Versions: 3.5.7
         Environment: PySpark

Spark 3.5.7

 
            Reporter: Ammar Chalifah


I worked on a pipeline where the explicit distribution of the data changes, but the implicit distribution does not.

For example, the data is initially distributed by a column `part`, where `part` 
is a function of two other columns (`part = f(a, b)`). In the downstream 
stages, when an aggregation/join happens on `a`, `b`, or `a` & `b`, a shuffle 
is mathematically unnecessary. However, Catalyst requires the existing output 
partitioning to satisfy the required distribution explicitly, and in this case 
it concludes that the two do not match 
(`hashpartitioning(part) != hashpartitioning(a, b)`). As a consequence, 
Catalyst inserts an extra shuffle, reducing the overall job performance.
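
To make the scenario concrete, here is a minimal sketch (Scala API, though the pipeline itself runs on PySpark); the column names and the hash-based derivation of `part` are illustrative assumptions, not the actual pipeline:

{code:scala}
// Illustrative reproduction: the data is already distributed by part = f(a, b),
// yet the aggregation on (a, b) still gets its own Exchange in the plan.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("exchange-repro").getOrCreate()

val df = spark.range(1000000L)
  .withColumn("a", col("id") % 100)
  .withColumn("b", col("id") % 7)
  .withColumn("part", hash(col("a"), col("b")))  // part = f(a, b)
  .repartition(col("part"))                      // outputPartitioning: hashpartitioning(part)

// Rows with equal (a, b) are already co-located, but Catalyst only compares
// hashpartitioning(part) against the required clustering on (a, b), so the
// plan below contains a second Exchange before the aggregation.
df.groupBy("a", "b").count().explain()
{code}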

The identity `part = f(a, b)` could be explicitly described within the same 
SparkPlan, or it could be hidden/abstracted away (e.g. the intermediate data is 
already stored, or separate actions are executed on top of cached data). Either 
way, it would be great if there were a supported API that lets the user inform 
Spark of the expected distribution and whether a shuffle is necessary.

*Personal Hack*

To solve the problem for my particular case, I created a tiny JAR whose sole 
purpose is to surgically remove the exact unnecessary Exchange, driven by a set 
of parameters (the columns of a `hashpartitioning(...)` that should not receive 
an extra Exchange).
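
A rough sketch of the kind of rule involved, assuming it is injected as a physical-plan preparation rule through `SparkSessionExtensions` (the class names, the `spark.myapp.skipExchangeOn` config, and the choice of `injectQueryStagePrepRule` as the injection point are assumptions for illustration; the actual JAR may hook in differently):

{code:scala}
// Illustrative sketch, not the actual JAR: drop a ShuffleExchangeExec whose
// HashPartitioning columns exactly match a configured set, trusting the caller
// that the child data is already co-located by those columns.
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

case class RemoveKnownRedundantExchange(session: SparkSession) extends Rule[SparkPlan] {

  // Hypothetical config, e.g. spark.myapp.skipExchangeOn=a,b
  private def targetColumns: Set[String] =
    session.conf.get("spark.myapp.skipExchangeOn", "")
      .split(",").map(_.trim).filter(_.nonEmpty).toSet

  override def apply(plan: SparkPlan): SparkPlan = {
    val targets = targetColumns
    if (targets.isEmpty) {
      plan
    } else {
      plan.transformUp {
        case exchange: ShuffleExchangeExec =>
          exchange.outputPartitioning match {
            // Remove the Exchange only when it hash-partitions on exactly the
            // configured columns; correctness is entirely the caller's assertion.
            case HashPartitioning(exprs, _)
                if exprs.forall(_.isInstanceOf[Attribute]) &&
                   exprs.map(_.asInstanceOf[Attribute].name).toSet == targets =>
              exchange.child
            case _ => exchange
          }
      }
    }
  }
}

// Registered via spark.sql.extensions=com.example.SkipExchangeExtensions
// (package and class name are placeholders).
class SkipExchangeExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectQueryStagePrepRule(session => RemoveKnownRedundantExchange(session))
  }
}
{code}

This works, but it pushes the entire correctness burden onto the user and depends on plan internals, which is why a supported API for declaring the expected distribution would be preferable.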



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
