cloud-fan commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r769712224
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -68,63 +68,95 @@ case class EnsureRequirements(
     // Get the indexes of children which have specified distribution requirements and need to have
     // same number of partitions.
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
-      case (UnspecifiedDistribution, _) => false
-      case (_: BroadcastDistribution, _) => false
-      case _ => true
+      case (_: ClusteredDistribution, _) => true
+      case _ => false
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there are more than one children, we'll need to check partitioning & distribution of them
+    // and see if extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      val specs = childrenIndexes.map(i => {
+        val requiredDist = requiredChildDistributions(i)
+        assert(requiredDist.isInstanceOf[ClusteredDistribution],
+          s"Expected ClusteredDistribution but found ${requiredDist.getClass.getSimpleName}")
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredDist.asInstanceOf[ClusteredDistribution])
+      }).toMap
 
-    if (childrenNumPartitions.size > 1) {
-      // Get the number of partitions which is explicitly required by the distributions.
-      val requiredNumPartitions = {
-        val numPartitionsSet = childrenIndexes.flatMap {
-          index => requiredChildDistributions(index).requiredNumPartitions
-        }.toSet
-        assert(numPartitionsSet.size <= 1,
-          s"$requiredChildDistributions have incompatible requirements of the number of partitions")
-        numPartitionsSet.headOption
-      }
+      // Find out the shuffle spec that gives better parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However this should be sufficient for now since in Spark nodes with multiple children
+      // always have exactly 2 children.
 
-      // If there are non-shuffle children that satisfy the required distribution, we have
-      // some tradeoffs when picking the expected number of shuffle partitions:
-      // 1. We should avoid shuffling these children.
-      // 2. We should have a reasonable parallelism.
-      val nonShuffleChildrenNumPartitions =
-        childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
-          .map(_.outputPartitioning.numPartitions)
-      val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
-        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
-          // Here we pick the max number of partitions among these non-shuffle children.
-          nonShuffleChildrenNumPartitions.max
+      // Whether we should consider `spark.sql.shuffle.partitions` and ensure enough parallelism
+      // during shuffle. To achieve a good trade-off between parallelism and shuffle cost, we only
+      // consider the minimum parallelism iff ALL children need to be re-shuffled.
+      //
+      // A child is considered to be re-shuffled iff:
+      //   1. It can't create partitioning by itself, i.e., `canCreatePartitioning` returns false.
+      //   2. It already has `ShuffleExchangeExec`.
+      //
+      // On the other hand, in scenarios such as:
+      //   HashPartitioning(5) <-> HashPartitioning(6)
+      // while `spark.sql.shuffle.partitions` is 10, we'll only re-shuffle the left side and make it
+      // HashPartitioning(6).
+      val canIgnoreMinPartitions = specs.exists(p =>
+        p._2.canCreatePartitioning && !children(p._1).isInstanceOf[ShuffleExchangeExec]
+      )
+      // Choose all the specs that can be used to shuffle other children
+      val candidateSpecs = specs
+        .filter(_._2.canCreatePartitioning)
+        .filter(p => canIgnoreMinPartitions ||
+          children(p._1).outputPartitioning.numPartitions >= conf.defaultNumShufflePartitions)
+      val bestSpec = if (candidateSpecs.isEmpty) {
+        None
+      } else {
+        // When choosing specs, we should consider those children with no `Exchange` node
+        // first. For instance, if we have:
+        //   A: (No_Exchange, 100) <---> B: (Exchange, 120)
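To make the selection logic in the diff concrete, here is a minimal, self-contained sketch of the same idea. Everything in it (`ChildInfo`, `pickBestSpec`, the flat boolean fields) is a hypothetical simplification for illustration, not Spark's actual `ShuffleSpec` API; the tie-breaking preference for children without an `Exchange` follows the comment at the end of the diff above.

```scala
// Illustrative sketch only: simplified stand-ins for the real plan nodes and
// ShuffleSpec machinery in EnsureRequirements.
object ShuffleSpecSelectionSketch {
  // Hypothetical summary of one child: its partition count, whether its spec
  // can create a partitioning for the other side, and whether it is already
  // behind a ShuffleExchangeExec.
  case class ChildInfo(
      index: Int,
      numPartitions: Int,
      canCreatePartitioning: Boolean,
      hasShuffleExchange: Boolean)

  def pickBestSpec(
      children: Seq[ChildInfo],
      defaultNumShufflePartitions: Int): Option[ChildInfo] = {
    // Minimum parallelism can be ignored if some child can create the
    // partitioning by itself and is not already behind a shuffle.
    val canIgnoreMinPartitions =
      children.exists(c => c.canCreatePartitioning && !c.hasShuffleExchange)
    // Candidate specs: must be able to create a partitioning, and must meet
    // the minimum parallelism unless it can be ignored.
    val candidates = children
      .filter(_.canCreatePartitioning)
      .filter(c => canIgnoreMinPartitions ||
        c.numPartitions >= defaultNumShufflePartitions)
    if (candidates.isEmpty) {
      None
    } else {
      // Prefer children without an existing Exchange node; among those, pick
      // the one with the most partitions for better parallelism.
      val (noExchange, withExchange) =
        candidates.partition(c => !c.hasShuffleExchange)
      val pool = if (noExchange.nonEmpty) noExchange else withExchange
      Some(pool.maxBy(_.numPartitions))
    }
  }

  def main(args: Array[String]): Unit = {
    // The scenario from the diff comment:
    //   A: (No_Exchange, 100) <---> B: (Exchange, 120)
    val a = ChildInfo(index = 0, numPartitions = 100,
      canCreatePartitioning = true, hasShuffleExchange = false)
    val b = ChildInfo(index = 1, numPartitions = 120,
      canCreatePartitioning = true, hasShuffleExchange = true)
    // Prints Some(ChildInfo(0,100,true,false)): A's spec wins, so only B is
    // re-shuffled (to 100 partitions) and A's existing output is reused.
    println(pickBestSpec(Seq(a, b), defaultNumShufflePartitions = 10))
  }
}
```

The point of the preference order is cost: re-shuffling a side that already has an `Exchange` replaces a shuffle that must happen anyway, while shuffling a side without one adds a brand-new shuffle.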
Review comment:
   Yea it's a tricky cost problem. I think this is the same as before so should be OK. @sunchao can you confirm?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org