This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new a77c9d6 [SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE a77c9d6 is described below commit a77c9d6d1726ee6cacc8bb457c4df02e3f16231b Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Mon Jul 26 22:41:54 2021 +0800 [SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE ### What changes were proposed in this pull request? This PR proposes to rename: - Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution plan (user-facing doc/config name remain untouched) - `*ShuffleReaderExec` ->`*ShuffleReadExec` - `isLocalReader` -> `isLocalRead` - ... - Rename `CustomShuffle*` prefix to `AQEShuffle*` - Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead` ### Why are the changes needed? There are multiple problems in the current naming: - `CustomShuffle*` -> `AQEShuffle*` it sounds like it is a pluggable API. However, this is actually only used by AQE. - `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead` it is the name of a rule but it can be misread as a reader, which is counterintuitive - `*ReaderExec` -> `*ReadExec` Reader execution reads a bit odd. It should better be read execution (like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find the reason to name it with something that performs an action. See also the generated plans: Before: ``` ... * HashAggregate (12) +- CustomShuffleReader (11) +- ShuffleQueryStage (10) +- Exchange (9) ... ``` After: ``` ... * HashAggregate (12) +- AQEShuffleRead (11) +- ShuffleQueryStage (10) +- Exchange (9) .. ``` ### Does this PR introduce _any_ user-facing change? No, internal refactoring. ### How was this patch tested? Existing unittests should cover the changes. Closes #33429 from HyukjinKwon/SPARK-36217. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 6e3d404cec0ab741bee21553268b94184055aa5a) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- ...ustomShuffledRDD.scala => AQEShuffledRDD.scala} | 8 +- .../spark/scheduler/AdaptiveSchedulingSuite.scala | 8 +- ...leReaderExec.scala => AQEShuffleReadExec.scala} | 20 +- ...leReaderRule.scala => AQEShuffleReadRule.scala} | 4 +- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 12 +- .../adaptive/CoalesceShufflePartitions.scala | 12 +- ...er.scala => OptimizeShuffleWithLocalRead.scala} | 50 ++-- .../OptimizeSkewInRebalancePartitions.scala | 4 +- .../execution/adaptive/OptimizeSkewedJoin.scala | 10 +- .../execution/adaptive/ShufflePartitionsUtil.scala | 2 +- .../execution/exchange/ShuffleExchangeExec.scala | 4 +- .../scala/org/apache/spark/sql/ExplainSuite.scala | 4 +- .../execution/CoalesceShufflePartitionsSuite.scala | 76 +++--- .../adaptive/AdaptiveQueryExecSuite.scala | 294 ++++++++++----------- 14 files changed, 254 insertions(+), 254 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala similarity index 95% rename from core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala rename to core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala index 46e5e6f..ae5e0e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala @@ -65,7 +65,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A } } -private[spark] class CustomShuffledRDDPartition( +private[spark] class AQEShuffledRDDPartition( val index: Int, val startIndexInParent: Int, val endIndexInParent: Int) extends Partition { @@ -78,7 +78,7 @@ private[spark] class CustomShuffledRDDPartition( * A special ShuffledRDD 
that supports a ShuffleDependency object from outside and launching reduce * tasks that read multiple map output partitions. */ -class CustomShuffledRDD[K, V, C]( +class AQEShuffledRDD[K, V, C]( var dependency: ShuffleDependency[K, V, C], partitionStartIndices: Array[Int]) extends RDD[(K, C)](dependency.rdd.context, Seq(dependency)) { @@ -98,12 +98,12 @@ class CustomShuffledRDD[K, V, C]( Array.tabulate[Partition](partitionStartIndices.length) { i => val startIndex = partitionStartIndices(i) val endIndex = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n - new CustomShuffledRDDPartition(i, startIndex, endIndex) + new AQEShuffledRDDPartition(i, startIndex, endIndex) } } override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = { - val part = p.asInstanceOf[CustomShuffledRDDPartition] + val part = p.asInstanceOf[AQEShuffledRDDPartition] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context, metrics) diff --git a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala index e0f474aa..71d213d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala @@ -36,7 +36,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext { (x, x) } val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2)) - val shuffled = new CustomShuffledRDD[Int, Int, Int](dep) + val shuffled = new AQEShuffledRDD[Int, Int, Int](dep) sc.submitMapStage(dep).get() assert(AdaptiveSchedulingSuiteState.tasksRun == 3) assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3))) @@ -50,7 +50,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext { sc = 
new SparkContext("local", "test") val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x)) val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3)) - val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 2)) + val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 2)) assert(shuffled.partitions.length === 2) assert(shuffled.glom().map(_.toSet).collect().toSet == Set(Set((0, 0), (1, 1)), Set((2, 2)))) } @@ -60,7 +60,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext { val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x)) // Also create lots of hash partitions so that some of them are empty val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(5)) - val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0)) + val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0)) assert(shuffled.partitions.length === 1) assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2))) } @@ -69,7 +69,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x)) val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3)) - val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2)) + val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2)) assert(shuffled.partitions.length === 7) assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala similarity index 93% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala index b8aef14..d897507 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala @@ -37,12 +37,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * @param partitionSpecs The partition specs that defines the arrangement, requires at least one * partition. */ -case class CustomShuffleReaderExec private( +case class AQEShuffleReadExec private( child: SparkPlan, partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode { - assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least one partition") + assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at least one partition") - // If this reader is to read shuffle files locally, then all partition specs should be + // If this is to read shuffle files locally, then all partition specs should be // `PartialMapperPartitionSpec`. if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) { assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec])) @@ -52,7 +52,7 @@ case class CustomShuffleReaderExec private( override def output: Seq[Attribute] = child.output override lazy val outputPartitioning: Partitioning = { - // If it is a local shuffle reader with one mapper per task, then the output partitioning is + // If it is a local shuffle read with one mapper per task, then the output partitioning is // the same as the plan before shuffle. // TODO this check is based on assumptions of callers' behavior but is sufficient for now. 
if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && @@ -75,7 +75,7 @@ case class CustomShuffleReaderExec private( } override def stringArgs: Iterator[Any] = { - val desc = if (isLocalReader) { + val desc = if (isLocalRead) { "local" } else if (hasCoalescedPartition && hasSkewedPartition) { "coalesced and skewed" @@ -104,7 +104,7 @@ case class CustomShuffleReaderExec private( def hasSkewedPartition: Boolean = partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) - def isLocalReader: Boolean = + def isLocalRead: Boolean = partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) || partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec]) @@ -114,7 +114,7 @@ case class CustomShuffleReaderExec private( } @transient private lazy val partitionDataSizes: Option[Seq[Long]] = { - if (!isLocalReader && shuffleStage.get.mapStats.isDefined) { + if (!isLocalRead && shuffleStage.get.mapStats.isDefined) { Some(partitionSpecs.map { case p: CoalescedPartitionSpec => assert(p.dataSize.isDefined) @@ -166,8 +166,8 @@ case class CustomShuffleReaderExec private( @transient override lazy val metrics: Map[String, SQLMetric] = { if (shuffleStage.isDefined) { Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ { - if (isLocalReader) { - // We split the mapper partition evenly when creating local shuffle reader, so no + if (isLocalRead) { + // We split the mapper partition evenly when creating local shuffle read, so no // data size info is available. 
Map.empty } else { @@ -208,6 +208,6 @@ case class CustomShuffleReaderExec private( shuffleRDD.asInstanceOf[RDD[ColumnarBatch]] } - override protected def withNewChildInternal(newChild: SparkPlan): CustomShuffleReaderExec = + override protected def withNewChildInternal(newChild: SparkPlan): AQEShuffleReadExec = copy(child = newChild) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala similarity index 88% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala index 3004a3d..1c7f2ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala @@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.exchange.ShuffleOrigin /** - * Adaptive Query Execution rule that may create [[CustomShuffleReaderExec]] on top of query stages. + * Adaptive Query Execution rule that may create [[AQEShuffleReadExec]] on top of query stages. */ -trait CustomShuffleReaderRule extends Rule[SparkPlan] { +trait AQEShuffleReadRule extends Rule[SparkPlan] { /** * Returns the list of [[ShuffleOrigin]]s supported by this rule. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 93beef8b..2f6619d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -97,13 +97,13 @@ case class AdaptiveSparkPlanExec( @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( PlanAdaptiveDynamicPruningFilters(this), ReuseAdaptiveSubquery(context.subqueryCache), - // Skew join does not handle `CustomShuffleReader` so needs to be applied first. + // Skew join does not handle `AQEShuffleRead` so needs to be applied first. OptimizeSkewedJoin, OptimizeSkewInRebalancePartitions, CoalesceShufflePartitions(context.session), - // `OptimizeLocalShuffleReader` needs to make use of 'CustomShuffleReaderExec.partitionSpecs' + // `OptimizeShuffleWithLocalRead` needs to make use of 'AQEShuffleReadExec.partitionSpecs' // added by `CoalesceShufflePartitions`, and must be executed after it. - OptimizeLocalShuffleReader + OptimizeShuffleWithLocalRead ) // A list of physical optimizer rules to be applied right after a new stage is created. The input @@ -116,7 +116,7 @@ case class AdaptiveSparkPlanExec( // The partitioning of the query output depends on the shuffle(s) in the final stage. If the // original plan contains a repartition operator, we need to preserve the specified partitioning, // whether or not the repartition-introduced shuffle is optimized out because of an underlying - // shuffle of the same partitioning. Thus, we need to exclude some `CustomShuffleReaderRule`s + // shuffle of the same partitioning. Thus, we need to exclude some `AQEShuffleReadRule`s // from the final stage, depending on the presence and properties of repartition operators. 
private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] = { val origins = inputPlan.collect { @@ -124,7 +124,7 @@ case class AdaptiveSparkPlanExec( } val allRules = queryStageOptimizerRules ++ postStageCreationRules allRules.filter { - case c: CustomShuffleReaderRule => + case c: AQEShuffleReadRule => origins.forall(c.supportedShuffleOrigins.contains) case _ => true } @@ -134,7 +134,7 @@ case class AdaptiveSparkPlanExec( val optimized = rules.foldLeft(plan) { case (latestPlan, rule) => val applied = rule.apply(latestPlan) val result = rule match { - case c: CustomShuffleReaderRule if c.mayAddExtraShuffles => + case c: AQEShuffleReadRule if c.mayAddExtraShuffles => if (ValidateRequirements.validate(applied)) { applied } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 0f16675..7f3e453 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf * A rule to coalesce the shuffle partitions based on the map output statistics, which can * avoid many small reduce tasks that hurt performance. 
*/ -case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule { +case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleReadRule { override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REBALANCE_PARTITIONS_BY_NONE, @@ -88,23 +88,23 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl val specsMap = shuffleStageInfos.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) => (stageInfo.shuffleStage.id, partSpecs) }.toMap - updateShuffleReaders(plan, specsMap) + updateShuffleReads(plan, specsMap) } else { plan } } } - private def updateShuffleReaders( + private def updateShuffleReads( plan: SparkPlan, specsMap: Map[Int, Seq[ShufflePartitionSpec]]): SparkPlan = plan match { // Even for shuffle exchange whose input RDD has 0 partition, we should still update its // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same // number of output partitions. 
case ShuffleStageInfo(stage, _) => specsMap.get(stage.id).map { specs => - CustomShuffleReaderExec(stage, specs) + AQEShuffleReadExec(stage, specs) }.getOrElse(plan) - case other => other.mapChildren(updateShuffleReaders(_, specsMap)) + case other => other.mapChildren(updateShuffleReads(_, specsMap)) } private def supportCoalesce(s: ShuffleExchangeLike): Boolean = { @@ -121,7 +121,7 @@ private object ShuffleStageInfo { : Option[(ShuffleQueryStageExec, Option[Seq[ShufflePartitionSpec]])] = plan match { case stage: ShuffleQueryStageExec => Some((stage, None)) - case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) => + case AQEShuffleReadExec(s: ShuffleQueryStageExec, partitionSpecs) => Some((s, Some(partitionSpecs))) case _ => None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala similarity index 78% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala index 3e3d6d6..844acbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala @@ -25,40 +25,40 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.internal.SQLConf /** - * A rule to optimize the shuffle reader to local reader iff no additional shuffles + * A rule to optimize the shuffle read to local read iff no additional shuffles * will be introduced: - * 1. if the input plan is a shuffle, add local reader directly as we can never introduce + * 1. if the input plan is a shuffle, add local read directly as we can never introduce * extra shuffles in this case. - * 2. 
otherwise, add local reader to the probe side of broadcast hash join and + * 2. otherwise, add local read to the probe side of broadcast hash join and * then run `EnsureRequirements` to check whether additional shuffle introduced. - * If introduced, we will revert all the local readers. + * If introduced, we will revert all the local reads. */ -object OptimizeLocalShuffleReader extends CustomShuffleReaderRule { +object OptimizeShuffleWithLocalRead extends AQEShuffleReadRule { override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_NONE) override def mayAddExtraShuffles: Boolean = true - // The build side is a broadcast query stage which should have been optimized using local reader + // The build side is a broadcast query stage which should have been optimized using local read // already. So we only need to deal with probe side here. - private def createProbeSideLocalReader(plan: SparkPlan): SparkPlan = { + private def createProbeSideLocalRead(plan: SparkPlan): SparkPlan = { plan.transformDown { case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildRight) => - val localReader = createLocalReader(shuffleStage) - join.asInstanceOf[BroadcastHashJoinExec].copy(left = localReader) + val localRead = createLocalRead(shuffleStage) + join.asInstanceOf[BroadcastHashJoinExec].copy(left = localRead) case join @ BroadcastJoinWithShuffleRight(shuffleStage, BuildLeft) => - val localReader = createLocalReader(shuffleStage) - join.asInstanceOf[BroadcastHashJoinExec].copy(right = localReader) + val localRead = createLocalRead(shuffleStage) + join.asInstanceOf[BroadcastHashJoinExec].copy(right = localRead) } } - private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = { + private def createLocalRead(plan: SparkPlan): AQEShuffleReadExec = { plan match { - case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) => - CustomShuffleReaderExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length))) + case c @ 
AQEShuffleReadExec(s: ShuffleQueryStageExec, _) => + AQEShuffleReadExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length))) case s: ShuffleQueryStageExec => - CustomShuffleReaderExec(s, getPartitionSpecs(s, None)) + AQEShuffleReadExec(s, getPartitionSpecs(s, None)) } } @@ -111,16 +111,16 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule { } plan match { - case s: SparkPlan if canUseLocalShuffleReader(s) => - createLocalReader(s) + case s: SparkPlan if canUseLocalShuffleRead(s) => + createLocalRead(s) case s: SparkPlan => - createProbeSideLocalReader(s) + createProbeSideLocalRead(s) } } object BroadcastJoinWithShuffleLeft { def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match { - case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.left) => + case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.left) => Some((join.left, join.buildSide)) case _ => None } @@ -128,22 +128,22 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule { object BroadcastJoinWithShuffleRight { def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match { - case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.right) => + case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.right) => Some((join.right, join.buildSide)) case _ => None } } - def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match { + def canUseLocalShuffleRead(plan: SparkPlan): Boolean = plan match { case s: ShuffleQueryStageExec => - s.mapStats.isDefined && supportLocalReader(s.shuffle) - case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) => - s.mapStats.isDefined && supportLocalReader(s.shuffle) && + s.mapStats.isDefined && supportLocalRead(s.shuffle) + case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) => + s.mapStats.isDefined && supportLocalRead(s.shuffle) && s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS case _ => false } - private def supportLocalReader(s: ShuffleExchangeLike): Boolean 
= { + private def supportLocalRead(s: ShuffleExchangeLike): Boolean = { s.outputPartitioning != SinglePartition && supportedShuffleOrigins.contains(s.shuffleOrigin) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala index d8a198f..dc437403 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf * Note that, this rule is only applied with the SparkPlan whose top-level node is * ShuffleQueryStageExec. */ -object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule { +object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule { override def supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL) @@ -82,7 +82,7 @@ object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule { if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) { shuffle } else { - CustomShuffleReaderExec(shuffle, newPartitionsSpec) + AQEShuffleReadExec(shuffle, newPartitionsSpec) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 810084a..fbfbce6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf * (L3, R3-1), (L3, R3-2), * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2) */ -object OptimizeSkewedJoin extends CustomShuffleReaderRule { 
+object OptimizeSkewedJoin extends AQEShuffleReadRule { override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS) @@ -110,9 +110,9 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule { * 2. Assuming partition0 is skewed in left side, and it has 5 mappers (Map0, Map1...Map4). * And we may split the 5 Mappers into 3 mapper ranges [(Map0, Map1), (Map2, Map3), (Map4)] * based on the map size and the max split number. - * 3. Wrap the join left child with a special shuffle reader that reads each mapper range with one + * 3. Wrap the join left child with a special shuffle read that loads each mapper range with one * task, so total 3 tasks. - * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by + * 4. Wrap the join right child with a special shuffle read that loads partition0 3 times by * 3 tasks separately. */ private def tryOptimizeJoinChildren( @@ -196,8 +196,8 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule { } logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight") if (numSkewedLeft > 0 || numSkewedRight > 0) { - Some((CustomShuffleReaderExec(left, leftSidePartitions.toSeq), - CustomShuffleReaderExec(right, rightSidePartitions.toSeq))) + Some((AQEShuffleReadExec(left, leftSidePartitions.toSeq), + AQEShuffleReadExec(right, rightSidePartitions.toSeq))) } else { None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 4ef7d33..64f89b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -86,7 +86,7 @@ object ShufflePartitionsUtil extends Logging { // we should skip it when calculating the `partitionStartIndices`. 
val validMetrics = mapOutputStatistics.flatten val numShuffles = mapOutputStatistics.length - // If all input RDDs have 0 partition, we create an empty partition for every shuffle reader. + // If all input RDDs have 0 partition, we create an empty partition for every shuffle read. if (validMetrics.isEmpty) { return Seq.fill(numShuffles)(Seq(CoalescedPartitionSpec(0, 0, 0))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index e8cf768..5a45af6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -99,13 +99,13 @@ case object REPARTITION_BY_NUM extends ShuffleOrigin // Indicates that the shuffle operator was added by the user-specified rebalance operator. // Spark will try to rebalance partitions that make per-partition size not too small and not -// too big. Local shuffle reader will be used if possible to reduce network traffic. +// too big. Local shuffle read will be used if possible to reduce network traffic. case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin // Indicates that the shuffle operator was added by the user-specified rebalance operator with // columns. Spark will try to rebalance partitions that make per-partition size not too small and // not too big. -// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle reader cannot be used for it as +// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle read cannot be used for it as // the output needs to be partitioned by the given columns. 
case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 1e99347..fbbdd42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -536,7 +536,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit // AdaptiveSparkPlan (21) // +- == Final Plan == // * HashAggregate (12) - // +- CustomShuffleReader (11) + // +- AQEShuffleRead (11) // +- ShuffleQueryStage (10) // +- Exchange (9) // +- * HashAggregate (8) @@ -570,7 +570,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL] |Arguments: 1""".stripMargin, """ - |(11) CustomShuffleReader + |(11) AQEShuffleRead |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL] |""".stripMargin, """ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index fbae929..2a28517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql._ import org.apache.spark.sql.execution.adaptive._ -import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec +import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -110,18 +110,18 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl // by the ExchangeCoordinator. val finalPlan = agg.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val shuffleReaders = finalPlan.collect { - case r @ CoalescedShuffleReader() => r + val shuffleReads = finalPlan.collect { + case r @ CoalescedShuffleRead() => r } minNumPostShufflePartitions match { case Some(numPartitions) => - assert(shuffleReaders.isEmpty) + assert(shuffleReads.isEmpty) case None => - assert(shuffleReaders.length === 1) - shuffleReaders.foreach { reader => - assert(reader.outputPartitioning.numPartitions === 3) + assert(shuffleReads.length === 1) + shuffleReads.foreach { read => + assert(read.outputPartitioning.numPartitions === 3) } } } @@ -156,18 +156,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl // by the ExchangeCoordinator. val finalPlan = join.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val shuffleReaders = finalPlan.collect { - case r @ CoalescedShuffleReader() => r + val shuffleReads = finalPlan.collect { + case r @ CoalescedShuffleRead() => r } minNumPostShufflePartitions match { case Some(numPartitions) => - assert(shuffleReaders.isEmpty) + assert(shuffleReads.isEmpty) case None => - assert(shuffleReaders.length === 2) - shuffleReaders.foreach { reader => - assert(reader.outputPartitioning.numPartitions === 2) + assert(shuffleReads.length === 2) + shuffleReads.foreach { read => + assert(read.outputPartitioning.numPartitions === 2) } } } @@ -207,18 +207,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl // by the ExchangeCoordinator. 
val finalPlan = join.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val shuffleReaders = finalPlan.collect { - case r @ CoalescedShuffleReader() => r + val shuffleReads = finalPlan.collect { + case r @ CoalescedShuffleRead() => r } minNumPostShufflePartitions match { case Some(numPartitions) => - assert(shuffleReaders.isEmpty) + assert(shuffleReads.isEmpty) case None => - assert(shuffleReaders.length === 2) - shuffleReaders.foreach { reader => - assert(reader.outputPartitioning.numPartitions === 2) + assert(shuffleReads.length === 2) + shuffleReads.foreach { read => + assert(read.outputPartitioning.numPartitions === 2) } } } @@ -258,18 +258,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl // by the ExchangeCoordinator. val finalPlan = join.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val shuffleReaders = finalPlan.collect { - case r @ CoalescedShuffleReader() => r + val shuffleReads = finalPlan.collect { + case r @ CoalescedShuffleRead() => r } minNumPostShufflePartitions match { case Some(numPartitions) => - assert(shuffleReaders.isEmpty) + assert(shuffleReads.isEmpty) case None => - assert(shuffleReaders.length === 2) - shuffleReaders.foreach { reader => - assert(reader.outputPartitioning.numPartitions === 3) + assert(shuffleReads.length === 2) + shuffleReads.foreach { read => + assert(read.outputPartitioning.numPartitions === 3) } } } @@ -300,10 +300,10 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl // Then, let's make sure we do not reduce number of post shuffle partitions. 
val finalPlan = join.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val shuffleReaders = finalPlan.collect { - case r @ CoalescedShuffleReader() => r + val shuffleReads = finalPlan.collect { + case r @ CoalescedShuffleRead() => r } - assert(shuffleReaders.length === 0) + assert(shuffleReads.length === 0) } finally { spark.sql("drop table t") } @@ -331,7 +331,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl }.length == 2) assert( finalPlan.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.length == 3) @@ -357,14 +357,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl assert( finalPlan2.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.length == 2, "finalPlan2") level1Stages.foreach(qs => assert(qs.plan.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.length == 1, - "Wrong CoalescedShuffleReader below " + qs.simpleString(3))) + "Wrong CoalescedShuffleRead below " + qs.simpleString(3))) val leafStages = level1Stages.flatMap { stage => // All of the child stages of result stage have only one child stage. @@ -395,7 +395,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl .asInstanceOf[AdaptiveSparkPlanExec].executedPlan assert( finalPlan.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.isEmpty) } withSparkSession(test, 200, None) @@ -416,7 +416,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl // the shuffle partition numbers. 
assert( finalPlan.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.isEmpty) } withSparkSession(test, 100, None) @@ -432,7 +432,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl .asInstanceOf[AdaptiveSparkPlanExec].executedPlan assert( finalPlan.collect { - case r @ CoalescedShuffleReader() => r + case r @ CoalescedShuffleRead() => r }.isDefinedAt(0)) } Seq(true, false).foreach { enableIOEncryption => @@ -442,8 +442,8 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl } } -object CoalescedShuffleReader { - def unapply(reader: CustomShuffleReaderExec): Boolean = { - !reader.isLocalReader && !reader.hasSkewedPartition && reader.hasCoalescedPartition +object CoalescedShuffleRead { + def unapply(read: AQEShuffleReadExec): Boolean = { + !read.isLocalRead && !read.hasSkewedPartition && read.hasCoalescedPartition } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8a74981..46ca786 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -139,21 +139,21 @@ class AdaptiveQueryExecSuite } } - private def checkNumLocalShuffleReaders( - plan: SparkPlan, numShufflesWithoutLocalReader: Int = 0): Unit = { + private def checkNumLocalShuffleReads( + plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = { val numShuffles = collect(plan) { case s: ShuffleQueryStageExec => s }.length - val numLocalReaders = collect(plan) { - case reader: CustomShuffleReaderExec if reader.isLocalReader => reader + val numLocalReads = collect(plan) { + case read: AQEShuffleReadExec if read.isLocalRead => read } - numLocalReaders.foreach { r => + numLocalReads.foreach { r => val rdd 
= r.execute() val parts = rdd.partitions assert(parts.forall(rdd.preferredLocations(_).nonEmpty)) } - assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader)) + assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead)) } private def checkInitialPartitionNum(df: Dataset[_], numPartition: Int): Unit = { @@ -177,11 +177,11 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } } - test("Reuse the parallelism of CoalescedShuffleReaderExec in LocalShuffleReaderExec") { + test("Reuse the parallelism of coalesced shuffle in local shuffle read") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", @@ -192,12 +192,12 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localReaders = collect(adaptivePlan) { - case reader: CustomShuffleReaderExec if reader.isLocalReader => reader + val localReads = collect(adaptivePlan) { + case read: AQEShuffleReadExec if read.isLocalRead => read } - assert(localReaders.length == 2) - val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD] - val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD] + assert(localReads.length == 2) + val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD] + val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD] // The pre-shuffle partition size is [0, 0, 0, 72, 0] // We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1 // the final parallelism is @@ -213,7 +213,7 @@ class AdaptiveQueryExecSuite } } - test("Reuse the default parallelism in LocalShuffleReaderExec") { + test("Reuse the default parallelism in local shuffle read") { withSQLConf( 
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", @@ -224,12 +224,12 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - val localReaders = collect(adaptivePlan) { - case reader: CustomShuffleReaderExec if reader.isLocalReader => reader + val localReads = collect(adaptivePlan) { + case read: AQEShuffleReadExec if read.isLocalRead => read } - assert(localReaders.length == 2) - val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD] - val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD] + assert(localReads.length == 2) + val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD] + val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD] // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 // and the partitions length is 2 * numMappers = 4 assert(localShuffleRDD0.getPartitions.length == 4) @@ -252,11 +252,11 @@ class AdaptiveQueryExecSuite checkAnswer(testDf, Seq()) val plan = testDf.queryExecution.executedPlan assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) - val coalescedReaders = collect(plan) { - case r: CustomShuffleReaderExec => r + val coalescedReads = collect(plan) { + case r: AQEShuffleReadExec => r } - assert(coalescedReaders.length == 3) - coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1)) + assert(coalescedReads.length == 3) + coalescedReads.foreach(r => assert(r.partitionSpecs.length == 1)) } withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { @@ -265,11 +265,11 @@ class AdaptiveQueryExecSuite checkAnswer(testDf, Seq()) val plan = testDf.queryExecution.executedPlan assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) - val coalescedReaders = collect(plan) { - case r: CustomShuffleReaderExec => r + val coalescedReads = collect(plan) { + case r: 
AQEShuffleReadExec => r } - assert(coalescedReaders.length == 3, s"$plan") - coalescedReaders.foreach(r => assert(r.isLocalReader || r.partitionSpecs.length == 1)) + assert(coalescedReads.length == 3, s"$plan") + coalescedReads.foreach(r => assert(r.isLocalRead || r.partitionSpecs.length == 1)) } } } @@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } } @@ -301,7 +301,7 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } } @@ -342,11 +342,11 @@ class AdaptiveQueryExecSuite // +-LocalShuffleReader* // +- ShuffleExchange - // After applied the 'OptimizeLocalShuffleReader' rule, we can convert all the four - // shuffle reader to local shuffle reader in the bottom two 'BroadcastHashJoin'. + // After applied the 'OptimizeShuffleWithLocalRead' rule, we can convert all the four + // shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'. // For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage - // and the build side shuffle query stage is also converted to local shuffle reader. - checkNumLocalShuffleReaders(adaptivePlan) + // and the build side shuffle query stage is also converted to local shuffle read. + checkNumLocalShuffleReads(adaptivePlan) } } @@ -390,8 +390,8 @@ class AdaptiveQueryExecSuite // +- CoalescedShuffleReader // +- ShuffleExchange - // The shuffle added by Aggregate can't apply local reader. - checkNumLocalShuffleReaders(adaptivePlan, 1) + // The shuffle added by Aggregate can't apply local read. 
+ checkNumLocalShuffleReads(adaptivePlan, 1) } } @@ -436,8 +436,8 @@ class AdaptiveQueryExecSuite // +-LocalShuffleReader* // +- ShuffleExchange - // The shuffle added by Aggregate can't apply local reader. - checkNumLocalShuffleReaders(adaptivePlan, 1) + // The shuffle added by Aggregate can't apply local read. + checkNumLocalShuffleReads(adaptivePlan, 1) } } @@ -452,9 +452,9 @@ class AdaptiveQueryExecSuite assert(smj.size == 3) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 2) - // There is still a SMJ, and its two shuffles can't apply local reader. - checkNumLocalShuffleReaders(adaptivePlan, 2) - // Even with local shuffle reader, the query stage reuse can also work. + // There is still a SMJ, and its two shuffles can't apply local read. + checkNumLocalShuffleReads(adaptivePlan, 2) + // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.size == 1) } @@ -471,8 +471,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) - // Even with local shuffle reader, the query stage reuse can also work. + checkNumLocalShuffleReads(adaptivePlan) + // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.size == 1) } @@ -491,8 +491,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) - // Even with local shuffle reader, the query stage reuse can also work. + checkNumLocalShuffleReads(adaptivePlan) + // Even with local shuffle read, the query stage reuse can also work. 
val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) val sub = findReusedSubquery(adaptivePlan) @@ -512,8 +512,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) - // Even with local shuffle reader, the query stage reuse can also work. + checkNumLocalShuffleReads(adaptivePlan) + // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.isEmpty) val sub = findReusedSubquery(adaptivePlan) @@ -536,8 +536,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) - // Even with local shuffle reader, the query stage reuse can also work. + checkNumLocalShuffleReads(adaptivePlan) + // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) assert(ex.head.child.isInstanceOf[BroadcastExchangeExec]) @@ -599,7 +599,7 @@ class AdaptiveQueryExecSuite } } - test("Change merge join to broadcast join without local shuffle reader") { + test("Change merge join to broadcast join without local shuffle read") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key -> "true", @@ -615,8 +615,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 2) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - // There is still a SMJ, and its two shuffles can't apply local reader. - checkNumLocalShuffleReaders(adaptivePlan, 2) + // There is still a SMJ, and its two shuffles can't apply local read. 
+ checkNumLocalShuffleReads(adaptivePlan, 2) } } @@ -734,12 +734,12 @@ class AdaptiveQueryExecSuite rightSkewNum: Int): Unit = { assert(joins.size == 1 && joins.head.isSkewJoin) assert(joins.head.left.collect { - case r: CustomShuffleReaderExec => r + case r: AQEShuffleReadExec => r }.head.partitionSpecs.collect { case p: PartialReducerPartitionSpec => p.reducerIndex }.distinct.length == leftSkewNum) assert(joins.head.right.collect { - case r: CustomShuffleReaderExec => r + case r: AQEShuffleReadExec => r }.head.partitionSpecs.collect { case p: PartialReducerPartitionSpec => p.reducerIndex }.distinct.length == rightSkewNum) @@ -895,16 +895,16 @@ class AdaptiveQueryExecSuite } } - test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") { + test("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") - val readers = collect(adaptivePlan) { - case r: CustomShuffleReaderExec => r + val reads = collect(adaptivePlan) { + case r: AQEShuffleReadExec => r } - assert(readers.length == 1) - val reader = readers.head - val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec] + assert(reads.length == 1) + val read = reads.head + val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec] // we can't just call execute() because that has separate checks for canonicalized plans val ex = intercept[IllegalStateException] { val doExecute = PrivateMethod[Unit](Symbol("doExecute")) @@ -914,22 +914,22 @@ class AdaptiveQueryExecSuite } } - test("metrics of the shuffle reader") { + test("metrics of the shuffle read") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") - val readers = collect(adaptivePlan) { - case r: CustomShuffleReaderExec => r + val reads = collect(adaptivePlan) { + 
case r: AQEShuffleReadExec => r } - assert(readers.length == 1) - val reader = readers.head - assert(!reader.isLocalReader) - assert(!reader.hasSkewedPartition) - assert(reader.hasCoalescedPartition) - assert(reader.metrics.keys.toSeq.sorted == Seq( + assert(reads.length == 1) + val read = reads.head + assert(!read.isLocalRead) + assert(!read.hasSkewedPartition) + assert(read.hasCoalescedPartition) + assert(read.metrics.keys.toSeq.sorted == Seq( "numPartitions", "partitionDataSize")) - assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length) - assert(reader.metrics("partitionDataSize").value > 0) + assert(read.metrics("numPartitions").value == read.partitionSpecs.length) + assert(read.metrics("partitionDataSize").value > 0) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( @@ -939,14 +939,14 @@ class AdaptiveQueryExecSuite }.head assert(join.buildSide == BuildLeft) - val readers = collect(join.right) { - case r: CustomShuffleReaderExec => r + val reads = collect(join.right) { + case r: AQEShuffleReadExec => r } - assert(readers.length == 1) - val reader = readers.head - assert(reader.isLocalReader) - assert(reader.metrics.keys.toSeq == Seq("numPartitions")) - assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length) + assert(reads.length == 1) + val read = reads.head + assert(read.isLocalRead) + assert(read.metrics.keys.toSeq == Seq("numPartitions")) + assert(read.metrics("numPartitions").value == read.partitionSpecs.length) } withSQLConf( @@ -972,19 +972,19 @@ class AdaptiveQueryExecSuite .createOrReplaceTempView("skewData2") val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 join skewData2 ON key1 = key2") - val readers = collect(adaptivePlan) { - case r: CustomShuffleReaderExec => r + val reads = collect(adaptivePlan) { + case r: AQEShuffleReadExec => r } - readers.foreach { reader => - assert(!reader.isLocalReader) - 
assert(reader.hasCoalescedPartition) - assert(reader.hasSkewedPartition) - assert(reader.metrics.contains("numSkewedPartitions")) + reads.foreach { read => + assert(!read.isLocalRead) + assert(read.hasCoalescedPartition) + assert(read.hasSkewedPartition) + assert(read.metrics.contains("numSkewedPartitions")) } - assert(readers(0).metrics("numSkewedPartitions").value == 2) - assert(readers(0).metrics("numSkewedSplits").value == 11) - assert(readers(1).metrics("numSkewedPartitions").value == 1) - assert(readers(1).metrics("numSkewedSplits").value == 9) + assert(reads(0).metrics("numSkewedPartitions").value == 2) + assert(reads(0).metrics("numSkewedSplits").value == 11) + assert(reads(1).metrics("numSkewedPartitions").value == 1) + assert(reads(1).metrics("numSkewedSplits").value == 9) } } } @@ -1233,7 +1233,7 @@ class AdaptiveQueryExecSuite assert(bhj.size == 1) val join = findTopLevelBaseJoin(adaptivePlan) assert(join.isEmpty) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } } @@ -1252,7 +1252,7 @@ class AdaptiveQueryExecSuite // this is different compares to test(SPARK-32573) due to the rule // `EliminateUnnecessaryJoin` has been excluded. 
assert(join.nonEmpty) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } } @@ -1273,7 +1273,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val join = findTopLevelBaseJoin(adaptivePlan) assert(join.isEmpty) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) }) } } @@ -1431,7 +1431,7 @@ class AdaptiveQueryExecSuite } } - test("SPARK-32932: Do not use local shuffle reader at final stage on write command") { + test("SPARK-32932: Do not use local shuffle read at final stage on write command") { withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { @@ -1441,14 +1441,14 @@ class AdaptiveQueryExecSuite ) yield (i, j) val df = data.toDF("i", "j").repartition($"j") - var noLocalReader: Boolean = false + var noLocalread: Boolean = false val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { qe.executedPlan match { case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) => assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec]) - noLocalReader = collect(plan) { - case exec: CustomShuffleReaderExec if exec.isLocalReader => exec + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec }.isEmpty case _ => // ignore other events } @@ -1461,32 +1461,32 @@ class AdaptiveQueryExecSuite withTable("t") { df.write.partitionBy("j").saveAsTable("t") sparkContext.listenerBus.waitUntilEmpty() - assert(noLocalReader) - noLocalReader = false + assert(noLocalread) + noLocalread = false } // Test DataSource v2 val format = classOf[NoopDataSource].getName df.write.format(format).mode("overwrite").save() sparkContext.listenerBus.waitUntilEmpty() - assert(noLocalReader) - noLocalReader = false + assert(noLocalread) + noLocalread = false 
spark.listenerManager.unregister(listener) } } - test("SPARK-33494: Do not use local shuffle reader for repartition") { + test("SPARK-33494: Do not use local shuffle read for repartition") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val df = spark.table("testData").repartition('key) df.collect() - // local shuffle reader breaks partitioning and shouldn't be used for repartition operation + // local shuffle read breaks partitioning and shouldn't be used for repartition operation // which is specified by users. - checkNumLocalShuffleReaders(df.queryExecution.executedPlan, numShufflesWithoutLocalReader = 1) + checkNumLocalShuffleReads(df.queryExecution.executedPlan, numShufflesWithoutLocalRead = 1) } } - test("SPARK-33551: Do not use custom shuffle reader for repartition") { + test("SPARK-33551: Do not use AQE shuffle read for repartition") { def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => @@ -1515,11 +1515,11 @@ class AdaptiveQueryExecSuite assert(!hasRepartitionShuffle(plan)) val bhj = findTopLevelBroadcastHashJoin(plan) assert(bhj.length == 1) - checkNumLocalShuffleReaders(plan, 1) + checkNumLocalShuffleReads(plan, 1) // Probe side is coalesced. - val customReader = bhj.head.right.find(_.isInstanceOf[CustomShuffleReaderExec]) - assert(customReader.isDefined) - assert(customReader.get.asInstanceOf[CustomShuffleReaderExec].hasCoalescedPartition) + val aqeRead = bhj.head.right.find(_.isInstanceOf[AQEShuffleReadExec]) + assert(aqeRead.isDefined) + assert(aqeRead.get.asInstanceOf[AQEShuffleReadExec].hasCoalescedPartition) // Repartition with partition default num specified. 
val dfRepartitionWithNum = df.repartition(5, 'b) @@ -1529,23 +1529,23 @@ class AdaptiveQueryExecSuite assert(hasRepartitionShuffle(planWithNum)) val bhjWithNum = findTopLevelBroadcastHashJoin(planWithNum) assert(bhjWithNum.length == 1) - checkNumLocalShuffleReaders(planWithNum, 1) + checkNumLocalShuffleReads(planWithNum, 1) // Probe side is coalesced. - assert(bhjWithNum.head.right.find(_.isInstanceOf[CustomShuffleReaderExec]).nonEmpty) + assert(bhjWithNum.head.right.find(_.isInstanceOf[AQEShuffleReadExec]).nonEmpty) // Repartition with partition non-default num specified. val dfRepartitionWithNum2 = df.repartition(3, 'b) dfRepartitionWithNum2.collect() val planWithNum2 = dfRepartitionWithNum2.queryExecution.executedPlan // The top shuffle from repartition is not optimized out, and this is the only shuffle that - // does not have local shuffle reader. + // does not have local shuffle read. assert(hasRepartitionShuffle(planWithNum2)) val bhjWithNum2 = findTopLevelBroadcastHashJoin(planWithNum2) assert(bhjWithNum2.length == 1) - checkNumLocalShuffleReaders(planWithNum2, 1) - val customReader2 = bhjWithNum2.head.right.find(_.isInstanceOf[CustomShuffleReaderExec]) - assert(customReader2.isDefined) - assert(customReader2.get.asInstanceOf[CustomShuffleReaderExec].isLocalReader) + checkNumLocalShuffleReads(planWithNum2, 1) + val aqeRead2 = bhjWithNum2.head.right.find(_.isInstanceOf[AQEShuffleReadExec]) + assert(aqeRead2.isDefined) + assert(aqeRead2.get.asInstanceOf[AQEShuffleReadExec].isLocalRead) } // Force skew join @@ -1565,10 +1565,10 @@ class AdaptiveQueryExecSuite // No skew join due to the repartition. assert(!smj.head.isSkewJoin) // Both sides are coalesced. 
- val customReaders = collect(smj.head) { - case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c + val aqeReads = collect(smj.head) { + case c: AQEShuffleReadExec if c.hasCoalescedPartition => c } - assert(customReaders.length == 2) + assert(aqeReads.length == 2) // Repartition with default partition num specified. val dfRepartitionWithNum = df.repartition(5, 'b) @@ -1580,10 +1580,10 @@ class AdaptiveQueryExecSuite assert(smjWithNum.length == 1) // Skew join can apply as the repartition is not optimized out. assert(smjWithNum.head.isSkewJoin) - val customReadersWithNum = collect(smjWithNum.head) { - case c: CustomShuffleReaderExec => c + val aqeReadsWithNum = collect(smjWithNum.head) { + case c: AQEShuffleReadExec => c } - assert(customReadersWithNum.nonEmpty) + assert(aqeReadsWithNum.nonEmpty) // Repartition with default non-partition num specified. val dfRepartitionWithNum2 = df.repartition(3, 'b) @@ -1660,7 +1660,7 @@ class AdaptiveQueryExecSuite ds.collect() val plan = ds.queryExecution.executedPlan assert(collect(plan) { - case c: CustomShuffleReaderExec => c + case c: AQEShuffleReadExec => c }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1691,7 +1691,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1") assert( collect(adaptive) { - case c @ CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.length == 1 => + case c @ AQEShuffleReadExec(_, partitionSpecs) if partitionSpecs.length == 1 => assert(c.hasCoalescedPartition) c }.length == 1 @@ -1793,30 +1793,30 @@ class AdaptiveQueryExecSuite val query = s"SELECT /*+ $repartition */ * FROM testData" val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query) collect(adaptivePlan) { - case r: CustomShuffleReaderExec => r + case r: AQEShuffleReadExec => r } match { - case Seq(customShuffleReader) => - assert(customShuffleReader.partitionSpecs.size === 1) - 
assert(!customShuffleReader.isLocalReader) + case Seq(aqeShuffleRead) => + assert(aqeShuffleRead.partitionSpecs.size === 1) + assert(!aqeShuffleRead.isLocalRead) case _ => - fail("There should be a CustomShuffleReaderExec") + fail("There should be a AQEShuffleReadExec") } } } } - test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") { + test("SPARK-35650: Use local shuffle read if can not coalesce number of partitions") { withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") { val query = "SELECT /*+ REPARTITION */ * FROM testData" val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query) collect(adaptivePlan) { - case r: CustomShuffleReaderExec => r + case r: AQEShuffleReadExec => r } match { - case Seq(customShuffleReader) => - assert(customShuffleReader.partitionSpecs.size === 4) - assert(customShuffleReader.isLocalReader) + case Seq(aqeShuffleRead) => + assert(aqeShuffleRead.partitionSpecs.size === 4) + assert(aqeShuffleRead.isLocalRead) case _ => - fail("There should be a CustomShuffleReaderExec") + fail("There should be a AQEShuffleReadExec") } } } @@ -1838,13 +1838,13 @@ class AdaptiveQueryExecSuite def checkPartitionNumber( query: String, skewedPartitionNumber: Int, totalNumber: Int): Unit = { val (_, adaptive) = runAdaptiveAndVerifyResult(query) - val reader = collect(adaptive) { - case reader: CustomShuffleReaderExec => reader + val read = collect(adaptive) { + case read: AQEShuffleReadExec => read } - assert(reader.size == 1) - assert(reader.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) == + assert(read.size == 1) + assert(read.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) == skewedPartitionNumber) - assert(reader.head.partitionSpecs.size == totalNumber) + assert(read.head.partitionSpecs.size == totalNumber) } withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") { @@ -1873,11 +1873,11 @@ class AdaptiveQueryExecSuite .createOrReplaceTempView("t2") 
val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b") - val customReaders = collect(adaptive) { - case c: CustomShuffleReaderExec => c + val aqeReads = collect(adaptive) { + case c: AQEShuffleReadExec => c } - assert(customReaders.length == 2) - customReaders.foreach { c => + assert(aqeReads.length == 2) + aqeReads.foreach { c => val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics assert(stats.sizeInBytes >= 0) assert(stats.rowCount.get >= 0) @@ -1890,12 +1890,12 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT sum(id) FROM RANGE(10) GROUP BY id % 3") - val coalesceReader = collect(adaptive) { - case r: CustomShuffleReaderExec if r.hasCoalescedPartition => r + val coalesceRead = collect(adaptive) { + case r: AQEShuffleReadExec if r.hasCoalescedPartition => r } - assert(coalesceReader.length == 1) + assert(coalesceRead.length == 1) // RANGE(10) is a very small dataset and AQE coalescing should produce one partition. - assert(coalesceReader.head.partitionSpecs.length == 1) + assert(coalesceRead.head.partitionSpecs.length == 1) } } @@ -1919,7 +1919,7 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) - checkNumLocalShuffleReaders(adaptivePlan) + checkNumLocalShuffleReads(adaptivePlan) } withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key -> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org