[
https://issues.apache.org/jira/browse/SPARK-54437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Wang updated SPARK-54437:
-----------------------------
Description:
h4. Q1. What are you trying to do?
This document proposes setting `outputPartitioning` to `SinglePartition`
(instead of the default `UnknownPartitioning(0)`) when a query runs over small
data, to help Spark generate more efficient execution plans for small queries.
h4. Q2. What problem is this proposal NOT designed to solve?
This doc does not propose implementing a new fast execution engine for Spark.
It only suggests a minimal-effort, low-hanging-fruit change that sets
`outputPartitioning` to `SinglePartition` when we detect that the input data is
small, for a set of supported plan shapes.
h4. Q3. How is it done today, and what are the limits of current practice?
Currently, the input of a query plan reports unknown partitioning in both of the following cases:
# Its input is LocalTableScanExec, which uses the
[UnknownPartitioning(0)|https://github.com/apache/spark/blob/4ac02ab225f38429ba01a9ee064f8027c8f94e97/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L163].
# Its input is DataSourceScanExec, which also uses
[UnknownPartitioning(0)|https://github.com/apache/spark/blob/1716b292dec2ebe6c3d8dd75c9d4d59437219c08/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L478]
The problem with `UnknownPartitioning(0)` is that `EnsureRequirements` inserts
an Exchange node (to satisfy the partitioning requirement), which later becomes
a shuffle. The shuffle breaks the execution into more stages, and each stage
goes through the full lifecycle of a Spark job, which is expensive when the
query itself needs only a few hundred milliseconds to run. This is the case
when Spark runs a very simple aggregation over a LocalTableScan with only a few
thousand rows, or over a small FileScan of <= 64MB. For such small data,
running all the operators in a single stage without a shuffle is sufficient,
and much faster.
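To make the mechanism above concrete, here is a toy model, in plain Scala with no Spark dependency, of the check that decides whether an Exchange is needed. The types are simplified stand-ins that only borrow the names of Spark's partitioning and distribution classes, and `needsExchange` is a hypothetical helper, not the actual `EnsureRequirements` code.

{code:scala}
object PartitioningSketch {
  // Simplified stand-ins for the distribution an operator requires from its child.
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case object AllTuples extends Distribution
  final case class ClusteredDistribution(keys: Seq[String]) extends Distribution

  // Simplified stand-ins for how an operator describes its output partitioning.
  sealed trait Partitioning {
    def numPartitions: Int
    def satisfies(required: Distribution): Boolean
  }

  final case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
    // Nothing is known about row placement, so only trivial requirements hold.
    def satisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case AllTuples => numPartitions == 1
      case _ => false
    }
  }

  case object SinglePartition extends Partitioning {
    val numPartitions: Int = 1
    // All rows are in one partition, so every requirement modeled here already holds.
    def satisfies(required: Distribution): Boolean = true
  }

  // Hypothetical helper: an Exchange (shuffle) is needed when the child's
  // partitioning does not satisfy what the parent operator requires.
  def needsExchange(child: Partitioning, required: Distribution): Boolean =
    !child.satisfies(required)

  // A grouped aggregation on column "k" requires rows clustered by "k".
  val groupByK: Distribution = ClusteredDistribution(Seq("k"))
  val shuffleForUnknown: Boolean = needsExchange(UnknownPartitioning(0), groupByK)
  val shuffleForSingle: Boolean = needsExchange(SinglePartition, groupByK)
}
{code}

In this model the aggregation over an input with unknown partitioning needs an Exchange, while the same aggregation over a `SinglePartition` input does not, which is the effect this proposal relies on.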
h4. Q4. What is new in your approach and why do you think it will be successful?
First of all, our approach will be guarded by a config so that it can be turned
off, and it will be turned off by default.
The new framework proposed here will:
# Verify that the plan shape is supported.
# Verify that the input data size is small enough.
# Set the output partitioning to SinglePartition on input data sources, including LocalTableScan and FileScan.
# Propagate the SinglePartition through the query plan.
# Let Spark use the SinglePartition information to eliminate unneeded shuffles.
The rough code looks like the following:
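{code:scala}
// Rough sketch only: framework_is_on, isPlanSupported, isSmall,
// SetSinglePartition and setOutputPartitioning are placeholders for the
// proposed framework, not existing Spark APIs.

// Check the config and the plan shape, then mark each small input.
def exploreSinglePartitionOpportunities(): Boolean =
  if (framework_is_on && isPlanSupported(queryPlan)) {
    queryPlan = queryPlan.transform {
      case l: LocalTableScan if isSmall(l) => SetSinglePartition(l)
      case f: FileScan if isSmall(f) => SetSinglePartition(f)
    }
    true
  } else {
    false
  }

// Propagate SinglePartition bottom-up: children first, then the node itself.
def setOutputPartitioning(plan: LogicalPlan): Unit = {
  plan.children.foreach(setOutputPartitioning)
  plan.setOutputPartitioning(SinglePartition)
}

if (exploreSinglePartitionOpportunities()) {
  setOutputPartitioning(queryPlan)
}
{code}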
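For readers who want something they can run, the following is a self-contained toy version of the same flow, written against a made-up plan tree rather than Spark's classes. Everything in it is an assumption for illustration: the node types, the `singlePartition` flag, the byte-based threshold (borrowed from the "<= 64MB" figure above and applied to both input kinds), and the choice of aggregate and sort as the only supported shapes.

{code:scala}
object SinglePartitionSketch {
  // Toy plan nodes; sizeInBytes stands in for whatever statistic the framework would read.
  sealed trait Plan { def children: Seq[Plan] }
  final case class LocalTableScan(sizeInBytes: Long, singlePartition: Boolean = false) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class FileScan(sizeInBytes: Long, singlePartition: Boolean = false) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Sort(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Join(left: Plan, right: Plan) extends Plan { val children: Seq[Plan] = Seq(left, right) }

  // Assumed threshold for "small" input.
  val smallThresholdBytes: Long = 64L * 1024 * 1024

  // Plan-shape check: scans, aggregates and sorts are accepted, joins are rejected.
  def isPlanSupported(plan: Plan): Boolean = plan match {
    case _: LocalTableScan | _: FileScan => true
    case Aggregate(c) => isPlanSupported(c)
    case Sort(c) => isPlanSupported(c)
    case _: Join => false
  }

  def leaves(plan: Plan): Seq[Plan] =
    if (plan.children.isEmpty) Seq(plan) else plan.children.flatMap(leaves)

  def isSmall(plan: Plan): Boolean = plan match {
    case LocalTableScan(size, _) => size <= smallThresholdBytes
    case FileScan(size, _) => size <= smallThresholdBytes
    case _ => false
  }

  // Mark every input as single-partition, rebuilding the (immutable) tree.
  def markInputs(plan: Plan): Plan = plan match {
    case l: LocalTableScan => l.copy(singlePartition = true)
    case f: FileScan => f.copy(singlePartition = true)
    case Aggregate(c) => Aggregate(markInputs(c))
    case Sort(c) => Sort(markInputs(c))
    case Join(l, r) => Join(markInputs(l), markInputs(r))
  }

  // Propagation: a node is single-partition when all of its inputs are.
  def isSinglePartition(plan: Plan): Boolean = plan match {
    case LocalTableScan(_, flag) => flag
    case FileScan(_, flag) => flag
    case other => other.children.forall(isSinglePartition)
  }

  // The config guard, shape check and size check all have to pass.
  def optimize(plan: Plan, frameworkIsOn: Boolean): Plan =
    if (frameworkIsOn && isPlanSupported(plan) && leaves(plan).forall(isSmall)) markInputs(plan)
    else plan
}
{code}

With these definitions, `optimize(Sort(Aggregate(FileScan(1024L))), frameworkIsOn = true)` yields a plan for which `isSinglePartition` is true, while a plan containing a `Join`, an input above the threshold, or a disabled config is returned unchanged.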
h4. Q5. Who cares? If you are successful, what difference will it make?
The majority of Spark workloads are not as large as scanning terabytes of
data. They often scan only a single file of a few megabytes after file pruning,
or consume a small local table. Queries on the order of tens of millions could
be sped up by some percentage. By any measure, this is a non-trivial impact in
terms of coverage.
h4. Q6. What are the risks?
Using SinglePartition might cause performance regressions in certain cases,
such as a JOIN with high cardinality. For example, even though the JOIN inputs
are small, the output could explode. This risk is addressed by adding a layer
in the framework that verifies the query plan shape and supports only selected
plan shapes at the beginning. For example, for small data, aggregation and sort
should almost always be safe.
The framework will also be turned off by default until the community has
improved and expanded it and feedback indicates that it is mature enough to be
turned on by default.
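To illustrate the JOIN risk described above, here is a small worked example in plain Scala. When both sides hold n rows that share one join key, an inner equi-join emits n x n rows, so inputs of 10,000 rows each already produce 100 million output rows. `joinOutputRows` is an illustrative helper written for this note, not part of Spark.

{code:scala}
object JoinBlowupSketch {
  // Number of rows an inner equi-join emits, given the join-key column of each side.
  def joinOutputRows(leftKeys: Seq[Int], rightKeys: Seq[Int]): Long = {
    // Count how many right-side rows carry each key.
    val rightCounts: Map[Int, Long] =
      rightKeys.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }
    // Each left row matches as many right rows as share its key.
    leftKeys.iterator.map(k => rightCounts.getOrElse(k, 0L)).sum
  }

  val n = 10000
  // Every row on both sides has the same key: each left row matches every right row.
  val sharedKey: Long = joinOutputRows(Seq.fill(n)(1), Seq.fill(n)(1))
  // Unique keys on both sides: each left row matches exactly one right row.
  val uniqueKeys: Long = joinOutputRows(1 to n, 1 to n)
}
{code}

Here `sharedKey` is 100,000,000 while `uniqueKeys` is 10,000, even though the inputs are the same size in both cases. Input size alone therefore does not bound the work a join performs, which is why the plan-shape check is needed in addition to the size check.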
h4. Q7. How long will it take?
About 4 weeks to build:
# The framework.
# Support for basic plan shapes.
# A macro benchmark to help developers measure performance when they start extending the framework and the supported plan shapes.
h4. Q8. What are the mid-term and final “exams” to check for success?
Enable the framework and run benchmarks that demonstrate the speedup for small
queries.
> Single Partition Optimization Framework for small data
> ------------------------------------------------------
>
> Key: SPARK-54437
> URL: https://issues.apache.org/jira/browse/SPARK-54437
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major