[ 
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-54437:
-----------------------------
    Attachment:     (was: without.png)

> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
>                 Key: SPARK-54437
>                 URL: https://issues.apache.org/jira/browse/SPARK-54437
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>
> h4. Q1. What are you trying to do?
> This document suggests that we set `outputPartitioning` to `SinglePartition` 
> (instead of the default UnknownPartition(0)) when the query runs over small 
> data to help Spark better generate efficient execution plans for small 
> queries.
> h4. Q2. What problem is this proposal NOT designed to solve?
> This doc does not propose implementing a new fast execution engine for Spark. 
> This doc only suggests a minimal effort, low-hanging fruit change that sets 
> `outputPartitioning` to `SinglePartition` when we detect Spark input data is 
> small over a set of supported plan shapes.
> h4. Q3. How is it done today, and what are the limits of current practice?
> Currently, for a query plan if
> # Its input is LocalTableExec, which uses the UnknownPartitioning(0).
> # Its input is DataSourceScanExec, which also uses UnknownPartitioning(0)
> The problem from the `UnknownPartition(0)` is, the `EnsureRequirements` would 
> apply an Exchange node (to ensure partitioning requirement), which would 
> later turn into a shuffle implementation. Shuffle would break the execution 
> to more stages and each stage will loop into a full lifecycle of Spark jobs, 
> which is expensive if the query itself only needs a few hundred of million 
> seconds to run. This is true if Spark only runs a very simple aggregation 
> over a LocalTableScan with only a few thousand of rows or a small FileScan 
> with <= 64MB. For such small data, all the operators running into a single 
> stage without shuffle will be sufficient enough, and much faster.
>  
> h4. Q4. What is new in your approach and why do you think it will be 
> successful?
> First of all, our approach will be guarded by a config and it can be turned 
> off, and it is supposed to be turned off by default.
> The new framework proposed here will
> # Verify if the plan shape is supported,
> # Verify if the input data size is small enough
> # Set the output partitioning to SinglePartition on input data source 
> including LocalTableScan and FileScan
> # Propagate the SinglePartition through the query plan.
> # Spark will utilize SinglePartition information to eliminate unneeded 
> shuffles.
> the rough code looks like the following:
> {code:scala}
> def exploreSinglePartitionOpportunites() = if (framework_is_on && 
> isPlanSupported(queryPlan)) {
>  plan.transform {
>       case l: LocalTableScan if isSmall(l) => SetSinglePartition(l)
>       case f: FileScan if isSmall(f) => SetSinglePartition(f)
>   }
> def setOutputPartitioning(plan: LogicalPlan) {
>    for (child: plan.childrens) {
>       setOutputPartitioning(child)
>   }
>   plan.setOutputPartitioning(SinglePartition)
> }
> if (exploreSinglePartitionOpportunites()) {
>    setOutputPartitioning(rootPlan)
> }
> {code}
> h4. Q5. Who cares? If you are successful, what difference will it make?
> The majority of the Spark workload is not as huge as scanning terabytes of 
> data. They often only scan one single file after file prunning and some 
> megabytes, or consume a small local table. It could be an order of magnitude 
> of tens of millions of queries that could be sped up to a certain percentage. 
> For any means, this is non-trival impact in terms of its coverage.
> h4. Q6. What are the risks?
> SinglePartition setup might cause performance regression for certain cases 
> like JOIN with high cardinality. For example, even though the JOIN input 
> could be small, the output could explode. This risk is being addressed by 
> adding a layer in the framework to verify the query plan shape and only 
> support a selected plan shape at the beginning. For example for small data, 
> aggregation and sort should almost always be safe.
> The framework will also by default turn off until it is improved and expanded 
> by community and all feedback says it is mature to turn on as default.
> h4. Q7. How long will it take?
> About 4 weeks to build 
> The framework
> Support for basic plan shape
> Macro benchmark to help developers measure the performance if they start 
> extend the framework and the supported plan shape.
> h4. Q8. What are the mid-term and final “exams” to check for success?
> Enable the framework, having benchmarks to demonstrate the small query speeds 
> up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to