This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a77c9d6  [SPARK-36217][SQL] Rename CustomShuffleReader and 
OptimizeLocalShuffleReader in AQE
a77c9d6 is described below

commit a77c9d6d1726ee6cacc8bb457c4df02e3f16231b
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Mon Jul 26 22:41:54 2021 +0800

    [SPARK-36217][SQL] Rename CustomShuffleReader and 
OptimizeLocalShuffleReader in AQE
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to rename:
    
    - Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution 
plans (user-facing doc/config names remain untouched)
      - `*ShuffleReaderExec` ->`*ShuffleReadExec`
      - `isLocalReader` -> `isLocalRead`
      - ...
    - Rename `CustomShuffle*` prefix to `AQEShuffle*`
    - Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead`
    
    ### Why are the changes needed?
    
    There are multiple problems in the current naming:
    
    - `CustomShuffle*` -> `AQEShuffle*`
        it sounds like it is a pluggable API. However, this is actually only 
used by AQE.
    - `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead`
        it is the name of a rule but it can be misread as a reader, which is 
counterintuitive
    - `*ReaderExec` -> `*ReadExec`
        "Reader execution" reads a bit odd. It would be better as "read execution" 
(like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find a reason to 
name it after something that performs an action. See also the generated plans:
    
        Before:
    
        ```
        ...
        * HashAggregate (12)
           +- CustomShuffleReader (11)
              +- ShuffleQueryStage (10)
                 +- Exchange (9)
        ...
        ```
    
        After:
    
        ```
        ...
        * HashAggregate (12)
           +- AQEShuffleRead (11)
              +- ShuffleQueryStage (10)
                 +- Exchange (9)
        ...
        ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, internal refactoring.
    
    ### How was this patch tested?
    
    Existing unit tests should cover the changes.
    
    Closes #33429 from HyukjinKwon/SPARK-36217.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 6e3d404cec0ab741bee21553268b94184055aa5a)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 ...ustomShuffledRDD.scala => AQEShuffledRDD.scala} |   8 +-
 .../spark/scheduler/AdaptiveSchedulingSuite.scala  |   8 +-
 ...leReaderExec.scala => AQEShuffleReadExec.scala} |  20 +-
 ...leReaderRule.scala => AQEShuffleReadRule.scala} |   4 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  12 +-
 .../adaptive/CoalesceShufflePartitions.scala       |  12 +-
 ...er.scala => OptimizeShuffleWithLocalRead.scala} |  50 ++--
 .../OptimizeSkewInRebalancePartitions.scala        |   4 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    |  10 +-
 .../execution/adaptive/ShufflePartitionsUtil.scala |   2 +-
 .../execution/exchange/ShuffleExchangeExec.scala   |   4 +-
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |   4 +-
 .../execution/CoalesceShufflePartitionsSuite.scala |  76 +++---
 .../adaptive/AdaptiveQueryExecSuite.scala          | 294 ++++++++++-----------
 14 files changed, 254 insertions(+), 254 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala 
b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala
similarity index 95%
rename from 
core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
rename to core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala
index 46e5e6f..ae5e0e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala
@@ -65,7 +65,7 @@ class CoalescedPartitioner(val parent: Partitioner, val 
partitionStartIndices: A
   }
 }
 
-private[spark] class CustomShuffledRDDPartition(
+private[spark] class AQEShuffledRDDPartition(
     val index: Int, val startIndexInParent: Int, val endIndexInParent: Int)
   extends Partition {
 
@@ -78,7 +78,7 @@ private[spark] class CustomShuffledRDDPartition(
  * A special ShuffledRDD that supports a ShuffleDependency object from outside 
and launching reduce
  * tasks that read multiple map output partitions.
  */
-class CustomShuffledRDD[K, V, C](
+class AQEShuffledRDD[K, V, C](
     var dependency: ShuffleDependency[K, V, C],
     partitionStartIndices: Array[Int])
   extends RDD[(K, C)](dependency.rdd.context, Seq(dependency)) {
@@ -98,12 +98,12 @@ class CustomShuffledRDD[K, V, C](
     Array.tabulate[Partition](partitionStartIndices.length) { i =>
       val startIndex = partitionStartIndices(i)
       val endIndex = if (i < partitionStartIndices.length - 1) 
partitionStartIndices(i + 1) else n
-      new CustomShuffledRDDPartition(i, startIndex, endIndex)
+      new AQEShuffledRDDPartition(i, startIndex, endIndex)
     }
   }
 
   override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = 
{
-    val part = p.asInstanceOf[CustomShuffledRDDPartition]
+    val part = p.asInstanceOf[AQEShuffledRDDPartition]
     val metrics = context.taskMetrics().createTempShuffleReadMetrics()
     SparkEnv.get.shuffleManager.getReader(
       dependency.shuffleHandle, part.startIndexInParent, 
part.endIndexInParent, context, metrics)
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
index e0f474aa..71d213d 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
@@ -36,7 +36,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with 
LocalSparkContext {
         (x, x)
       }
       val dep = new ShuffleDependency[Int, Int, Int](rdd, new 
HashPartitioner(2))
-      val shuffled = new CustomShuffledRDD[Int, Int, Int](dep)
+      val shuffled = new AQEShuffledRDD[Int, Int, Int](dep)
       sc.submitMapStage(dep).get()
       assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
       assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3)))
@@ -50,7 +50,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with 
LocalSparkContext {
     sc = new SparkContext("local", "test")
     val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
     val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
-    val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 2))
+    val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 2))
     assert(shuffled.partitions.length === 2)
     assert(shuffled.glom().map(_.toSet).collect().toSet == Set(Set((0, 0), (1, 
1)), Set((2, 2))))
   }
@@ -60,7 +60,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with 
LocalSparkContext {
     val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
     // Also create lots of hash partitions so that some of them are empty
     val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(5))
-    val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0))
+    val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0))
     assert(shuffled.partitions.length === 1)
     assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
   }
@@ -69,7 +69,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with 
LocalSparkContext {
     sc = new SparkContext("local", "test")
     val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
     val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
-    val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 
1, 1, 2))
+    val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 
1, 2))
     assert(shuffled.partitions.length === 7)
     assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
similarity index 93%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index b8aef14..d897507 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -37,12 +37,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * @param partitionSpecs  The partition specs that defines the arrangement, 
requires at least one
  *                        partition.
  */
