cloud-fan commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358057065
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##########
 @@ -91,14 +93,21 @@ case class ReduceNumShufflePartitions(conf: SQLConf) 
extends Rule[SparkPlan] {
       val distinctNumPreShufflePartitions =
         validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
       if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 
1) {
-        val partitionStartIndices = 
estimatePartitionStartIndices(validMetrics.toArray)
-        // This transformation adds new nodes, so we must use `transformUp` 
here.
-        plan.transformUp {
-          // even for shuffle exchange whose input RDD has 0 partition, we 
should still update its
-          // `partitionStartIndices`, so that all the leaf shuffles in a stage 
have the same
-          // number of output partitions.
-          case stage: QueryStageExec if 
ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
-            CoalescedShuffleReaderExec(stage, partitionStartIndices)
+        val visitedStage = mutable.HashSet[QueryStageExec]()
+        plan.transformDown {
+          case stage: QueryStageExec if 
(ShuffleQueryStageExec.isShuffleQueryStageExec(stage)
+            && !visitedStage.contains(stage)) =>
+            val excludedPartitions =
+              ShuffleQueryStageExec.getShuffleStage(stage).excludedPartitions
+            val partitionIndices = estimatePartitionStartAndEndIndices(
+              validMetrics.toArray, excludedPartitions)
+            visitedStage += stage
+            CoalescedShuffleReaderExec(stage, partitionIndices)
+          case partialReader: PartialShuffleReader =>
 
 Review comment:
   We don't need this `PartialShuffleReader` case at all if we remove `PartialShuffleReader`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to