cloud-fan commented on a change in pull request #26434: [SPARK-29544][SQL] optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358057471
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala ##########
@@ -91,14 +93,21 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
     val distinctNumPreShufflePartitions =
       validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
     if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
-      val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
-      // This transformation adds new nodes, so we must use `transformUp` here.
-      plan.transformUp {
-        // even for shuffle exchange whose input RDD has 0 partition, we should still update its
-        // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
-        // number of output partitions.
-        case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
-          CoalescedShuffleReaderExec(stage, partitionStartIndices)
+      val visitedStage = mutable.HashSet[QueryStageExec]()
+      plan.transformDown {
+        case stage: QueryStageExec if (ShuffleQueryStageExec.isShuffleQueryStageExec(stage)
+            && !visitedStage.contains(stage)) =>
+          val excludedPartitions =
+            ShuffleQueryStageExec.getShuffleStage(stage).excludedPartitions
+          val partitionIndices = estimatePartitionStartAndEndIndices(

Review comment:
   We should only call `estimatePartitionStartAndEndIndices` once for the whole plan, not once per stage. We should also make sure all the shuffles in the stage have the same `excludedPartitions`; otherwise we should give up on this rule.
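   For concreteness, a minimal sketch of the restructuring being suggested: collect every shuffle stage's `excludedPartitions` up front, bail out if they disagree, and compute the indices exactly once. This assumes it runs inside `ReduceNumShufflePartitions.apply` with `plan` and `validMetrics` in scope; the argument lists of `estimatePartitionStartAndEndIndices` and `CoalescedShuffleReaderExec` are guesses based on the diff above, not the final API.

   ```scala
   if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
     // Gather the excludedPartitions of all shuffle query stages in this plan.
     val allExcluded = plan.collect {
       case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
         ShuffleQueryStageExec.getShuffleStage(stage).excludedPartitions
     }.distinct

     if (allExcluded.length == 1) {
       // All shuffles agree, so compute the partition indices exactly once.
       // (Exact signature assumed here for illustration.)
       val partitionIndices =
         estimatePartitionStartAndEndIndices(validMetrics.toArray, allExcluded.head)
       plan.transformUp {
         case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
           CoalescedShuffleReaderExec(stage, partitionIndices)
       }
     } else {
       // Shuffles disagree on excludedPartitions: give up and leave the plan unchanged.
       plan
     }
   } else {
     plan
   }
   ```

   Doing the agreement check up front would presumably also remove the need for the `visitedStage` set, since the transformation no longer carries per-stage state.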