-case class CustomShuffleReaderExec private(
+case class AQEShuffleReadExec private(
     child: SparkPlan,
     partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
-  assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least 
one partition")
+  assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at 
least one partition")
 
-  // If this reader is to read shuffle files locally, then all partition specs 
should be
+  // If this is to read shuffle files locally, then all partition specs should 
be
   // `PartialMapperPartitionSpec`.
   if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
     assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
@@ -52,7 +52,7 @@ case class CustomShuffleReaderExec private(
 
   override def output: Seq[Attribute] = child.output
   override lazy val outputPartitioning: Partitioning = {
-    // If it is a local shuffle reader with one mapper per task, then the 
output partitioning is
+    // If it is a local shuffle read with one mapper per task, then the output 
partitioning is
     // the same as the plan before shuffle.
     // TODO this check is based on assumptions of callers' behavior but is 
sufficient for now.
     if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
@@ -75,7 +75,7 @@ case class CustomShuffleReaderExec private(
   }
 
   override def stringArgs: Iterator[Any] = {
-    val desc = if (isLocalReader) {
+    val desc = if (isLocalRead) {
       "local"
     } else if (hasCoalescedPartition && hasSkewedPartition) {
       "coalesced and skewed"
@@ -104,7 +104,7 @@ case class CustomShuffleReaderExec private(
   def hasSkewedPartition: Boolean =
     partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
 
-  def isLocalReader: Boolean =
+  def isLocalRead: Boolean =
     partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) ||
       partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec])
 
@@ -114,7 +114,7 @@ case class CustomShuffleReaderExec private(
   }
 
   @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
-    if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+    if (!isLocalRead && shuffleStage.get.mapStats.isDefined) {
       Some(partitionSpecs.map {
         case p: CoalescedPartitionSpec =>
           assert(p.dataSize.isDefined)
@@ -166,8 +166,8 @@ case class CustomShuffleReaderExec private(
   @transient override lazy val metrics: Map[String, SQLMetric] = {
     if (shuffleStage.isDefined) {
       Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions")) ++ {
-        if (isLocalReader) {
-          // We split the mapper partition evenly when creating local shuffle 
reader, so no
+        if (isLocalRead) {
+          // We split the mapper partition evenly when creating local shuffle 
read, so no
           // data size info is available.
           Map.empty
         } else {
@@ -208,6 +208,6 @@ case class CustomShuffleReaderExec private(
     shuffleRDD.asInstanceOf[RDD[ColumnarBatch]]
   }
 
-  override protected def withNewChildInternal(newChild: SparkPlan): 
CustomShuffleReaderExec =
+  override protected def withNewChildInternal(newChild: SparkPlan): 
AQEShuffleReadExec =
     copy(child = newChild)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala
similarity index 88%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala
index 3004a3d..1c7f2ea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderRule.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.exchange.ShuffleOrigin
 
 /**
- * Adaptive Query Execution rule that may create [[CustomShuffleReaderExec]] 
on top of query stages.
+ * Adaptive Query Execution rule that may create [[AQEShuffleReadExec]] on top 
of query stages.
  */
-trait CustomShuffleReaderRule extends Rule[SparkPlan] {
+trait AQEShuffleReadRule extends Rule[SparkPlan] {
 
   /**
    * Returns the list of [[ShuffleOrigin]]s supported by this rule.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 93beef8b..2f6619d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -97,13 +97,13 @@ case class AdaptiveSparkPlanExec(
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
-    // Skew join does not handle `CustomShuffleReader` so needs to be applied 
first.
+    // Skew join does not handle `AQEShuffleRead` so needs to be applied first.
     OptimizeSkewedJoin,
     OptimizeSkewInRebalancePartitions,
     CoalesceShufflePartitions(context.session),
-    // `OptimizeLocalShuffleReader` needs to make use of 
'CustomShuffleReaderExec.partitionSpecs'
+    // `OptimizeShuffleWithLocalRead` needs to make use of 
'AQEShuffleReadExec.partitionSpecs'
     // added by `CoalesceShufflePartitions`, and must be executed after it.
-    OptimizeLocalShuffleReader
+    OptimizeShuffleWithLocalRead
   )
 
   // A list of physical optimizer rules to be applied right after a new stage 
is created. The input
@@ -116,7 +116,7 @@ case class AdaptiveSparkPlanExec(
   // The partitioning of the query output depends on the shuffle(s) in the 
final stage. If the
   // original plan contains a repartition operator, we need to preserve the 
specified partitioning,
   // whether or not the repartition-introduced shuffle is optimized out 
because of an underlying
-  // shuffle of the same partitioning. Thus, we need to exclude some 
`CustomShuffleReaderRule`s
+  // shuffle of the same partitioning. Thus, we need to exclude some 
`AQEShuffleReadRule`s
   // from the final stage, depending on the presence and properties of 
repartition operators.
   private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] = {
     val origins = inputPlan.collect {
@@ -124,7 +124,7 @@ case class AdaptiveSparkPlanExec(
     }
     val allRules = queryStageOptimizerRules ++ postStageCreationRules
     allRules.filter {
-      case c: CustomShuffleReaderRule =>
+      case c: AQEShuffleReadRule =>
         origins.forall(c.supportedShuffleOrigins.contains)
       case _ => true
     }
@@ -134,7 +134,7 @@ case class AdaptiveSparkPlanExec(
     val optimized = rules.foldLeft(plan) { case (latestPlan, rule) =>
       val applied = rule.apply(latestPlan)
       val result = rule match {
-        case c: CustomShuffleReaderRule if c.mayAddExtraShuffles =>
+        case c: AQEShuffleReadRule if c.mayAddExtraShuffles =>
           if (ValidateRequirements.validate(applied)) {
             applied
           } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 0f16675..7f3e453 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
  * A rule to coalesce the shuffle partitions based on the map output 
statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class CoalesceShufflePartitions(session: SparkSession) extends 
CustomShuffleReaderRule {
+case class CoalesceShufflePartitions(session: SparkSession) extends 
AQEShuffleReadRule {
 
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
     Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REBALANCE_PARTITIONS_BY_NONE,
@@ -88,23 +88,23 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends CustomShuffl
         val specsMap = shuffleStageInfos.zip(newPartitionSpecs).map { case 
(stageInfo, partSpecs) =>
           (stageInfo.shuffleStage.id, partSpecs)
         }.toMap
-        updateShuffleReaders(plan, specsMap)
+        updateShuffleReads(plan, specsMap)
       } else {
         plan
       }
     }
   }
 
-  private def updateShuffleReaders(
+  private def updateShuffleReads(
       plan: SparkPlan, specsMap: Map[Int, Seq[ShufflePartitionSpec]]): 
SparkPlan = plan match {
     // Even for shuffle exchange whose input RDD has 0 partition, we should 
still update its
     // `partitionStartIndices`, so that all the leaf shuffles in a stage have 
the same
     // number of output partitions.
     case ShuffleStageInfo(stage, _) =>
       specsMap.get(stage.id).map { specs =>
-        CustomShuffleReaderExec(stage, specs)
+        AQEShuffleReadExec(stage, specs)
       }.getOrElse(plan)
-    case other => other.mapChildren(updateShuffleReaders(_, specsMap))
+    case other => other.mapChildren(updateShuffleReads(_, specsMap))
   }
 
   private def supportCoalesce(s: ShuffleExchangeLike): Boolean = {
@@ -121,7 +121,7 @@ private object ShuffleStageInfo {
   : Option[(ShuffleQueryStageExec, Option[Seq[ShufflePartitionSpec]])] = plan 
match {
     case stage: ShuffleQueryStageExec =>
       Some((stage, None))
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+    case AQEShuffleReadExec(s: ShuffleQueryStageExec, partitionSpecs) =>
       Some((s, Some(partitionSpecs)))
     case _ => None
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala
similarity index 78%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala
index 3e3d6d6..844acbd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala
@@ -25,40 +25,40 @@ import 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.internal.SQLConf
 
 /**
- * A rule to optimize the shuffle reader to local reader iff no additional 
shuffles
+ * A rule to optimize the shuffle read to local read iff no additional shuffles
  * will be introduced:
- * 1. if the input plan is a shuffle, add local reader directly as we can 
never introduce
+ * 1. if the input plan is a shuffle, add local read directly as we can never 
introduce
  * extra shuffles in this case.
- * 2. otherwise, add local reader to the probe side of broadcast hash join and
+ * 2. otherwise, add local read to the probe side of broadcast hash join and
  * then run `EnsureRequirements` to check whether additional shuffle 
introduced.
- * If introduced, we will revert all the local readers.
+ * If introduced, we will revert all the local reads.
  */
-object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
+object OptimizeShuffleWithLocalRead extends AQEShuffleReadRule {
 
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
     Seq(ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_NONE)
 
   override def mayAddExtraShuffles: Boolean = true
 
-  // The build side is a broadcast query stage which should have been 
optimized using local reader
+  // The build side is a broadcast query stage which should have been 
optimized using local read
   // already. So we only need to deal with probe side here.
-  private def createProbeSideLocalReader(plan: SparkPlan): SparkPlan = {
+  private def createProbeSideLocalRead(plan: SparkPlan): SparkPlan = {
     plan.transformDown {
       case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildRight) =>
-        val localReader = createLocalReader(shuffleStage)
-        join.asInstanceOf[BroadcastHashJoinExec].copy(left = localReader)
+        val localRead = createLocalRead(shuffleStage)
+        join.asInstanceOf[BroadcastHashJoinExec].copy(left = localRead)
       case join @ BroadcastJoinWithShuffleRight(shuffleStage, BuildLeft) =>
-        val localReader = createLocalReader(shuffleStage)
-        join.asInstanceOf[BroadcastHashJoinExec].copy(right = localReader)
+        val localRead = createLocalRead(shuffleStage)
+        join.asInstanceOf[BroadcastHashJoinExec].copy(right = localRead)
     }
   }
 
-  private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
+  private def createLocalRead(plan: SparkPlan): AQEShuffleReadExec = {
     plan match {
-      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
-        CustomShuffleReaderExec(s, getPartitionSpecs(s, 
Some(c.partitionSpecs.length)))
+      case c @ AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
+        AQEShuffleReadExec(s, getPartitionSpecs(s, 
Some(c.partitionSpecs.length)))
       case s: ShuffleQueryStageExec =>
-        CustomShuffleReaderExec(s, getPartitionSpecs(s, None))
+        AQEShuffleReadExec(s, getPartitionSpecs(s, None))
     }
   }
 
@@ -111,16 +111,16 @@ object OptimizeLocalShuffleReader extends 
CustomShuffleReaderRule {
     }
 
     plan match {
-      case s: SparkPlan if canUseLocalShuffleReader(s) =>
-        createLocalReader(s)
+      case s: SparkPlan if canUseLocalShuffleRead(s) =>
+        createLocalRead(s)
       case s: SparkPlan =>
-        createProbeSideLocalReader(s)
+        createProbeSideLocalRead(s)
     }
   }
 
   object BroadcastJoinWithShuffleLeft {
     def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
-      case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.left) 
=>
+      case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.left) =>
         Some((join.left, join.buildSide))
       case _ => None
     }
@@ -128,22 +128,22 @@ object OptimizeLocalShuffleReader extends 
CustomShuffleReaderRule {
 
   object BroadcastJoinWithShuffleRight {
     def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
-      case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.right) 
=>
+      case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.right) =>
         Some((join.right, join.buildSide))
       case _ => None
     }
   }
 
-  def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
+  def canUseLocalShuffleRead(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
-      s.mapStats.isDefined && supportLocalReader(s.shuffle)
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
-      s.mapStats.isDefined && supportLocalReader(s.shuffle) &&
+      s.mapStats.isDefined && supportLocalRead(s.shuffle)
+    case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
+      s.mapStats.isDefined && supportLocalRead(s.shuffle) &&
         s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
     case _ => false
   }
 
-  private def supportLocalReader(s: ShuffleExchangeLike): Boolean = {
+  private def supportLocalRead(s: ShuffleExchangeLike): Boolean = {
     s.outputPartitioning != SinglePartition && 
supportedShuffleOrigins.contains(s.shuffleOrigin)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
index d8a198f..dc437403 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
  * Note that, this rule is only applied with the SparkPlan whose top-level 
node is
  * ShuffleQueryStageExec.
  */
-object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule {
+object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule {
   override def supportedShuffleOrigins: Seq[ShuffleOrigin] =
     Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
 
@@ -82,7 +82,7 @@ object OptimizeSkewInRebalancePartitions extends 
CustomShuffleReaderRule {
     if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
       shuffle
     } else {
-      CustomShuffleReaderExec(shuffle, newPartitionsSpec)
+      AQEShuffleReadExec(shuffle, newPartitionsSpec)
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 810084a..fbfbce6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
  * (L3, R3-1), (L3, R3-2),
  * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
  */
-object OptimizeSkewedJoin extends CustomShuffleReaderRule {
+object OptimizeSkewedJoin extends AQEShuffleReadRule {
 
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] = 
Seq(ENSURE_REQUIREMENTS)
 
@@ -110,9 +110,9 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 2. Assuming partition0 is skewed in left side, and it has 5 mappers 
(Map0, Map1...Map4).
    *    And we may split the 5 Mappers into 3 mapper ranges [(Map0, Map1), 
(Map2, Map3), (Map4)]
    *    based on the map size and the max split number.
-   * 3. Wrap the join left child with a special shuffle reader that reads each 
mapper range with one
+   * 3. Wrap the join left child with a special shuffle read that loads each 
mapper range with one
    *    task, so total 3 tasks.
-   * 4. Wrap the join right child with a special shuffle reader that reads 
partition0 3 times by
+   * 4. Wrap the join right child with a special shuffle read that loads 
partition0 3 times by
    *    3 tasks separately.
    */
   private def tryOptimizeJoinChildren(
@@ -196,8 +196,8 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
     }
     logDebug(s"number of skewed partitions: left $numSkewedLeft, right 
$numSkewedRight")
     if (numSkewedLeft > 0 || numSkewedRight > 0) {
-      Some((CustomShuffleReaderExec(left, leftSidePartitions.toSeq),
-        CustomShuffleReaderExec(right, rightSidePartitions.toSeq)))
+      Some((AQEShuffleReadExec(left, leftSidePartitions.toSeq),
+        AQEShuffleReadExec(right, rightSidePartitions.toSeq)))
     } else {
       None
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 4ef7d33..64f89b9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -86,7 +86,7 @@ object ShufflePartitionsUtil extends Logging {
     // we should skip it when calculating the `partitionStartIndices`.
     val validMetrics = mapOutputStatistics.flatten
     val numShuffles = mapOutputStatistics.length
-    // If all input RDDs have 0 partition, we create an empty partition for 
every shuffle reader.
+    // If all input RDDs have 0 partition, we create an empty partition for 
every shuffle read.
     if (validMetrics.isEmpty) {
       return Seq.fill(numShuffles)(Seq(CoalescedPartitionSpec(0, 0, 0)))
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index e8cf768..5a45af6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -99,13 +99,13 @@ case object REPARTITION_BY_NUM extends ShuffleOrigin
 
 // Indicates that the shuffle operator was added by the user-specified 
rebalance operator.
 // Spark will try to rebalance partitions that make per-partition size not too 
small and not
-// too big. Local shuffle reader will be used if possible to reduce network 
traffic.
+// too big. Local shuffle read will be used if possible to reduce network 
traffic.
 case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
 
 // Indicates that the shuffle operator was added by the user-specified 
rebalance operator with
 // columns. Spark will try to rebalance partitions that make per-partition 
size not too small and
 // not too big.
-// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle reader cannot 
be used for it as
+// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle read cannot be 
used for it as
 // the output needs to be partitioned by the given columns.
 case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 1e99347..fbbdd42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -536,7 +536,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuit
     // AdaptiveSparkPlan (21)
     // +- == Final Plan ==
     //    * HashAggregate (12)
-    //    +- CustomShuffleReader (11)
+    //    +- AQEShuffleRead (11)
     //       +- ShuffleQueryStage (10)
     //          +- Exchange (9)
     //             +- * HashAggregate (8)
@@ -570,7 +570,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuit
         |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
         |Arguments: 1""".stripMargin,
       """
-        |(11) CustomShuffleReader
+        |(11) AQEShuffleRead
         |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
         |""".stripMargin,
       """
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index fbae929..2a28517 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
-import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
+import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -110,18 +110,18 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
         // by the ExchangeCoordinator.
         val finalPlan = agg.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-        val shuffleReaders = finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+        val shuffleReads = finalPlan.collect {
+          case r @ CoalescedShuffleRead() => r
         }
 
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            assert(shuffleReaders.isEmpty)
+            assert(shuffleReads.isEmpty)
 
           case None =>
-            assert(shuffleReaders.length === 1)
-            shuffleReaders.foreach { reader =>
-              assert(reader.outputPartitioning.numPartitions === 3)
+            assert(shuffleReads.length === 1)
+            shuffleReads.foreach { read =>
+              assert(read.outputPartitioning.numPartitions === 3)
             }
         }
       }
@@ -156,18 +156,18 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-        val shuffleReaders = finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+        val shuffleReads = finalPlan.collect {
+          case r @ CoalescedShuffleRead() => r
         }
 
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            assert(shuffleReaders.isEmpty)
+            assert(shuffleReads.isEmpty)
 
           case None =>
-            assert(shuffleReaders.length === 2)
-            shuffleReaders.foreach { reader =>
-              assert(reader.outputPartitioning.numPartitions === 2)
+            assert(shuffleReads.length === 2)
+            shuffleReads.foreach { read =>
+              assert(read.outputPartitioning.numPartitions === 2)
             }
         }
       }
@@ -207,18 +207,18 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-        val shuffleReaders = finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+        val shuffleReads = finalPlan.collect {
+          case r @ CoalescedShuffleRead() => r
         }
 
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            assert(shuffleReaders.isEmpty)
+            assert(shuffleReads.isEmpty)
 
           case None =>
-            assert(shuffleReaders.length === 2)
-            shuffleReaders.foreach { reader =>
-              assert(reader.outputPartitioning.numPartitions === 2)
+            assert(shuffleReads.length === 2)
+            shuffleReads.foreach { read =>
+              assert(read.outputPartitioning.numPartitions === 2)
             }
         }
       }
@@ -258,18 +258,18 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
           .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-        val shuffleReaders = finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+        val shuffleReads = finalPlan.collect {
+          case r @ CoalescedShuffleRead() => r
         }
 
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            assert(shuffleReaders.isEmpty)
+            assert(shuffleReads.isEmpty)
 
           case None =>
-            assert(shuffleReaders.length === 2)
-            shuffleReaders.foreach { reader =>
-              assert(reader.outputPartitioning.numPartitions === 3)
+            assert(shuffleReads.length === 2)
+            shuffleReads.foreach { read =>
+              assert(read.outputPartitioning.numPartitions === 3)
             }
         }
       }
@@ -300,10 +300,10 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
           // Then, let's make sure we do not reduce number of post shuffle partitions.
           val finalPlan = join.queryExecution.executedPlan
             .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-          val shuffleReaders = finalPlan.collect {
-            case r @ CoalescedShuffleReader() => r
+          val shuffleReads = finalPlan.collect {
+            case r @ CoalescedShuffleRead() => r
           }
-          assert(shuffleReaders.length === 0)
+          assert(shuffleReads.length === 0)
         } finally {
           spark.sql("drop table t")
         }
@@ -331,7 +331,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
       }.length == 2)
       assert(
         finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.length == 3)
 
 
@@ -357,14 +357,14 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterAl
 
       assert(
         finalPlan2.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.length == 2, "finalPlan2")
 
       level1Stages.foreach(qs =>
         assert(qs.plan.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.length == 1,
-          "Wrong CoalescedShuffleReader below " + qs.simpleString(3)))
+          "Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
 
       val leafStages = level1Stages.flatMap { stage =>
         // All of the child stages of result stage have only one child stage.
@@ -395,7 +395,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(
         finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.isEmpty)
     }
     withSparkSession(test, 200, None)
@@ -416,7 +416,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
       // the shuffle partition numbers.
       assert(
         finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.isEmpty)
     }
     withSparkSession(test, 100, None)
@@ -432,7 +432,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(
         finalPlan.collect {
-          case r @ CoalescedShuffleReader() => r
+          case r @ CoalescedShuffleRead() => r
         }.isDefinedAt(0))
     }
     Seq(true, false).foreach { enableIOEncryption =>
@@ -442,8 +442,8 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
   }
 }
 
-object CoalescedShuffleReader {
-  def unapply(reader: CustomShuffleReaderExec): Boolean = {
-    !reader.isLocalReader && !reader.hasSkewedPartition && 
reader.hasCoalescedPartition
+object CoalescedShuffleRead {
+  def unapply(read: AQEShuffleReadExec): Boolean = {
+    !read.isLocalRead && !read.hasSkewedPartition && read.hasCoalescedPartition
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8a74981..46ca786 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -139,21 +139,21 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  private def checkNumLocalShuffleReaders(
-      plan: SparkPlan, numShufflesWithoutLocalReader: Int = 0): Unit = {
+  private def checkNumLocalShuffleReads(
+      plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
     val numShuffles = collect(plan) {
       case s: ShuffleQueryStageExec => s
     }.length
 
-    val numLocalReaders = collect(plan) {
-      case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
+    val numLocalReads = collect(plan) {
+      case read: AQEShuffleReadExec if read.isLocalRead => read
     }
-    numLocalReaders.foreach { r =>
+    numLocalReads.foreach { r =>
       val rdd = r.execute()
       val parts = rdd.partitions
       assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
     }
-    assert(numShuffles === (numLocalReaders.length + 
numShufflesWithoutLocalReader))
+    assert(numShuffles === (numLocalReads.length + 
numShufflesWithoutLocalRead))
   }
 
   private def checkInitialPartitionNum(df: Dataset[_], numPartition: Int): 
Unit = {
@@ -177,11 +177,11 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
-  test("Reuse the parallelism of CoalescedShuffleReaderExec in 
LocalShuffleReaderExec") {
+  test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
@@ -192,12 +192,12 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      val localReaders = collect(adaptivePlan) {
-        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
+      val localReads = collect(adaptivePlan) {
+        case read: AQEShuffleReadExec if read.isLocalRead => read
       }
-      assert(localReaders.length == 2)
-      val localShuffleRDD0 = 
localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
-      val localShuffleRDD1 = 
localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
+      assert(localReads.length == 2)
+      val localShuffleRDD0 = 
localReads(0).execute().asInstanceOf[ShuffledRowRDD]
+      val localShuffleRDD1 = 
localReads(1).execute().asInstanceOf[ShuffledRowRDD]
       // The pre-shuffle partition size is [0, 0, 0, 72, 0]
       // We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1
       // the final parallelism is
@@ -213,7 +213,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("Reuse the default parallelism in LocalShuffleReaderExec") {
+  test("Reuse the default parallelism in local shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
@@ -224,12 +224,12 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      val localReaders = collect(adaptivePlan) {
-        case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
+      val localReads = collect(adaptivePlan) {
+        case read: AQEShuffleReadExec if read.isLocalRead => read
       }
-      assert(localReaders.length == 2)
-      val localShuffleRDD0 = 
localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
-      val localShuffleRDD1 = 
localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
+      assert(localReads.length == 2)
+      val localShuffleRDD0 = 
localReads(0).execute().asInstanceOf[ShuffledRowRDD]
+      val localShuffleRDD1 = 
localReads(1).execute().asInstanceOf[ShuffledRowRDD]
       // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2
       // and the partitions length is 2 * numMappers = 4
       assert(localShuffleRDD0.getPartitions.length == 4)
@@ -252,11 +252,11 @@ class AdaptiveQueryExecSuite
         checkAnswer(testDf, Seq())
         val plan = testDf.queryExecution.executedPlan
         assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
-        val coalescedReaders = collect(plan) {
-          case r: CustomShuffleReaderExec => r
+        val coalescedReads = collect(plan) {
+          case r: AQEShuffleReadExec => r
         }
-        assert(coalescedReaders.length == 3)
-        coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1))
+        assert(coalescedReads.length == 3)
+        coalescedReads.foreach(r => assert(r.partitionSpecs.length == 1))
       }
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
@@ -265,11 +265,11 @@ class AdaptiveQueryExecSuite
         checkAnswer(testDf, Seq())
         val plan = testDf.queryExecution.executedPlan
         assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
-        val coalescedReaders = collect(plan) {
-          case r: CustomShuffleReaderExec => r
+        val coalescedReads = collect(plan) {
+          case r: AQEShuffleReadExec => r
         }
-        assert(coalescedReaders.length == 3, s"$plan")
-        coalescedReaders.foreach(r => assert(r.isLocalReader || 
r.partitionSpecs.length == 1))
+        assert(coalescedReads.length == 3, s"$plan")
+        coalescedReads.foreach(r => assert(r.isLocalRead || 
r.partitionSpecs.length == 1))
       }
     }
   }
@@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
@@ -301,7 +301,7 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
 
-      checkNumLocalShuffleReaders(adaptivePlan)
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
@@ -342,11 +342,11 @@ class AdaptiveQueryExecSuite
       //       +-LocalShuffleReader*
       //             +- ShuffleExchange
 
-      // After applied the 'OptimizeLocalShuffleReader' rule, we can convert all the four
-      // shuffle reader to local shuffle reader in the bottom two 'BroadcastHashJoin'.
+      // After applied the 'OptimizeShuffleWithLocalRead' rule, we can convert all the four
+      // shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'.
       // For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage
-      // and the build side shuffle query stage is also converted to local shuffle reader.
-      checkNumLocalShuffleReaders(adaptivePlan)
+      // and the build side shuffle query stage is also converted to local shuffle read.
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
@@ -390,8 +390,8 @@ class AdaptiveQueryExecSuite
       //          +- CoalescedShuffleReader
       //             +- ShuffleExchange
 
-      // The shuffle added by Aggregate can't apply local reader.
-      checkNumLocalShuffleReaders(adaptivePlan, 1)
+      // The shuffle added by Aggregate can't apply local read.
+      checkNumLocalShuffleReads(adaptivePlan, 1)
     }
   }
 
@@ -436,8 +436,8 @@ class AdaptiveQueryExecSuite
       //       +-LocalShuffleReader*
       //           +- ShuffleExchange
 
-      // The shuffle added by Aggregate can't apply local reader.
-      checkNumLocalShuffleReaders(adaptivePlan, 1)
+      // The shuffle added by Aggregate can't apply local read.
+      checkNumLocalShuffleReads(adaptivePlan, 1)
     }
   }
 
@@ -452,9 +452,9 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 3)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 2)
-      // There is still a SMJ, and its two shuffles can't apply local reader.
-      checkNumLocalShuffleReaders(adaptivePlan, 2)
-      // Even with local shuffle reader, the query stage reuse can also work.
+      // There is still a SMJ, and its two shuffles can't apply local read.
+      checkNumLocalShuffleReads(adaptivePlan, 2)
+      // Even with local shuffle read, the query stage reuse can also work.
       val ex = findReusedExchange(adaptivePlan)
       assert(ex.size == 1)
     }
@@ -471,8 +471,8 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
-      // Even with local shuffle reader, the query stage reuse can also work.
+      checkNumLocalShuffleReads(adaptivePlan)
+      // Even with local shuffle read, the query stage reuse can also work.
       val ex = findReusedExchange(adaptivePlan)
       assert(ex.size == 1)
     }
@@ -491,8 +491,8 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
-      // Even with local shuffle reader, the query stage reuse can also work.
+      checkNumLocalShuffleReads(adaptivePlan)
+      // Even with local shuffle read, the query stage reuse can also work.
       val ex = findReusedExchange(adaptivePlan)
       assert(ex.nonEmpty)
       val sub = findReusedSubquery(adaptivePlan)
@@ -512,8 +512,8 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
-      // Even with local shuffle reader, the query stage reuse can also work.
+      checkNumLocalShuffleReads(adaptivePlan)
+      // Even with local shuffle read, the query stage reuse can also work.
       val ex = findReusedExchange(adaptivePlan)
       assert(ex.isEmpty)
       val sub = findReusedSubquery(adaptivePlan)
@@ -536,8 +536,8 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      checkNumLocalShuffleReaders(adaptivePlan)
-      // Even with local shuffle reader, the query stage reuse can also work.
+      checkNumLocalShuffleReads(adaptivePlan)
+      // Even with local shuffle read, the query stage reuse can also work.
       val ex = findReusedExchange(adaptivePlan)
       assert(ex.nonEmpty)
       assert(ex.head.child.isInstanceOf[BroadcastExchangeExec])
@@ -599,7 +599,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("Change merge join to broadcast join without local shuffle reader") {
+  test("Change merge join to broadcast join without local shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key -> "true",
@@ -615,8 +615,8 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 2)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
-      // There is still a SMJ, and its two shuffles can't apply local reader.
-      checkNumLocalShuffleReaders(adaptivePlan, 2)
+      // There is still a SMJ, and its two shuffles can't apply local read.
+      checkNumLocalShuffleReads(adaptivePlan, 2)
     }
   }
 
@@ -734,12 +734,12 @@ class AdaptiveQueryExecSuite
               rightSkewNum: Int): Unit = {
             assert(joins.size == 1 && joins.head.isSkewJoin)
             assert(joins.head.left.collect {
-              case r: CustomShuffleReaderExec => r
+              case r: AQEShuffleReadExec => r
             }.head.partitionSpecs.collect {
               case p: PartialReducerPartitionSpec => p.reducerIndex
             }.distinct.length == leftSkewNum)
             assert(joins.head.right.collect {
-              case r: CustomShuffleReaderExec => r
+              case r: AQEShuffleReadExec => r
             }.head.partitionSpecs.collect {
               case p: PartialReducerPartitionSpec => p.reducerIndex
             }.distinct.length == rightSkewNum)
@@ -895,16 +895,16 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") 
{
+  test("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT key FROM testData GROUP BY key")
-      val readers = collect(adaptivePlan) {
-        case r: CustomShuffleReaderExec => r
+      val reads = collect(adaptivePlan) {
+        case r: AQEShuffleReadExec => r
       }
-      assert(readers.length == 1)
-      val reader = readers.head
-      val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec]
+      assert(reads.length == 1)
+      val read = reads.head
+      val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
       // we can't just call execute() because that has separate checks for canonicalized plans
       val ex = intercept[IllegalStateException] {
         val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
@@ -914,22 +914,22 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("metrics of the shuffle reader") {
+  test("metrics of the shuffle read") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT key FROM testData GROUP BY key")
-      val readers = collect(adaptivePlan) {
-        case r: CustomShuffleReaderExec => r
+      val reads = collect(adaptivePlan) {
+        case r: AQEShuffleReadExec => r
       }
-      assert(readers.length == 1)
-      val reader = readers.head
-      assert(!reader.isLocalReader)
-      assert(!reader.hasSkewedPartition)
-      assert(reader.hasCoalescedPartition)
-      assert(reader.metrics.keys.toSeq.sorted == Seq(
+      assert(reads.length == 1)
+      val read = reads.head
+      assert(!read.isLocalRead)
+      assert(!read.hasSkewedPartition)
+      assert(read.hasCoalescedPartition)
+      assert(read.metrics.keys.toSeq.sorted == Seq(
         "numPartitions", "partitionDataSize"))
-      assert(reader.metrics("numPartitions").value == 
reader.partitionSpecs.length)
-      assert(reader.metrics("partitionDataSize").value > 0)
+      assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
+      assert(read.metrics("partitionDataSize").value > 0)
 
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -939,14 +939,14 @@ class AdaptiveQueryExecSuite
         }.head
         assert(join.buildSide == BuildLeft)
 
-        val readers = collect(join.right) {
-          case r: CustomShuffleReaderExec => r
+        val reads = collect(join.right) {
+          case r: AQEShuffleReadExec => r
         }
-        assert(readers.length == 1)
-        val reader = readers.head
-        assert(reader.isLocalReader)
-        assert(reader.metrics.keys.toSeq == Seq("numPartitions"))
-        assert(reader.metrics("numPartitions").value == 
reader.partitionSpecs.length)
+        assert(reads.length == 1)
+        val read = reads.head
+        assert(read.isLocalRead)
+        assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(read.metrics("numPartitions").value == 
read.partitionSpecs.length)
       }
 
       withSQLConf(
@@ -972,19 +972,19 @@ class AdaptiveQueryExecSuite
             .createOrReplaceTempView("skewData2")
           val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
             "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-          val readers = collect(adaptivePlan) {
-            case r: CustomShuffleReaderExec => r
+          val reads = collect(adaptivePlan) {
+            case r: AQEShuffleReadExec => r
           }
-          readers.foreach { reader =>
-            assert(!reader.isLocalReader)
-            assert(reader.hasCoalescedPartition)
-            assert(reader.hasSkewedPartition)
-            assert(reader.metrics.contains("numSkewedPartitions"))
+          reads.foreach { read =>
+            assert(!read.isLocalRead)
+            assert(read.hasCoalescedPartition)
+            assert(read.hasSkewedPartition)
+            assert(read.metrics.contains("numSkewedPartitions"))
           }
-          assert(readers(0).metrics("numSkewedPartitions").value == 2)
-          assert(readers(0).metrics("numSkewedSplits").value == 11)
-          assert(readers(1).metrics("numSkewedPartitions").value == 1)
-          assert(readers(1).metrics("numSkewedSplits").value == 9)
+          assert(reads(0).metrics("numSkewedPartitions").value == 2)
+          assert(reads(0).metrics("numSkewedSplits").value == 11)
+          assert(reads(1).metrics("numSkewedPartitions").value == 1)
+          assert(reads(1).metrics("numSkewedSplits").value == 9)
         }
       }
     }
@@ -1233,7 +1233,7 @@ class AdaptiveQueryExecSuite
       assert(bhj.size == 1)
       val join = findTopLevelBaseJoin(adaptivePlan)
       assert(join.isEmpty)
-      checkNumLocalShuffleReaders(adaptivePlan)
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
@@ -1252,7 +1252,7 @@ class AdaptiveQueryExecSuite
       // this is different compares to test(SPARK-32573) due to the rule
       // `EliminateUnnecessaryJoin` has been excluded.
       assert(join.nonEmpty)
-      checkNumLocalShuffleReaders(adaptivePlan)
+      checkNumLocalShuffleReads(adaptivePlan)
     }
   }
 
@@ -1273,7 +1273,7 @@ class AdaptiveQueryExecSuite
         assert(smj.size == 1)
         val join = findTopLevelBaseJoin(adaptivePlan)
         assert(join.isEmpty)
-        checkNumLocalShuffleReaders(adaptivePlan)
+        checkNumLocalShuffleReads(adaptivePlan)
       })
     }
   }
@@ -1431,7 +1431,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-32932: Do not use local shuffle reader at final stage on write 
command") {
+  test("SPARK-32932: Do not use local shuffle read at final stage on write 
command") {
     withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString,
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
@@ -1441,14 +1441,14 @@ class AdaptiveQueryExecSuite
       ) yield (i, j)
 
       val df = data.toDF("i", "j").repartition($"j")
-      var noLocalReader: Boolean = false
+      var noLocalread: Boolean = false
       val listener = new QueryExecutionListener {
         override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
           qe.executedPlan match {
             case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
               
assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
-              noLocalReader = collect(plan) {
-                case exec: CustomShuffleReaderExec if exec.isLocalReader => 
exec
+              noLocalread = collect(plan) {
+                case exec: AQEShuffleReadExec if exec.isLocalRead => exec
               }.isEmpty
             case _ => // ignore other events
           }
@@ -1461,32 +1461,32 @@ class AdaptiveQueryExecSuite
       withTable("t") {
         df.write.partitionBy("j").saveAsTable("t")
         sparkContext.listenerBus.waitUntilEmpty()
-        assert(noLocalReader)
-        noLocalReader = false
+        assert(noLocalread)
+        noLocalread = false
       }
 
       // Test DataSource v2
       val format = classOf[NoopDataSource].getName
       df.write.format(format).mode("overwrite").save()
       sparkContext.listenerBus.waitUntilEmpty()
-      assert(noLocalReader)
-      noLocalReader = false
+      assert(noLocalread)
+      noLocalread = false
 
       spark.listenerManager.unregister(listener)
     }
   }
 
-  test("SPARK-33494: Do not use local shuffle reader for repartition") {
+  test("SPARK-33494: Do not use local shuffle read for repartition") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val df = spark.table("testData").repartition('key)
       df.collect()
-      // local shuffle reader breaks partitioning and shouldn't be used for repartition operation
+      // local shuffle read breaks partitioning and shouldn't be used for repartition operation
       // which is specified by users.
-      checkNumLocalShuffleReaders(df.queryExecution.executedPlan, 
numShufflesWithoutLocalReader = 1)
+      checkNumLocalShuffleReads(df.queryExecution.executedPlan, 
numShufflesWithoutLocalRead = 1)
     }
   }
 
-  test("SPARK-33551: Do not use custom shuffle reader for repartition") {
+  test("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -1515,11 +1515,11 @@ class AdaptiveQueryExecSuite
         assert(!hasRepartitionShuffle(plan))
         val bhj = findTopLevelBroadcastHashJoin(plan)
         assert(bhj.length == 1)
-        checkNumLocalShuffleReaders(plan, 1)
+        checkNumLocalShuffleReads(plan, 1)
         // Probe side is coalesced.
-        val customReader = 
bhj.head.right.find(_.isInstanceOf[CustomShuffleReaderExec])
-        assert(customReader.isDefined)
-        
assert(customReader.get.asInstanceOf[CustomShuffleReaderExec].hasCoalescedPartition)
+        val aqeRead = bhj.head.right.find(_.isInstanceOf[AQEShuffleReadExec])
+        assert(aqeRead.isDefined)
+        
assert(aqeRead.get.asInstanceOf[AQEShuffleReadExec].hasCoalescedPartition)
 
         // Repartition with partition default num specified.
         val dfRepartitionWithNum = df.repartition(5, 'b)
@@ -1529,23 +1529,23 @@ class AdaptiveQueryExecSuite
         assert(hasRepartitionShuffle(planWithNum))
         val bhjWithNum = findTopLevelBroadcastHashJoin(planWithNum)
         assert(bhjWithNum.length == 1)
-        checkNumLocalShuffleReaders(planWithNum, 1)
+        checkNumLocalShuffleReads(planWithNum, 1)
         // Probe side is coalesced.
-        
assert(bhjWithNum.head.right.find(_.isInstanceOf[CustomShuffleReaderExec]).nonEmpty)
+        
assert(bhjWithNum.head.right.find(_.isInstanceOf[AQEShuffleReadExec]).nonEmpty)
 
         // Repartition with partition non-default num specified.
         val dfRepartitionWithNum2 = df.repartition(3, 'b)
         dfRepartitionWithNum2.collect()
         val planWithNum2 = dfRepartitionWithNum2.queryExecution.executedPlan
         // The top shuffle from repartition is not optimized out, and this is the only shuffle that
-        // does not have local shuffle reader.
+        // does not have local shuffle read.
         assert(hasRepartitionShuffle(planWithNum2))
         val bhjWithNum2 = findTopLevelBroadcastHashJoin(planWithNum2)
         assert(bhjWithNum2.length == 1)
-        checkNumLocalShuffleReaders(planWithNum2, 1)
-        val customReader2 = 
bhjWithNum2.head.right.find(_.isInstanceOf[CustomShuffleReaderExec])
-        assert(customReader2.isDefined)
-        
assert(customReader2.get.asInstanceOf[CustomShuffleReaderExec].isLocalReader)
+        checkNumLocalShuffleReads(planWithNum2, 1)
+        val aqeRead2 = 
bhjWithNum2.head.right.find(_.isInstanceOf[AQEShuffleReadExec])
+        assert(aqeRead2.isDefined)
+        assert(aqeRead2.get.asInstanceOf[AQEShuffleReadExec].isLocalRead)
       }
 
       // Force skew join
@@ -1565,10 +1565,10 @@ class AdaptiveQueryExecSuite
         // No skew join due to the repartition.
         assert(!smj.head.isSkewJoin)
         // Both sides are coalesced.
-        val customReaders = collect(smj.head) {
-          case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c
+        val aqeReads = collect(smj.head) {
+          case c: AQEShuffleReadExec if c.hasCoalescedPartition => c
         }
-        assert(customReaders.length == 2)
+        assert(aqeReads.length == 2)
 
         // Repartition with default partition num specified.
         val dfRepartitionWithNum = df.repartition(5, 'b)
@@ -1580,10 +1580,10 @@ class AdaptiveQueryExecSuite
         assert(smjWithNum.length == 1)
         // Skew join can apply as the repartition is not optimized out.
         assert(smjWithNum.head.isSkewJoin)
-        val customReadersWithNum = collect(smjWithNum.head) {
-          case c: CustomShuffleReaderExec => c
+        val aqeReadsWithNum = collect(smjWithNum.head) {
+          case c: AQEShuffleReadExec => c
         }
-        assert(customReadersWithNum.nonEmpty)
+        assert(aqeReadsWithNum.nonEmpty)
 
         // Repartition with default non-partition num specified.
         val dfRepartitionWithNum2 = df.repartition(3, 'b)
@@ -1660,7 +1660,7 @@ class AdaptiveQueryExecSuite
       ds.collect()
       val plan = ds.queryExecution.executedPlan
       assert(collect(plan) {
-        case c: CustomShuffleReaderExec => c
+        case c: AQEShuffleReadExec => c
       }.isEmpty)
       assert(collect(plan) {
         case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s
@@ -1691,7 +1691,7 @@ class AdaptiveQueryExecSuite
         val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1")
         assert(
           collect(adaptive) {
-            case c @ CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.length == 1 =>
+            case c @ AQEShuffleReadExec(_, partitionSpecs) if partitionSpecs.length == 1 =>
               assert(c.hasCoalescedPartition)
               c
           }.length == 1
@@ -1793,30 +1793,30 @@ class AdaptiveQueryExecSuite
           val query = s"SELECT /*+ $repartition */ * FROM testData"
           val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
           collect(adaptivePlan) {
-            case r: CustomShuffleReaderExec => r
+            case r: AQEShuffleReadExec => r
           } match {
-            case Seq(customShuffleReader) =>
-              assert(customShuffleReader.partitionSpecs.size === 1)
-              assert(!customShuffleReader.isLocalReader)
+            case Seq(aqeShuffleRead) =>
+              assert(aqeShuffleRead.partitionSpecs.size === 1)
+              assert(!aqeShuffleRead.isLocalRead)
             case _ =>
-              fail("There should be a CustomShuffleReaderExec")
+              fail("There should be a AQEShuffleReadExec")
           }
         }
     }
   }
 
-  test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
+  test("SPARK-35650: Use local shuffle read if can not coalesce number of partitions") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
       val query = "SELECT /*+ REPARTITION */ * FROM testData"
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
       collect(adaptivePlan) {
-        case r: CustomShuffleReaderExec => r
+        case r: AQEShuffleReadExec => r
       } match {
-        case Seq(customShuffleReader) =>
-          assert(customShuffleReader.partitionSpecs.size === 4)
-          assert(customShuffleReader.isLocalReader)
+        case Seq(aqeShuffleRead) =>
+          assert(aqeShuffleRead.partitionSpecs.size === 4)
+          assert(aqeShuffleRead.isLocalRead)
         case _ =>
-          fail("There should be a CustomShuffleReaderExec")
+          fail("There should be a AQEShuffleReadExec")
       }
     }
   }
@@ -1838,13 +1838,13 @@ class AdaptiveQueryExecSuite
         def checkPartitionNumber(
             query: String, skewedPartitionNumber: Int, totalNumber: Int): Unit = {
           val (_, adaptive) = runAdaptiveAndVerifyResult(query)
-          val reader = collect(adaptive) {
-            case reader: CustomShuffleReaderExec => reader
+          val read = collect(adaptive) {
+            case read: AQEShuffleReadExec => read
           }
-          assert(reader.size == 1)
-          assert(reader.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
+          assert(read.size == 1)
+          assert(read.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
             skewedPartitionNumber)
-          assert(reader.head.partitionSpecs.size == totalNumber)
+          assert(read.head.partitionSpecs.size == totalNumber)
         }
 
         withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
@@ -1873,11 +1873,11 @@ class AdaptiveQueryExecSuite
           .createOrReplaceTempView("t2")
         val (_, adaptive) =
           runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
-        val customReaders = collect(adaptive) {
-          case c: CustomShuffleReaderExec => c
+        val aqeReads = collect(adaptive) {
+          case c: AQEShuffleReadExec => c
         }
-        assert(customReaders.length == 2)
-        customReaders.foreach { c =>
+        assert(aqeReads.length == 2)
+        aqeReads.foreach { c =>
           val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
           assert(stats.sizeInBytes >= 0)
           assert(stats.rowCount.get >= 0)
@@ -1890,12 +1890,12 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptive) =
         runAdaptiveAndVerifyResult("SELECT sum(id) FROM RANGE(10) GROUP BY id % 3")
-      val coalesceReader = collect(adaptive) {
-        case r: CustomShuffleReaderExec if r.hasCoalescedPartition => r
+      val coalesceRead = collect(adaptive) {
+        case r: AQEShuffleReadExec if r.hasCoalescedPartition => r
       }
-      assert(coalesceReader.length == 1)
+      assert(coalesceRead.length == 1)
       // RANGE(10) is a very small dataset and AQE coalescing should produce one partition.
-      assert(coalesceReader.head.partitionSpecs.length == 1)
+      assert(coalesceRead.head.partitionSpecs.length == 1)
     }
   }
 
@@ -1919,7 +1919,7 @@ class AdaptiveQueryExecSuite
         assert(smj.size == 1)
         val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
         assert(bhj.size == 1)
-        checkNumLocalShuffleReaders(adaptivePlan)
+        checkNumLocalShuffleReads(adaptivePlan)
       }
 
       withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key ->

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to