[ 
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-54437:
-----------------------------
    Attachment: without.png

> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
>                 Key: SPARK-54437
>                 URL: https://issues.apache.org/jira/browse/SPARK-54437
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>         Attachments: without.png
>
>
> Q1. What are you trying to do?
> This document suggests that we set `outputPartitioning` to `SinglePartition` 
> (instead of the default `UnknownPartitioning(0)`) when a query runs over 
> small data, so that Spark can generate more efficient execution plans for 
> small queries.
> Q2. What problem is this proposal NOT designed to solve?
> This doc does not propose implementing a new fast execution engine for Spark. 
> It only suggests a minimal-effort, low-hanging-fruit change that sets 
> `outputPartitioning` to `SinglePartition` when Spark detects that the input 
> data is small, over a set of supported plan shapes.
> Q3. How is it done today, and what are the limits of current practice?
> Currently, a query plan falls into this situation if:
> - its input is LocalTableScanExec, which uses the default UnknownPartitioning(0), or
> - its input is DataSourceScanExec, which also uses UnknownPartitioning(0).
> The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` will 
> insert an Exchange node (to satisfy the partitioning requirement), which is 
> later implemented as a shuffle. A shuffle breaks the execution into more 
> stages, and each stage goes through a full lifecycle of Spark job scheduling, 
> which is expensive if the query itself only needs a few hundred milliseconds 
> to run. This is the case when Spark runs a very simple aggregation over a 
> LocalTableScan with only a few thousand rows, or over a small FileScan with 
> <= 64MB of input. For such small data, running all operators in a single 
> stage without a shuffle is sufficient, and much faster.
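> The size checks implied above can be sketched as follows; the helper names 
> and the row threshold are assumptions for illustration only, not existing 
> Spark APIs (the 64MB byte threshold comes from the text above):
> {code:scala}
> // Hypothetical size checks; thresholds and names are assumptions.
> object SmallInputCheck {
>   val maxLocalRows: Long = 10000L                 // assumed "a few thousand rows"
>   val maxFileScanBytes: Long = 64L * 1024 * 1024  // <= 64MB, as stated above
>
>   def isLocalTableSmall(numRows: Long): Boolean = numRows <= maxLocalRows
>   def isFileScanSmall(sizeInBytes: Long): Boolean = sizeInBytes <= maxFileScanBytes
> }
> {code}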
>  
> Q4. What is new in your approach and why do you think it will be successful?
> First of all, our approach will be guarded by a config: it can be turned 
> off, and it is intended to be off by default.
> The new framework proposed here will:
> 1. verify that the plan shape is supported,
> 2. verify that the input data size is small enough,
> 3. set the output partitioning to SinglePartition on input data sources, 
> including LocalTableScan and FileScan, and
> 4. propagate the SinglePartition through the query plan.
> Spark will then utilize the SinglePartition information to eliminate 
> unneeded shuffles.
> The rough code looks like the following:
> {code:scala}
> // Rough sketch, not the final implementation.
> def exploreSinglePartitionOpportunities(): Boolean = {
>   if (frameworkIsOn && isPlanSupported(queryPlan)) {
>     queryPlan.transform {
>       case l: LocalTableScan if isLocalTableSmall(l) => setSinglePartition(l)
>       case f: FileScan if isFileScanSmall(f) => setSinglePartition(f)
>     }
>     true
>   } else {
>     false
>   }
> }
>
> def setOutputPartitioning(plan: LogicalPlan): Unit = {
>   plan.children.foreach(setOutputPartitioning)
>   plan.setOutputPartitioning(SinglePartition)
> }
>
> if (exploreSinglePartitionOpportunities()) {
>   setOutputPartitioning(rootPlan)
> }
> {code}
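> The propagation step can be illustrated with a toy plan tree; `Node` is a 
> stand-in for a SparkPlan, not a Spark class. Once the small scans are 
> marked, SinglePartition propagates bottom-up, so no node reports 
> UnknownPartitioning and no exchange is required:
> {code:scala}
> // Toy model of the propagation step; not Spark code.
> sealed trait Partitioning
> case object UnknownPartitioning extends Partitioning
> case object SinglePartition extends Partitioning
>
> final case class Node(
>     name: String,
>     children: Seq[Node],
>     var outputPartitioning: Partitioning = UnknownPartitioning)
>
> // Bottom-up pass, mirroring setOutputPartitioning above.
> def propagateSinglePartition(plan: Node): Unit = {
>   plan.children.foreach(propagateSinglePartition)
>   plan.outputPartitioning = SinglePartition
> }
>
> // A node still reporting UnknownPartitioning would force an exchange.
> def needsExchange(plan: Node): Boolean =
>   plan.outputPartitioning == UnknownPartitioning
>
> @main def demo(): Unit = {
>   val scan = Node("LocalTableScan", Nil)
>   val agg  = Node("HashAggregate", Seq(scan))
>   propagateSinglePartition(agg)
>   assert(!needsExchange(agg) && !needsExchange(scan))
> }
> {code}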



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
