This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 46b7f17  [SPARK-31037][SQL] refine AQE config names
46b7f17 is described below

commit 46b7f1796bd0b96977ce9b473601033f397a3b18
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Mar 6 00:46:34 2020 +0800

    [SPARK-31037][SQL] refine AQE config names
    
    When introducing AQE to others, I feel the config names are a bit 
incoherent and hard to use.
    This PR refines the config names:
    1. remove the "shuffle" prefix. AQE is all about shuffle and we don't need 
to add the "shuffle" prefix everywhere.
    2. `targetPostShuffleInputSize` is obscure, rename to 
`advisoryPartitionSizeInBytes`.
    3. `reducePostShufflePartitions` doesn't match the actual optimization, 
rename to `coalescePartitions.enabled` (the rule itself is renamed to 
`CoalesceShufflePartitions`)
    4. `minNumPostShufflePartitions` is obscure, rename it `minPartitionNum` 
under the `coalescePartitions` namespace
    5. `maxNumPostShufflePartitions` is confusing with the word "max", rename 
it `initialPartitionNum`
    6. `skewedJoinOptimization` is too verbose. Skew join is a well-known 
term in the database area, so we can just say `skewJoin`
    
    Make the config names easy to understand.
    
    Deprecate the config `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`
    in favor of `spark.sql.adaptive.advisoryPartitionSizeInBytes`.
    
    N/A
    
    Closes #27793 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 142 +++++++++++----------
 .../spark/sql/execution/ShuffledRowRDD.scala       |   2 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +-
 ...tions.scala => CoalesceShufflePartitions.scala} |  14 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    |  14 +-
 .../execution/exchange/EnsureRequirements.scala    |   4 +-
 ....scala => CoalesceShufflePartitionsSuite.scala} |  12 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          |   8 +-
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  26 ++--
 .../spark/sql/sources/BucketedReadSuite.scala      |   2 +-
 10 files changed, 117 insertions(+), 109 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dbfc65..b2b3d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -356,8 +356,16 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be 
positive")
     .createWithDefault(200)
 
+  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
+    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+      .internal()
+      .doc("(Deprecated since Spark 3.0)")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
-    .doc("When true, enable adaptive query execution.")
+    .doc("When true, enable adaptive query execution, which re-optimizes the 
query plan in the " +
+      "middle of query execution, based on accurate runtime statistics.")
     .booleanConf
     .createWithDefault(false)
 
