[
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Wang updated SPARK-54437:
-----------------------------
Attachment: (was: without.png)
> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
> Key: SPARK-54437
> URL: https://issues.apache.org/jira/browse/SPARK-54437
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
>
> h4. Q1. What are you trying to do?
> This document proposes setting `outputPartitioning` to `SinglePartition`
> (instead of the default `UnknownPartitioning(0)`) when a query runs over
> small data, so that Spark can generate more efficient execution plans for
> small queries.
> h4. Q2. What problem is this proposal NOT designed to solve?
> This doc does not propose implementing a new fast execution engine for Spark.
> It only suggests a minimal-effort, low-hanging-fruit change that sets
> `outputPartitioning` to `SinglePartition` when Spark detects that the input
> data is small, over a set of supported plan shapes.
> h4. Q3. How is it done today, and what are the limits of current practice?
> Currently, a query plan ends up with `UnknownPartitioning(0)` if:
> # its input is a LocalTableScanExec, which uses UnknownPartitioning(0), or
> # its input is a DataSourceScanExec, which also uses UnknownPartitioning(0).
> The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` will
> insert an Exchange node (to satisfy the partitioning requirement), which is
> later implemented as a shuffle. A shuffle breaks the execution into more
> stages, and each stage goes through the full lifecycle of a Spark job, which
> is expensive when the query itself only needs a few hundred milliseconds to
> run. This is the case when Spark runs a very simple aggregation over a
> LocalTableScan with only a few thousand rows, or over a small FileScan of
> <= 64MB. For such small data, running all the operators in a single stage
> without a shuffle is sufficient, and much faster.
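> As a simplified, self-contained illustration of why `UnknownPartitioning(0)`
> forces a shuffle (the types and helpers below are toy stand-ins, not Spark's
> actual classes): `EnsureRequirements` inserts an exchange whenever a child's
> output partitioning does not satisfy the parent's required distribution, and
> `UnknownPartitioning` never satisfies an all-tuples requirement while
> `SinglePartition` does:
> {code:scala}
> // Toy model of the EnsureRequirements decision; not the real Spark API.
> sealed trait Partitioning
> case object SinglePartition extends Partitioning
> case class UnknownPartitioning(numPartitions: Int) extends Partitioning
>
> sealed trait Distribution
> case object AllTuples extends Distribution // e.g. a final global aggregation
> case object UnspecifiedDistribution extends Distribution
>
> // Does the child's partitioning already satisfy the required distribution?
> def satisfies(p: Partitioning, d: Distribution): Boolean = d match {
>   case UnspecifiedDistribution => true
>   case AllTuples               => p == SinglePartition
> }
>
> // An exchange (shuffle) is inserted exactly when the requirement is not met.
> def needsExchange(child: Partitioning, required: Distribution): Boolean =
>   !satisfies(child, required)
> {code}
> With `UnknownPartitioning(0)` as the child, the exchange check fires and a
> shuffle stage appears; with `SinglePartition` it does not, and the plan stays
> in one stage.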
>
> h4. Q4. What is new in your approach and why do you think it will be
> successful?
> First of all, our approach is guarded by a config and can be turned off; it
> is supposed to be off by default.
> The new framework proposed here will:
> # Verify that the plan shape is supported,
> # Verify that the input data size is small enough,
> # Set the output partitioning to SinglePartition on the input data source
> (including LocalTableScan and FileScan),
> # Propagate the SinglePartition through the query plan,
> # Let Spark use the SinglePartition information to eliminate unneeded
> shuffles.
> The rough code looks like the following:
> {code:scala}
> def exploreSinglePartitionOpportunities(plan: SparkPlan): Boolean =
>   frameworkIsOn && isPlanSupported(plan) && {
>     plan.transform {
>       case l: LocalTableScan if isSmall(l) => setSinglePartition(l)
>       case f: FileScan if isSmall(f) => setSinglePartition(f)
>     }
>     true
>   }
>
> def setOutputPartitioning(plan: SparkPlan): Unit = {
>   plan.children.foreach(setOutputPartitioning)
>   plan.setOutputPartitioning(SinglePartition)
> }
>
> if (exploreSinglePartitionOpportunities(rootPlan)) {
>   setOutputPartitioning(rootPlan)
> }
> {code}
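> The `isSmall` check above might be sketched as follows (a hypothetical
> helper; the thresholds are illustrative values taken from the examples in
> Q3, not constants that exist in Spark):
> {code:scala}
> // Hypothetical statistics carrier for a scan node.
> final case class ScanStats(rowCount: Option[Long], sizeInBytes: Option[Long])
>
> val maxRows: Long  = 10000L            // assumed threshold: a few thousand rows
> val maxBytes: Long = 64L * 1024 * 1024 // assumed threshold: a <= 64MB file scan
>
> // A scan counts as "small" if either known statistic is under its threshold.
> def isSmall(stats: ScanStats): Boolean =
>   stats.rowCount.exists(_ <= maxRows) ||
>     stats.sizeInBytes.exists(_ <= maxBytes)
> {code}
> Missing statistics (`None`) conservatively fail the check, so scans of
> unknown size are never treated as small.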
> h4. Q5. Who cares? If you are successful, what difference will it make?
> The majority of Spark workloads are not huge scans over terabytes of data.
> They often scan only a single file (after file pruning) of some megabytes,
> or consume a small local table. On the order of tens of millions of queries
> could be sped up by a meaningful percentage. By any measure, this is a
> non-trivial impact in terms of its coverage.
> h4. Q6. What are the risks?
> The SinglePartition setup might cause performance regressions in certain
> cases, such as a JOIN with high cardinality: even though the JOIN inputs
> could be small, the output could explode. This risk is addressed by adding a
> layer to the framework that verifies the query plan shape and supports only
> a selected set of plan shapes at the beginning. For example, for small data,
> aggregation and sort should almost always be safe.
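> The plan-shape verification might look like a conservative allowlist walk (a
> simplified model; the node classes and this `isPlanSupported` are
> illustrative, not Spark internals):
> {code:scala}
> // Toy plan nodes standing in for SparkPlan, for illustration only.
> sealed trait Plan
> case class Scan() extends Plan
> case class Aggregate(child: Plan) extends Plan
> case class Sort(child: Plan) extends Plan
> case class Join(left: Plan, right: Plan) extends Plan
>
> // Conservative allowlist: accept only scans, aggregates, and sorts; reject
> // shapes such as JOIN, whose output may explode even over small inputs.
> def isPlanSupported(plan: Plan): Boolean = plan match {
>   case _: Scan      => true
>   case Aggregate(c) => isPlanSupported(c)
>   case Sort(c)      => isPlanSupported(c)
>   case _            => false
> }
> {code}
> Because unknown nodes fall through to `false`, extending the framework means
> explicitly adding each newly vetted plan shape to the allowlist.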
> The framework will also be turned off by default until it has been improved
> and expanded by the community, and feedback indicates it is mature enough to
> turn on by default.
> h4. Q7. How long will it take?
> About 4 weeks to build:
> # The framework,
> # Support for basic plan shapes,
> # A macro benchmark to help developers measure performance when they start
> extending the framework and the supported plan shapes.
> h4. Q8. What are the mid-term and final “exams” to check for success?
> Enable the framework, with benchmarks demonstrating that small queries speed
> up.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]