This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 46b7f17  [SPARK-31037][SQL] refine AQE config names
46b7f17 is described below

commit 46b7f1796bd0b96977ce9b473601033f397a3b18
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Mar 6 00:46:34 2020 +0800

    [SPARK-31037][SQL] refine AQE config names

    When introducing AQE to others, I find that the config names are a bit incoherent and hard to use.
    This PR refines the config names:

    1. Remove the "shuffle" prefix. AQE is all about shuffle, so we don't need to repeat the prefix everywhere.
    2. `targetPostShuffleInputSize` is obscure; rename it to `advisoryPartitionSizeInBytes`.
    3. `reducePostShufflePartitions` doesn't match the actual optimization; rename it to `coalesceShufflePartitions`.
    4. `minNumPostShufflePartitions` is obscure; rename it to `minPartitionNum` under the `coalesceShufflePartitions` namespace.
    5. `maxNumPostShufflePartitions` is confusing because of the word "max"; rename it to `initialPartitionNum`.
    6. `skewedJoinOptimization` is too verbose. Skew join is well-known terminology in the database area, so we can just say `skewJoin`.

    This makes the config names easy to understand.

    This deprecates the config `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.

    N/A

    Closes #27793 from cloud-fan/aqe.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 142 +++++++++++----------
 .../spark/sql/execution/ShuffledRowRDD.scala       |   2 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +-
 ...tions.scala => CoalesceShufflePartitions.scala} |  14 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    |  14 +-
 .../execution/exchange/EnsureRequirements.scala    |   4 +-
 ....scala => CoalesceShufflePartitionsSuite.scala} |  12 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          |   8 +-
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  26 ++--
 .../spark/sql/sources/BucketedReadSuite.scala      |   2 +-
 10 files changed, 117 insertions(+), 109 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dbfc65..b2b3d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -356,8 +356,16 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)

+  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
+    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+      .internal()
+      .doc("(Deprecated since Spark 3.0)")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
-    .doc("When true, enable adaptive query execution.")
+    .doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
+      "middle of query execution, based on accurate runtime statistics.")
     .booleanConf
     .createWithDefault(false)

@@ -365,90 +373,90 @@ object SQLConf {
     .internal()
     .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
      "sub-queries. By setting this config to true (together with " +
-      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply adaptive query " +
+      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
      "execution for all supported queries.")
     .booleanConf
     .createWithDefault(false)

-  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions")
-      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
-        "the number of post-shuffle partitions based on map output statistics.")
-      .booleanConf
-      .createWithDefault(true)
+  val ADVISORY_PARTITION_SIZE_IN_BYTES =
+    buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
+      .doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
+        s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect when Spark " +
+        "coalesces small shuffle partitions or splits skewed shuffle partition.")
+      .fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

-  val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch")
-      .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
-        "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
-        "reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
-        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
-        "on a relocatable serializer, the concatenation support codec in use and the new version " +
-        "shuffle fetch protocol.")
+  val COALESCE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.coalescePartitions.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will coalesce " +
+        "contiguous shuffle partitions according to the target size (specified by " +
+        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
       .booleanConf
       .createWithDefault(true)

-  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
+      .doc("The minimum number of shuffle partitions after coalescing. This configuration only " +
+        s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
       .intConf
-      .checkValue(_ > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
       .createWithDefault(1)

-  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
-      .doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
-        s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
-        "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
-        "spark.sql.shuffle.partitions. This configuration only has an effect when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
-        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
+  val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
+    buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
+      .doc("The initial number of shuffle partitions before coalescing. By default it equals to " +
+        s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and '${COALESCE_PARTITIONS_ENABLED.key}' " +
+        "are both true.")
       .intConf
-      .checkValue(_ > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
+      .checkValue(_ > 0, "The initial number of partitions must be positive.")
       .createOptional

+  val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
+    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
+      .internal()
+      .doc("Whether to fetch the contiguous shuffle blocks in batch. Instead of fetching blocks " +
+        "one by one, fetching contiguous shuffle blocks for the same map task in batch can " +
+        "reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
+        s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
+        "on a relocatable serializer, the concatenation support codec in use and the new version " +
+        "shuffle fetch protocol.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
-      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
-        "optimization of converting the shuffle reader to local shuffle reader for the shuffle " +
-        "exchange of the broadcast hash join in probe side.")
-      .booleanConf
-      .createWithDefault(true)
+    buildConf("spark.sql.adaptive.localShuffleReader.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark tries to use local " +
+        "shuffle reader to read the shuffle data when the shuffle partitioning is not needed, " +
+        "for example, after converting sort-merge join to broadcast-hash join.")
+      .booleanConf
+      .createWithDefault(true)

-  val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.enabled")
-      .doc("When true and adaptive execution is enabled, a skewed join is automatically handled at " +
-        "runtime.")
-      .booleanConf
-      .createWithDefault(true)
+  val SKEW_JOIN_ENABLED =
+    buildConf("spark.sql.adaptive.skewJoin.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark dynamically " +
+        "handles skew in sort-merge join by splitting (and replicating if needed) skewed " +
+        "partitions.")
+      .booleanConf
+      .createWithDefault(true)

-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
-      .doc("A partition is considered as a skewed partition if its size is larger than" +
-        " this factor multiple the median partition size and also larger than " +
-        s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
+  val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
+    buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
+      .doc("A partition is considered as skewed if its size is larger than this factor " +
+        "multiplying the median partition size and also larger than " +
+        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
       .intConf
       .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)

   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+      .internal()
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +
         "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
         "of its size.This configuration only has an effect when " +
-        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
       .doubleConf
       .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
       .createWithDefault(0.2)

@@ -2257,7 +2265,9 @@ object SQLConf {
     DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
       s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
     DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
-      s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it.")
+      s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
+    DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
+      s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
   )

   Map(configs.map { cfg => cfg.key -> cfg } : _*)

@@ -2418,19 +2428,17 @@ class SQLConf extends Serializable with Logging {
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

-  def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
-
-  def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+  def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)

   def nonEmptyPartitionRatioForBroadcastJoin: Double =
     getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)

-  def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+  def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)

-  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minShufflePartitionNum: Int = getConf(COALESCE_PARTITIONS_MIN_PARTITION_NUM)

-  def maxNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+  def initialShufflePartitionNum: Int =
+    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)

   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index eb02259..53ab049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -127,7 +127,7 @@ class ShuffledRowRDD(
         Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
     }

-    if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) {
+    if (SQLConf.get.fetchShuffleBlocksInBatch) {
       dependency.rdd.context.setLocalProperty(
         SortShuffleManager.FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY, "true")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index f2ebe1a..b74401e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,7 +91,7 @@ case class AdaptiveSparkPlanExec(
     // before 'ReduceNumShufflePartitions', as the skewed partition handled
     // in 'OptimizeSkewedJoin' rule, should be omitted in 'ReduceNumShufflePartitions'.
     OptimizeSkewedJoin(conf),
-    ReduceNumShufflePartitions(conf),
+    CoalesceShufflePartitions(conf),
     // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
     // in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
     OptimizeLocalShuffleReader(conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
similarity index 90%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 767a4b2..a8e2d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf

 /**
- * A rule to reduce the post shuffle partitions based on the map output statistics, which can
+ * A rule to coalesce the shuffle partitions based on the map output statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
-  import ReduceNumShufflePartitions._
+case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+  import CoalesceShufflePartitions._

   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.reducePostShufflePartitionsEnabled) {
+    if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
     if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
@@ -70,8 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
         validMetrics.toArray,
         firstPartitionIndex = 0,
         lastPartitionIndex = distinctNumPreShufflePartitions.head,
-        advisoryTargetSize = conf.targetPostShuffleInputSize,
-        minNumPartitions = conf.minNumPostShufflePartitions)
+        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
+        minNumPartitions = conf.minShufflePartitionNum)
       // This transformation adds new nodes, so we must use `transformUp` here.
       val stageIds = shuffleStages.map(_.id).toSet
       plan.transformUp {
@@ -88,6 +88,6 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
   }
 }

-object ReduceNumShufflePartitions {
+object CoalesceShufflePartitions {
   val COALESCED_SHUFFLE_READER_DESCRIPTION = "coalesced"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 979fee1..2e8adcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -64,11 +64,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   /**
    * A partition is considered as a skewed partition if its size is larger than the median
    * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
-   * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
+   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
-    size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
+      size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
   }

   private def medianSize(stats: MapOutputStatistics): Long = {
@@ -87,7 +87,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
    * target post-shuffle partition size if avg size is smaller than it.
    */
   private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
-    val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    val targetPostShuffleSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
     val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
     // It's impossible that all the partitions are skewed, as we use median size to define skew.
     assert(nonSkewSizes.nonEmpty)
@@ -271,7 +271,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       rightStats: MapOutputStatistics,
       nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
     assert(nonSkewPartitionIndices.nonEmpty)
-    val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+    val shouldCoalesce = conf.getConf(SQLConf.COALESCE_PARTITIONS_ENABLED)
     if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
     } else {
@@ -280,7 +280,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         firstPartitionIndex = nonSkewPartitionIndices.head,
         // `lastPartitionIndex` is exclusive.
         lastPartitionIndex = nonSkewPartitionIndices.last + 1,
-        advisoryTargetSize = conf.targetPostShuffleInputSize)
+        advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES))
     }
   }

@@ -300,7 +300,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   }

   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED)) {
+    if (!conf.getConf(SQLConf.SKEW_JOIN_ENABLED)) {
       return plan
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index ab4176c..28ef793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,8 +36,8 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled && conf.reducePostShufflePartitionsEnabled) {
-      conf.maxNumPostShufflePartitions
+    if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
+      conf.initialShufflePartitionNum
     } else {
       conf.numShufflePartitions
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
similarity index 96%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index a32b684..9e77f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -23,13 +23,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
-import org.apache.spark.sql.execution.adaptive.ReduceNumShufflePartitions.COALESCED_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf

-class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {

   private var originalActiveSparkSession: Option[SparkSession] = _
   private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -65,17 +65,17 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       .setAppName("test")
       .set(UI_ENABLED, false)
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
-      .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
+      .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       .set(
-        SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
         targetPostShuffleInputSize.toString)
     minNumPostShufflePartitions match {
       case Some(numPartitions) =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
+        sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, numPartitions.toString)
       case None =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
+        sparkConf.set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "1")
     }

     val spark = SparkSession.builder()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 500b6cc..a7fa63d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -135,7 +135,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "10") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val smj = findTopLevelSortMergeJoin(plan)
@@ -167,7 +167,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-      SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "false") {
+      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
       val smj = findTopLevelSortMergeJoin(plan)
@@ -584,7 +584,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 48be211..10ea948 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -170,33 +170,33 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
   }

-  test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+  test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") {
     spark.sessionState.conf.clear()

-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 100)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)

-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1024)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)

-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1048576)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576)

-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1073741824)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)

-    spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
-    assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === -1)
+    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)

     // Test overflow exception
     intercept[IllegalArgumentException] {
       // This value exceeds Long.MaxValue
-      spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "90000000000g")
     }

     intercept[IllegalArgumentException] {
       // This value less than Long.MinValue
-      spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-90000000000g")
     }

     spark.sessionState.conf.clear()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index b54f4f2..57bbf20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -821,7 +821,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
     withSQLConf(
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
       val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
       Seq(false, true).foreach { enableAdaptive =>
         withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
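
For illustration only (not part of this commit): a minimal sketch of how the renamed AQE configs could be set on a Spark 3.0 session. The object name, app name, master, and chosen values are placeholders for this example; only the config keys come from the patch above.

import org.apache.spark.sql.SparkSession

// Hypothetical example object; not part of the Spark codebase.
object AqeConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                // placeholder master
      .appName("aqe-config-example")     // placeholder app name
      // Config names introduced by SPARK-31037:
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10")
      .getOrCreate()

    // The advisory shuffle partition size under its new name. The deprecated key
    // spark.sql.adaptive.shuffle.targetPostShuffleInputSize is still honored as a
    // fallback when this new key is not set, per the fallbackConf in the diff.
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

    spark.stop()
  }
}

Because the old key is registered as a DeprecatedConfig, jobs that still set it keep working while they migrate to the new name.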