[ https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Wang updated SPARK-54437:
-----------------------------
Attachment: without.png
> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
> Key: SPARK-54437
> URL: https://issues.apache.org/jira/browse/SPARK-54437
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Attachments: without.png
>
>
> Q1. What are you trying to do?
> This document proposes setting `outputPartitioning` to `SinglePartition`
> (instead of the default `UnknownPartitioning(0)`) when a query runs over small
> data, so that Spark can generate more efficient execution plans for small
> queries.
> Q2. What problem is this proposal NOT designed to solve?
> This doc does not propose implementing a new fast execution engine for Spark.
> It only proposes a minimal-effort, low-hanging-fruit change that sets
> `outputPartitioning` to `SinglePartition` when Spark detects that the input
> data is small and the plan shape is one of a supported set.
> Q3. How is it done today, and what are the limits of current practice?
> Currently, the input operators of a query plan report `UnknownPartitioning(0)`:
> * If the input is a `LocalTableScanExec`, it uses the default `UnknownPartitioning(0)`.
> * If the input is a `DataSourceScanExec`, it also uses `UnknownPartitioning(0)`.
> The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` inserts
> an Exchange node to satisfy the parent operator's required distribution, and
> that Exchange is later executed as a shuffle. A shuffle splits execution into
> more stages, and each stage incurs the full lifecycle overhead of a Spark job.
> This is expensive when the query itself only needs a few hundred milliseconds
> to run, for example when Spark runs a simple aggregation over a LocalTableScan
> with only a few thousand rows, or over a small FileScan of at most 64 MB. For
> such small data, running all operators in a single stage without a shuffle is
> sufficient and much faster.
>
> Q4. What is new in your approach and why do you think it will be successful?
> First, the approach is guarded by a config that can turn it off, and it is
> off by default.
> The proposed framework will:
> * Verify that the plan shape is supported.
> * Verify that the input data size is small enough.
> * Set the output partitioning to `SinglePartition` on input data sources, including LocalTableScan and FileScan.
> * Propagate `SinglePartition` through the query plan.
> Spark then uses the `SinglePartition` information to eliminate unneeded shuffles.
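>
> The two verification steps could look roughly like the sketch below, written against a toy plan tree rather than Spark's `SparkPlan`. The 64 MB limit comes from Q3; the 5,000-row limit (standing in for "a few thousand rows"), the toy node types, and the choice of supported operators (joins excluded) are hypothetical, chosen only for illustration.
> {code:scala}
> // Hypothetical toy plan and thresholds; not Spark APIs.
> object SmallInputCheck {
>   sealed trait Plan { def children: Seq[Plan] }
>   final case class LocalTableScan(numRows: Long) extends Plan { val children = Seq.empty[Plan] }
>   final case class FileScan(sizeInBytes: Long) extends Plan { val children = Seq.empty[Plan] }
>   final case class Filter(child: Plan) extends Plan { val children = Seq(child) }
>   final case class Aggregate(child: Plan) extends Plan { val children = Seq(child) }
>   final case class Join(left: Plan, right: Plan) extends Plan { val children = Seq(left, right) }
>
>   // Hypothetical limits based on the examples in Q3.
>   val MaxLocalRows: Long = 5000L
>   val MaxScanBytes: Long = 64L * 1024 * 1024 // 64 MB
>
>   // Step 1: every operator must belong to a supported set (hypothetical set).
>   def isPlanSupported(plan: Plan): Boolean = plan match {
>     case _: LocalTableScan | _: FileScan | _: Filter | _: Aggregate =>
>       plan.children.forall(isPlanSupported)
>     case _ => false
>   }
>
>   // Step 2: every leaf input must be under its size limit.
>   def isInputSmall(plan: Plan): Boolean = plan match {
>     case LocalTableScan(rows) => rows <= MaxLocalRows
>     case FileScan(bytes)      => bytes <= MaxScanBytes
>     case other                => other.children.forall(isInputSmall)
>   }
>
>   def qualifies(plan: Plan): Boolean = isPlanSupported(plan) && isInputSmall(plan)
> }
> {code}
> Checking the plan shape first keeps the size check from running on plans the framework would reject anyway.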
> The rough code looks like the following:
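> {code:scala}
> // Rough sketch. singlePartitionEnabled, isPlanSupported, isLocalTableSmall,
> // isFileScanSmall and withSinglePartition are placeholder helpers,
> // not existing Spark APIs.
> def exploreSinglePartitionOpportunities(plan: SparkPlan): Option[SparkPlan] =
>   if (singlePartitionEnabled && isPlanSupported(plan)) {
>     // Mark small leaf scans as producing a single partition.
>     Some(plan.transform {
>       case l: LocalTableScanExec if isLocalTableSmall(l) => withSinglePartition(l)
>       case f: FileSourceScanExec if isFileScanSmall(f) => withSinglePartition(f)
>     })
>   } else {
>     None
>   }
>
> // Propagate SinglePartition bottom-up so that every operator reports it.
> def setOutputPartitioning(plan: SparkPlan): SparkPlan =
>   plan.transformUp { case p => withSinglePartition(p) }
>
> val optimizedPlan = exploreSinglePartitionOpportunities(rootPlan)
>   .map(setOutputPartitioning)
>   .getOrElse(rootPlan)
> {code}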
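>
> The last two steps (marking small leaves and propagating `SinglePartition`) can be modeled end to end in a self-contained toy, again without Spark's real classes; all names here are hypothetical. One deliberate difference from the rough code, which sets `SinglePartition` on every operator once the global checks pass: this sketch marks only leaves flagged as small and lets a parent inherit `SinglePartition` only when all of its children have it, so a large input keeps its Exchange. That propagation rule is an assumption of this sketch.
> {code:scala}
> // Hypothetical toy model; not Spark's SparkPlan or EnsureRequirements.
> object SinglePartitionPropagation {
>   // UnknownPartitioning stands in for Spark's UnknownPartitioning(0).
>   sealed trait Partitioning
>   case object UnknownPartitioning extends Partitioning
>   case object SinglePartition extends Partitioning
>
>   // requiresClustering marks operators (e.g. a grouping aggregate) that need
>   // co-located keys; smallLeaf marks leaves that passed the size check.
>   final case class Node(
>       name: String,
>       children: Seq[Node],
>       requiresClustering: Boolean = false,
>       smallLeaf: Boolean = false,
>       outputPartitioning: Partitioning = UnknownPartitioning)
>
>   // Set SinglePartition on small leaves and propagate it bottom-up:
>   // a parent is SinglePartition only if every child is.
>   def propagate(plan: Node): Node = {
>     val newChildren = plan.children.map(propagate)
>     val out: Partitioning =
>       if (newChildren.isEmpty) {
>         if (plan.smallLeaf) SinglePartition else UnknownPartitioning
>       } else if (newChildren.forall(_.outputPartitioning == SinglePartition)) {
>         SinglePartition
>       } else {
>         UnknownPartitioning
>       }
>     plan.copy(children = newChildren, outputPartitioning = out)
>   }
>
>   // Count the Exchanges an EnsureRequirements-style pass would insert: one per
>   // child of a clustering operator whose output is not SinglePartition.
>   def countExchanges(plan: Node): Int = {
>     val here =
>       if (plan.requiresClustering) plan.children.count(_.outputPartitioning != SinglePartition)
>       else 0
>     here + plan.children.map(countExchanges).sum
>   }
> }
> {code}
> For example, an aggregate over a filtered small `LocalTableScan` needs one Exchange before propagation and none after it.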
--
This message was sent by Atlassian Jira
(v8.20.10#820010)