This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3fb450c  [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE 
`CustomShuffleReaderExec`
3fb450c is described below

commit 3fb450c0b7c4e415a29d51cd10e6be6ad8dff114
Author: maryannxue <maryann...@apache.org>
AuthorDate: Tue Mar 10 14:15:44 2020 +0800

    [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE 
`CustomShuffleReaderExec`
    
    ### What changes were proposed in this pull request?
    This PR changes the type of `CustomShuffleReaderExec`'s `partitionSpecs` 
from `Array` to `Seq`, since `Array` compares references not values for 
equality, which could lead to potential plan reuse problem.
    
    ### Why are the changes needed?
    Unlike `Seq`, `Array` compares references not values for equality, which 
could lead to potential plan reuse problem.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Passes existing UTs.
    
    Closes #27857 from maryannxue/aqe-customreader-fix.
    
    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../adaptive/CustomShuffleReaderExec.scala         |  4 +--
 .../adaptive/OptimizeLocalShuffleReader.scala      | 10 +++---
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 12 +++-----
 .../adaptive/ShufflePartitionsCoalescer.scala      |  6 ++--
 .../ShufflePartitionsCoalescerSuite.scala          | 36 +++++++++++-----------
 5 files changed, 33 insertions(+), 35 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index be372bb..ba3f725 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExcha
  */
 case class CustomShuffleReaderExec private(
     child: SparkPlan,
-    partitionSpecs: Array[ShufflePartitionSpec],
+    partitionSpecs: Seq[ShufflePartitionSpec],
     description: String) extends UnaryExecNode {
 
   override def output: Seq[Attribute] = child.output
@@ -71,7 +71,7 @@ case class CustomShuffleReaderExec private(
       cachedShuffleRDD = child match {
         case stage: ShuffleQueryStageExec =>
           new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, 
partitionSpecs)
+            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, 
partitionSpecs.toArray)
         case _ =>
           throw new IllegalStateException("operating on canonicalization plan")
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index e441763..fb6b40c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -77,21 +77,21 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) 
extends Rule[SparkPlan] {
   //       partition start indices based on block size to avoid data skew.
   private def getPartitionSpecs(
       shuffleStage: ShuffleQueryStageExec,
-      advisoryParallelism: Option[Int]): Array[ShufflePartitionSpec] = {
+      advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
     val shuffleDep = shuffleStage.shuffle.shuffleDependency
     val numReducers = shuffleDep.partitioner.numPartitions
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
     val numMappers = shuffleDep.rdd.getNumPartitions
     val splitPoints = if (numMappers == 0) {
-      Array.empty
+      Seq.empty
     } else {
-      equallyDivide(numReducers, math.max(1, expectedParallelism / 
numMappers)).toArray
+      equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
     }
     (0 until numMappers).flatMap { mapIndex =>
       (splitPoints :+ numReducers).sliding(2).map {
-        case Array(start, end) => PartialMapperPartitionSpec(mapIndex, start, 
end)
+        case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, 
end)
       }
-    }.toArray
+    }
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 2e8adcf..c3bcce4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -108,7 +108,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   private def getMapStartIndices(
       stage: ShuffleQueryStageExec,
       partitionId: Int,
-      targetSize: Long): Array[Int] = {
+      targetSize: Long): Seq[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
     val partitionStartIndices = ArrayBuffer[Int]()
@@ -126,7 +126,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
       i += 1
     }
 
-    partitionStartIndices.toArray
+    partitionStartIndices
   }
 
   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics 
= {
@@ -255,10 +255,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
       logDebug("number of skewed partitions: " +
         s"left ${leftSkewDesc.numPartitions}, right 
${rightSkewDesc.numPartitions}")
       if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
-        val newLeft = CustomShuffleReaderExec(
-          left, leftSidePartitions.toArray, leftSkewDesc.toString)
-        val newRight = CustomShuffleReaderExec(
-          right, rightSidePartitions.toArray, rightSkewDesc.toString)
+        val newLeft = CustomShuffleReaderExec(left, leftSidePartitions, 
leftSkewDesc.toString)
+        val newRight = CustomShuffleReaderExec(right, rightSidePartitions, 
rightSkewDesc.toString)
         smj.copy(
           left = s1.copy(child = newLeft), right = s2.copy(child = newRight), 
isSkewJoin = true)
       } else {
@@ -286,7 +284,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
 
   private def createSkewPartitions(
       reducerIndex: Int,
-      mapStartIndices: Array[Int],
+      mapStartIndices: Seq[Int],
       numMappers: Int): Seq[PartialReducerPartitionSpec] = {
     mapStartIndices.indices.map { i =>
       val startMapIndex = mapStartIndices(i)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
index c3b8bf6..8c58241 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
@@ -47,7 +47,7 @@ object ShufflePartitionsCoalescer extends Logging {
    *  - coalesced partition 2: shuffle partition 2 (size 170 MiB)
    *  - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
    *
-   *  @return An array of [[CoalescedPartitionSpec]]s. For example, if 
partitions [0, 1, 2, 3, 4]
+   *  @return A sequence of [[CoalescedPartitionSpec]]s. For example, if 
partitions [0, 1, 2, 3, 4]
    *          split at indices [0, 2, 3], the returned partition specs will be:
    *          CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
    *          CoalescedPartitionSpec(3, 5).
@@ -57,7 +57,7 @@ object ShufflePartitionsCoalescer extends Logging {
       firstPartitionIndex: Int,
       lastPartitionIndex: Int,
       advisoryTargetSize: Long,
-      minNumPartitions: Int = 1): Array[ShufflePartitionSpec] = {
+      minNumPartitions: Int = 1): Seq[ShufflePartitionSpec] = {
     // If `minNumPartitions` is very large, it is possible that we need to use 
a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = 
mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
@@ -112,6 +112,6 @@ object ShufflePartitionsCoalescer extends Logging {
     }
     partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, 
lastPartitionIndex)
 
-    partitionSpecs.toArray
+    partitionSpecs
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
index 0befa06..8aab299 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
@@ -24,7 +24,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
 
   private def checkEstimation(
       bytesByPartitionIdArray: Array[Array[Long]],
-      expectedPartitionStartIndices: Array[CoalescedPartitionSpec],
+      expectedPartitionStartIndices: Seq[CoalescedPartitionSpec],
       targetSize: Long,
       minNumPartitions: Int = 1): Unit = {
     val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
@@ -46,7 +46,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
     {
       // All bytes per partition are 0.
       val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
     }
 
@@ -54,21 +54,21 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // Some bytes per partition are 0 and total size is less than the target 
size.
       // 1 coalesced partition is expected.
       val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
     }
 
     {
       // 2 coalesced partitions are expected.
       val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 3), 
CoalescedPartitionSpec(3, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 3), 
CoalescedPartitionSpec(3, 5))
       checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
     }
 
     {
       // There are a few large shuffle partitions.
       val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 3),
@@ -80,7 +80,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
     {
       // All shuffle partitions are larger than the targeted size.
       val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 3),
@@ -92,7 +92,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
     {
       // The last shuffle partition is in a single coalesced partition.
       val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 4), 
CoalescedPartitionSpec(4, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 4), 
CoalescedPartitionSpec(4, 5))
       checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
     }
   }
@@ -106,7 +106,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0)
       intercept[AssertionError] {
-        checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), 
Array.empty, targetSize)
+        checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), 
Seq.empty, targetSize)
       }
     }
 
