JkSelf commented on a change in pull request #32594:
URL: https://github.com/apache/spark/pull/32594#discussion_r637663065



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -158,76 +155,68 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    *    3 tasks separately.
    */
   private def tryOptimizeJoinChildren(
-      left: ShuffleStageInfo,
-      right: ShuffleStageInfo,
+      left: ShuffleQueryStageExec,
+      right: ShuffleQueryStageExec,
       joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
-    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-    val numPartitions = left.partitionsWithSizes.length
+    val leftSizes = left.mapStats.get.bytesByPartitionId
+    val rightSizes = right.mapStats.get.bytesByPartitionId
+    assert(leftSizes.length == rightSizes.length)
+    val numPartitions = leftSizes.length
    // We use the median size of the original shuffle partitions to detect skewed partitions.
-    val leftMedSize = medianSize(left.mapStats)
-    val rightMedSize = medianSize(right.mapStats)
+    val leftMedSize = medianSize(leftSizes)
+    val rightMedSize = medianSize(rightSizes)
     logDebug(
       s"""
          |Optimizing skewed join.
          |Left side partitions size info:
-         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |${getSizeInfo(leftMedSize, leftSizes)}
          |Right side partitions size info:
-         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+         |${getSizeInfo(rightMedSize, rightSizes)}
       """.stripMargin)
+
     val canSplitLeft = canSplitLeftSide(joinType)
     val canSplitRight = canSplitRightSide(joinType)
-    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-    // the final data distribution is even (coalesced partitions + split partitions).
-    val leftActualSizes = left.partitionsWithSizes.map(_._2)
-    val rightActualSizes = right.partitionsWithSizes.map(_._2)
-    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+    val leftTargetSize = targetSize(leftSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightSizes, rightMedSize)
 
     val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     var numSkewedLeft = 0
     var numSkewedRight = 0
     for (partitionIndex <- 0 until numPartitions) {
-      val leftActualSize = leftActualSizes(partitionIndex)
-      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-      val rightActualSize = rightActualSizes(partitionIndex)
-      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+      val leftSize = leftSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftSize, leftMedSize) && canSplitLeft
+      val rightSize = rightSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightSize, rightMedSize) && canSplitRight
+      val noSkewPartitionSpec = Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
 
-      // A skewed partition should never be coalesced, but skip it here just to be safe.
-      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-        val reducerId = leftPartSpec.startReducerIndex
+      val leftParts = if (isLeftSkew) {
         val skewSpecs = createSkewPartitionSpecs(
-          left.mapStats.shuffleId, reducerId, leftTargetSize)
+          left.mapStats.get.shuffleId, partitionIndex, leftTargetSize)
         if (skewSpecs.isDefined) {
           logDebug(s"Left side partition $partitionIndex " +
-            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, 
" +
+            s"(${FileUtils.byteCountToDisplaySize(leftSize)}) is skewed, " +
             s"split it into ${skewSpecs.get.length} parts.")
           numSkewedLeft += 1
         }
-        skewSpecs.getOrElse(Seq(leftPartSpec))
+        skewSpecs.getOrElse(noSkewPartitionSpec)
       } else {
-        Seq(leftPartSpec)
+        noSkewPartitionSpec
       }
 
      // A skewed partition should never be coalesced, but skip it here just to be safe.

Review comment:
       This comment can be removed; the coalesced-partition check it referred to no longer exists after this change.
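
For context, once the stale comment is dropped, the right-side block would presumably mirror the left-side handling shown above. A minimal sketch, not the exact patch; all names here (`rightParts`, `rightSize`, `rightTargetSize`, `numSkewedRight`, `noSkewPartitionSpec`) are taken from the left-side code and right-side declarations in this hunk:

```scala
      // Mirror of the left-side handling: split a skewed right-side partition,
      // otherwise read the single reducer partition as-is.
      val rightParts = if (isRightSkew) {
        val skewSpecs = createSkewPartitionSpecs(
          right.mapStats.get.shuffleId, partitionIndex, rightTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Right side partition $partitionIndex " +
            s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
            s"split it into ${skewSpecs.get.length} parts.")
          numSkewedRight += 1
        }
        skewSpecs.getOrElse(noSkewPartitionSpec)
      } else {
        noSkewPartitionSpec
      }
```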

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##########
@@ -21,16 +21,160 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.MapOutputStatistics
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}
+import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
 
 object ShufflePartitionsUtil extends Logging {
   final val SMALL_PARTITION_FACTOR = 0.2
   final val MERGED_PARTITION_FACTOR = 1.2
 
   /**
-   * Coalesce the partitions from multiple shuffles. This method assumes that all the shuffles
-   * have the same number of partitions, and the partitions of same index will be read together
-   * by one task.
+   * Coalesce the partitions from multiple shuffles, either in their original states, or applied
+   * with skew handling partition specs. If called on partitions containing skew partition specs,
+   * this method will keep the skew partition specs intact and only coalesce the partitions outside
+   * the skew sections.
+   *
+   * This method will return an empty result if the shuffles have been coalesced already, or if
+   * they do not have the same number of partitions, or if the coalesced result is the same as the
+   * input partition layout.
+   *
+   * @return A sequence of sequences of [[ShufflePartitionSpec]]s, with each inner sequence as the
+   *         new partition specs for its corresponding shuffle after coalescing. If Nil is returned,
+   *         then no coalescing is applied.
+   */
+  def coalescePartitions(
+      mapOutputStatistics: Seq[Option[MapOutputStatistics]],
+      inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
+      advisoryTargetSize: Long,
+      minNumPartitions: Int): Seq[Seq[ShufflePartitionSpec]] = {
+    assert(mapOutputStatistics.length == inputPartitionSpecs.length)
+
+    if (mapOutputStatistics.isEmpty) {
+      return Seq.empty
+    }
+
+    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
+    // `advisoryTargetSize` as the target size of a coalesced task.
+    val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
+    // The max here is to make sure that when we have an empty table, we only have a single
+    // coalesced partition.
+    // There is no particular reason that we pick 16. We just need a number to prevent
+    // `maxTargetSize` from being set to 0.
+    val maxTargetSize = math.max(
+      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
+    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+
+    val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
+    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
+      s"actual target size $targetSize.")
+
+    val numShuffles = mapOutputStatistics.length
+    // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
+    // so we should skip it when calculating the `partitionStartIndices`.
+    val validMetrics = mapOutputStatistics.flatten
+
+    if (inputPartitionSpecs.forall(_.isEmpty)) {
+      // If all input RDDs have 0 partitions, we create an empty partition for every shuffle reader.
+      if (validMetrics.isEmpty) {
+        return Seq.fill(numShuffles)(Seq(CoalescedPartitionSpec(0, 0)))
+      }
+
+      // We may have different pre-shuffle partition numbers; don't reduce the shuffle partition
+      // number in that case. For example, when we union fully aggregated data (data is arranged
+      // to a single partition) and a result of a SortMergeJoin (multiple partitions).
+      if (validMetrics.map(_.bytesByPartitionId.length).distinct.length > 1) {
+        return Seq.empty
+      }
+
+      val numPartitions = validMetrics.head.bytesByPartitionId.length
+      val newPartitionSpecs = coalescePartitions(
+        0, numPartitions, validMetrics, targetSize)
+      if (newPartitionSpecs.length < numPartitions) {
+        return Seq.fill(numShuffles)(newPartitionSpecs)
+      } else {
+        return Seq.empty
+      }
+    }
+
+    // Do not coalesce if any of the map output stats are missing or if not all shuffles have
+    // partition specs, which should not happen in practice.
+    if (!mapOutputStatistics.forall(_.isDefined) || !inputPartitionSpecs.forall(_.isDefined)) {

Review comment:
       It is better to move this check to the beginning of this method.
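
A minimal sketch of what the suggested hoisting could look like. Note that a naive move would change behavior for the all-`None` specs path, where missing specs are expected and handled by the empty-spec branch, so the hoisted guard below is conditioned on at least one spec being present. This is an illustrative rearrangement under that assumption, not the exact patch:

```scala
  def coalescePartitions(
      mapOutputStatistics: Seq[Option[MapOutputStatistics]],
      inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[Seq[ShufflePartitionSpec]] = {
    assert(mapOutputStatistics.length == inputPartitionSpecs.length)

    if (mapOutputStatistics.isEmpty) {
      return Seq.empty
    }

    // Hoisted guard: if some (but not all) shuffles carry partition specs, or any
    // map output stats are missing while specs are present, bail out before doing
    // any target-size computation. The all-None case falls through to the
    // empty-spec branch, as before.
    if (inputPartitionSpecs.exists(_.isDefined) &&
        (!mapOutputStatistics.forall(_.isDefined) || !inputPartitionSpecs.forall(_.isDefined))) {
      return Seq.empty
    }

    // ... target size computation and coalescing as in the patch above ...
  }
```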
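
Separately, a quick worked example of the target size computation earlier in this hunk, with made-up numbers, shows how `minNumPartitions` can push the target below the advisory size:

```scala
// Illustrative numbers only: 1 GiB of total shuffle output, 200 minimum
// partitions, and a 64 MiB advisory target size.
val totalPostShuffleInputSize = 1024L * 1024 * 1024   // 1 GiB
val minNumPartitions = 200
val advisoryTargetSize = 64L * 1024 * 1024            // 64 MiB

val maxTargetSize = math.max(
  math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
val targetSize = math.min(maxTargetSize, advisoryTargetSize)
// maxTargetSize = 5368710 (~5.1 MiB), so targetSize = ~5.1 MiB: the
// minNumPartitions constraint wins over the 64 MiB advisory size here.
```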