@@ -365,90 +373,90 @@ object SQLConf {
     .internal()
     .doc("Adaptive query execution is skipped when the query does not have 
exchanges or " +
       "sub-queries. By setting this config to true (together with " +
-      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply 
adaptive query " +
+      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force 
apply adaptive query " +
       "execution for all supported queries.")
     .booleanConf
     .createWithDefault(false)
 
-  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions")
-      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, 
this enables reducing " +
-        "the number of post-shuffle partitions based on map output 
statistics.")
-      .booleanConf
-      .createWithDefault(true)
+  val ADVISORY_PARTITION_SIZE_IN_BYTES =
+    buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
+      .doc("The advisory size in bytes of the shuffle partition during 
adaptive optimization " +
+        s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect 
when Spark " +
+        "coalesces small shuffle partitions or splits skewed shuffle 
partition.")
+      .fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
 
-  val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch")
-      .doc("Whether to fetch the continuous shuffle blocks in batch. Instead 
of fetching blocks " +
-        "one by one, fetching continuous shuffle blocks for the same map task 
in batch can " +
-        "reduce IO and improve performance. Note, multiple continuous blocks 
exist in single " +
-        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' 
and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this 
feature also depends " +
-        "on a relocatable serializer, the concatenation support codec in use 
and the new version " +
-        "shuffle fetch protocol.")
+  val COALESCE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.coalescePartitions.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark 
will coalesce " +
+        "contiguous shuffle partitions according to the target size (specified 
by " +
+        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small 
tasks.")
       .booleanConf
       .createWithDefault(true)
 
-  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used when " 
+
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
+      .doc("The minimum number of shuffle partitions after coalescing. This 
configuration only " +
+        s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
       .intConf
-      .checkValue(_ > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
       .createWithDefault(1)
 
-  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
-      .doc("The target post-shuffle input size in bytes of a task. This 
configuration only has " +
-        s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in 
adaptive execution. " +
-        "This is used as the initial number of pre-shuffle partitions. By 
default it equals to " +
-        "spark.sql.shuffle.partitions. This configuration only has an effect 
when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
+      .doc("The initial number of shuffle partitions before coalescing. By 
default it equals to " +
+        s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect 
when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and 
'${COALESCE_PARTITIONS_ENABLED.key}' " +
+        "are both true.")
       .intConf
-      .checkValue(_ > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The initial number of partitions must be positive.")
       .createOptional
 
+  val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
+    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
+      .internal()
+      .doc("Whether to fetch the contiguous shuffle blocks in batch. Instead 
of fetching blocks " +
+        "one by one, fetching contiguous shuffle blocks for the same map task 
in batch can " +
+        "reduce IO and improve performance. Note, multiple contiguous blocks 
exist in single " +
+        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' 
and " +
+        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature 
also depends " +
+        "on a relocatable serializer, the concatenation support codec in use 
and the new version " +
+        "shuffle fetch protocol.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
-    .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this 
enables the " +
-      "optimization of converting the shuffle reader to local shuffle reader 
for the shuffle " +
-      "exchange of the broadcast hash join in probe side.")
-    .booleanConf
-    .createWithDefault(true)
+    buildConf("spark.sql.adaptive.localShuffleReader.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark 
tries to use local " +
+        "shuffle reader to read the shuffle data when the shuffle partitioning 
is not needed, " +
+        "for example, after converting sort-merge join to broadcast-hash 
join.")
+      .booleanConf
+      .createWithDefault(true)
 
-  val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.enabled")
-    .doc("When true and adaptive execution is enabled, a skewed join is 
automatically handled at " +
-      "runtime.")
-    .booleanConf
-    .createWithDefault(true)
+  val SKEW_JOIN_ENABLED =
+    buildConf("spark.sql.adaptive.skewJoin.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark 
dynamically " +
+        "handles skew in sort-merge join by splitting (and replicating if 
needed) skewed " +
+        "partitions.")
+      .booleanConf
+      .createWithDefault(true)
 
-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
-    
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
-      .doc("A partition is considered as a skewed partition if its size is 
larger than" +
-        " this factor multiple the median partition size and also larger than 
" +
-        s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
+  val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
+    buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
+      .doc("A partition is considered as skewed if its size is larger than 
this factor " +
+        "multiplying the median partition size and also larger than " +
+        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
       .intConf
       .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)
 
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+      .internal()
       .doc("The relation with a non-empty partition ratio lower than this 
config will not be " +
         "considered as the build side of a broadcast-hash join in adaptive 
execution regardless " +
         "of its size.This configuration only has an effect when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
       .doubleConf
       .checkValue(_ >= 0, "The non-empty partition ratio must be positive 
number.")
       .createWithDefault(0.2)
@@ -2257,7 +2265,9 @@ object SQLConf {
       DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
         s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
       DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
-        s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it.")
+        s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
+      DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
+        s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
     )
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -2418,19 +2428,17 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def targetPostShuffleInputSize: Long = 
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
-
-  def fetchShuffleBlocksInBatchEnabled: Boolean = 
getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+  def fetchShuffleBlocksInBatch: Boolean = 
getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
 
   def nonEmptyPartitionRatioForBroadcastJoin: Double =
     getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)
 
-  def reducePostShufflePartitionsEnabled: Boolean = 
getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+  def coalesceShufflePartitionsEnabled: Boolean = 
getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def minNumPostShufflePartitions: Int = 
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minShufflePartitionNum: Int = 
getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)
 
-  def maxNumPostShufflePartitions: Int =
-    
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+  def initialShufflePartitionNum: Int =
+    
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index eb02259..53ab049 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -127,7 +127,7 @@ class ShuffledRowRDD(
       Array.tabulate(dependency.partitioner.numPartitions)(i => 
CoalescedPartitionSpec(i, i + 1)))
   }
 
-  if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) {
+  if (SQLConf.get.fetchShuffleBlocksInBatch) {
     dependency.rdd.context.setLocalProperty(
       SortShuffleManager.FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY, "true")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index f2ebe1a..b74401e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,7 +91,7 @@ case class AdaptiveSparkPlanExec(
     // before 'ReduceNumShufflePartitions', as the skewed partition handled
     // in 'OptimizeSkewedJoin' rule, should be omitted in 
'ReduceNumShufflePartitions'.
     OptimizeSkewedJoin(conf),
-    ReduceNumShufflePartitions(conf),
+    CoalesceShufflePartitions(conf),
     // The rule of 'OptimizeLocalShuffleReader' need to make use of the 
'partitionStartIndices'
     // in 'ReduceNumShufflePartitions' rule. So it must be after 
'ReduceNumShufflePartitions' rule.
     OptimizeLocalShuffleReader(conf),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
similarity index 90%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 767a4b2..a8e2d8e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
 
 /**
- * A rule to reduce the post shuffle partitions based on the map output 
statistics, which can
+ * A rule to coalesce the shuffle partitions based on the map output 
statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
-  import ReduceNumShufflePartitions._
+case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+  import CoalesceShufflePartitions._
 
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.reducePostShufflePartitionsEnabled) {
+    if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
     if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
@@ -70,8 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends 
Rule[SparkPlan] {
           validMetrics.toArray,
           firstPartitionIndex = 0,
           lastPartitionIndex = distinctNumPreShufflePartitions.head,
-          advisoryTargetSize = conf.targetPostShuffleInputSize,
-          minNumPartitions = conf.minNumPostShufflePartitions)
+          advisoryTargetSize = 
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+          minNumPartitions = conf.minShufflePartitionNum)
         // This transformation adds new nodes, so we must use `transformUp` 
here.
         val stageIds = shuffleStages.map(_.id).toSet
         plan.transformUp {
@@ -88,6 +88,6 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends 
Rule[SparkPlan] {
   }
 }
 
-object ReduceNumShufflePartitions {
+object CoalesceShufflePartitions {
   val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 979fee1..2e8adcf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -64,11 +64,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   /**
    * A partition is considered as a skewed partition if its size is larger 
than the median
    * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also 
larger than
-   * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
+   * ADVISORY_PARTITION_SIZE_IN_BYTES.
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
-    size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    size > medianSize * 
conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
+      size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
   }
 
   private def medianSize(stats: MapOutputStatistics): Long = {
@@ -87,7 +87,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
    * target post-shuffle partition size if avg size is smaller than it.
    */
   private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = 
{
-    val targetPostShuffleSize = 
conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    val targetPostShuffleSize = 
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
     val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, 
medianSize))
     // It's impossible that all the partitions are skewed, as we use median 
size to define skew.
     assert(nonSkewSizes.nonEmpty)
@@ -271,7 +271,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
       rightStats: MapOutputStatistics,
       nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
     assert(nonSkewPartitionIndices.nonEmpty)
-    val shouldCoalesce = 
conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+    val shouldCoalesce = conf.getConf(SQLConf.COALESCE_PARTITIONS_ENABLED)
     if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
     } else {
@@ -280,7 +280,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
         firstPartitionIndex = nonSkewPartitionIndices.head,
         // `lastPartitionIndex` is exclusive.
         lastPartitionIndex = nonSkewPartitionIndices.last + 1,
-        advisoryTargetSize = conf.targetPostShuffleInputSize)
+        advisoryTargetSize = 
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
     }
   }
 
@@ -300,7 +300,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   }
 
   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED)) {
+    if (!conf.getConf(SQLConf.SKEW_JOIN_ENABLED)) {
       return plan
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index ab4176c..28ef793 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,8 +36,8 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled && 
conf.reducePostShufflePartitionsEnabled) {
-      conf.maxNumPostShufflePartitions
+    if (conf.adaptiveExecutionEnabled && 
conf.coalesceShufflePartitionsEnabled) {
+      conf.initialShufflePartitionNum
     } else {
       conf.numShufflePartitions
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
similarity index 96%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index a32b684..9e77f61 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -23,13 +23,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
+import 
org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
-import 
org.apache.spark.sql.execution.adaptive.ReduceNumShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
-class ReduceNumShufflePartitionsSuite extends SparkFunSuite with 
BeforeAndAfterAll {
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private var originalActiveSparkSession: Option[SparkSession] = _
   private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -65,17 +65,17 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterA
         .setAppName("test")
         .set(UI_ENABLED, false)
         .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
-        .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
+        .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
         .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
         .set(
-          SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
+          SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
           targetPostShuffleInputSize.toString)
     minNumPostShufflePartitions match {
       case Some(numPartitions) =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, 
numPartitions.toString)
+        sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, 
numPartitions.toString)
       case None =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
+        sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "1")
     }
 
     val spark = SparkSession.builder()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 500b6cc..a7fa63d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -135,7 +135,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "10") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val smj = findTopLevelSortMergeJoin(plan)
@@ -167,7 +167,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-      SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "false") {
+      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val smj = findTopLevelSortMergeJoin(plan)
@@ -584,7 +584,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 48be211..10ea948 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -170,33 +170,33 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
     assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, 
but was 10")
   }
 
-  test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+  test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") {
     spark.sessionState.conf.clear()
 
-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 
100)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
 
-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 
1024)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
 
-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 
1048576)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1048576)
 
-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 
1073741824)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1073741824)
 
-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 
-1)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
 
     // Test overflow exception
     intercept[IllegalArgumentException] {
       // This value exceeds Long.MaxValue
-      spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, 
"90000000000g")
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, 
"90000000000g")
     }
 
     intercept[IllegalArgumentException] {
       // This value less than Long.MinValue
-      spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, 
"-90000000000g")
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, 
"-90000000000g")
     }
 
     spark.sessionState.conf.clear()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index b54f4f2..57bbf20 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -821,7 +821,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
     withSQLConf(
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7")  {
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
       val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
       Seq(false, true).foreach { enableAdaptive =>
         withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> 
s"$enableAdaptive") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to