@@ -114,7 +114,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // All bytes per partition are 0.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionSpecs,
@@ -126,7 +126,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // 1 coalesced partition is expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionSpecs,
@@ -137,7 +137,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // 2 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 2),
         CoalescedPartitionSpec(2, 4),
         CoalescedPartitionSpec(4, 5))
@@ -151,7 +151,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // 4 coalesced partition are expected.
       val bytesByPartitionId1 = Array[Long](0, 99, 0, 20, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 4),
@@ -166,7 +166,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // 2 coalesced partition are needed.
       val bytesByPartitionId1 = Array[Long](0, 100, 0, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 4),
@@ -181,7 +181,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // There are a few large shuffle partitions.
       val bytesByPartitionId1 = Array[Long](0, 100, 40, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 3),
@@ -197,7 +197,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // All pairs of shuffle partitions are larger than the targeted size.
       val bytesByPartitionId1 = Array[Long](100, 100, 40, 30, 0)
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 3),
@@ -219,7 +219,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // the size of data is 0.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionSpecs,
@@ -230,7 +230,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // The minimal number of coalesced partitions is enforced.
       val bytesByPartitionId1 = Array[Long](10, 5, 5, 0, 20)
       val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
-      val expectedPartitionSpecs = Array(CoalescedPartitionSpec(0, 3), 
CoalescedPartitionSpec(3, 5))
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 3), 
CoalescedPartitionSpec(3, 5))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionSpecs,
@@ -241,7 +241,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite 
{
       // The number of coalesced partitions is determined by the algorithm.
       val bytesByPartitionId1 = Array[Long](10, 50, 20, 80, 20)
       val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
-      val expectedPartitionSpecs = Array(
+      val expectedPartitionSpecs = Seq(
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 3),
         CoalescedPartitionSpec(3, 4),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to