This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c9f3370  [SPARK-30864][SQL][DOC] add the user guide for Adaptive Query Execution
c9f3370 is described below

commit c9f3370cb6b021ad2e7a8c6cc71109de353f6f04
Author: jiake <ke.a....@intel.com>
AuthorDate: Mon Mar 16 23:33:56 2020 +0800

    [SPARK-30864][SQL][DOC] add the user guide for Adaptive Query Execution
    
    ### What changes were proposed in this pull request?
    This PR adds the user guide for AQE and the detailed configurations of its three main features.
    
    ### Why are the changes needed?
    To document the detailed configurations of the new AQE features.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Doc-only change; no UT needed.
    
    Closes #27616 from JkSelf/aqeuserguide.
    
    Authored-by: jiake <ke.a....@intel.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 21c02ee5d0c0f5951927ff1654cf1351c8f066e6)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-performance-tuning.md                     | 60 ++++++++++++++++++++++
 .../execution/adaptive/OptimizeSkewedJoin.scala    |  2 +-
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index 5a86c0c..489575d 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -186,3 +186,63 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
     SELECT /*+ REPARTITION(3, c) */ * FROM t
     SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
     SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
+
+## Adaptive Query Execution
+Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Spark SQL uses the umbrella configuration `spark.sql.adaptive.enabled` to control whether to turn it on/off. As of Spark 3.0, there are three major features in AQE: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
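+
+As a minimal sketch (the application name is hypothetical), AQE can be enabled when building a `SparkSession`:
+
+    import org.apache.spark.sql.SparkSession
+
+    // Build a session with the AQE umbrella flag turned on
+    // (it is disabled by default in Spark 3.0).
+    val spark = SparkSession.builder()
+      .appName("AqeExample") // hypothetical app name
+      .config("spark.sql.adaptive.enabled", "true")
+      .getOrCreate()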
+
+### Coalescing Post Shuffle Partitions
+This feature coalesces the post-shuffle partitions based on the map output statistics when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configurations are true. This feature simplifies the tuning of the shuffle partition number when running queries. You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions [...]
+ <table class="table">
+   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+   <tr>
+     <td><code>spark.sql.adaptive.coalescePartitions.enabled</code></td>
+     <td>true</td>
+     <td>
+       When true and <code>spark.sql.adaptive.enabled</code> is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>), to avoid too many small tasks.
+     </td>
+   </tr>
+   <tr>
+     <td><code>spark.sql.adaptive.coalescePartitions.minPartitionNum</code></td>
+     <td>Default Parallelism</td>
+     <td>
+       The minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark cluster. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
+     </td>
+   </tr>
+   <tr>
+     <td><code>spark.sql.adaptive.coalescePartitions.initialPartitionNum</code></td>
+     <td>200</td>
+     <td>
+       The initial number of shuffle partitions before coalescing. By default it equals <code>spark.sql.shuffle.partitions</code>. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
+     </td>
+   </tr>
+   <tr>
+     <td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
+     <td>64 MB</td>
+     <td>
+       The advisory size in bytes of the shuffle partition during adaptive optimization (when <code>spark.sql.adaptive.enabled</code> is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions.
+     </td>
+   </tr>
+ </table>
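+
+A minimal sketch of applying these coalescing settings (the values shown are illustrative, not recommendations), reusing the `spark` session from the sketch above:
+
+    // Enable AQE and post-shuffle partition coalescing.
+    spark.conf.set("spark.sql.adaptive.enabled", "true")
+    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
+    // Start from a generously large partition number; Spark coalesces
+    // contiguous partitions toward the advisory size at runtime.
+    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
+    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")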
+ 
+### Converting Sort-Merge Join to Broadcast Join
+AQE converts sort-merge join to broadcast hash join when the runtime statistics of either join side are smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it's better than continuing with the sort-merge join, as we can skip sorting both join sides and read shuffle files locally to save network traffic (if `spark.sql.adaptive.localShuffleReader.enabled` is true).
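+
+As an illustrative sketch (the datasets are made up, and the `spark` session comes from the sketch above), a join whose filtered side turns out to be small at runtime is a candidate for this conversion:
+
+    import spark.implicits._
+
+    val large = spark.range(0, 1000000).withColumnRenamed("id", "k")
+    // After filtering, the runtime size of this side may fall below the
+    // broadcast threshold, letting AQE convert the sort-merge join.
+    val small = spark.range(0, 1000000).filter($"id" % 1000 === 0).withColumnRenamed("id", "k")
+    large.join(small, "k").count()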
+
+### Optimizing Skew Join
+Data skew can severely degrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled` configurations are enabled.
+  <table class="table">
+     <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+     <tr>
+       <td><code>spark.sql.adaptive.skewJoin.enabled</code></td>
+       <td>true</td>
+       <td>
+         When true and <code>spark.sql.adaptive.enabled</code> is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.
+       </td>
+     </tr>
+     <tr>
+       <td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
+       <td>10</td>
+       <td>
+         A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
+       </td>
+     </tr>
+   </table>
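+
+A minimal sketch of the skew-join settings (the values are the documented defaults, shown for illustration), again on the `spark` session from above:
+
+    // Let AQE split (and replicate if needed) skewed partitions in sort-merge joins.
+    spark.conf.set("spark.sql.adaptive.enabled", "true")
+    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
+    // A partition is treated as skewed only if it exceeds both this factor
+    // times the median partition size and the advisory partition size.
+    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10")
+    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")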
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index a75a3f3..db65af6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -304,7 +304,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
     if (shuffleStages.length == 2) {
      // When multi table join, there will be too many complex combination to consider.
-      // Currently we only handle 2 table join like following two use cases.
+      // Currently we only handle a 2-table join like the following use case.
       // SMJ
       //   Sort
       //     Shuffle

