AngersZhuuuu commented on a change in pull request #29692:
URL: https://github.com/apache/spark/pull/29692#discussion_r486218258



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, 
joinType, _, _) =>
+      /**
+       * Splits skewed partitions on the streamed (non-build) side of `bnl`, whose
+       * streamed child is the shuffle stage described by `stream`. The build side
+       * is left untouched; only the streamed side is re-partitioned.
+       *
+       * @param stream    shuffle stage info of the streamed side
+       * @param joinType  join type of `bnl`; together with `buildSide` it decides
+       *                  whether splitting the streamed side is legal
+       * @param buildSide which child is the build side; the streamed side is the other
+       * @return `bnl` with its streamed child replaced by a [[CustomShuffleReaderExec]]
+       *         (and `isSkewJoin = true`) when at least one partition was split,
+       *         otherwise `bnl` unchanged
+       */
+      def resolveBroadcastNLJoinSkew(
+          stream: ShuffleStageInfo,
+          joinType: JoinType,
+          buildSide: BuildSide): SparkPlan = {
+        val streamMedSize = medianSize(stream.mapStats)
+        val numPartitions = stream.partitionsWithSizes.length
+        logDebug(
+          s"""
+             |Optimizing skewed join.
+             |Build Side:
+             |${buildSide}
+             |Stream side partitions size info:
+             |${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
+        """.stripMargin)
+        // The streamed side is the child opposite the build side, so the legality
+        // check must be made for THAT side. E.g. LeftOuter + BuildLeft streams the
+        // right child; splitting it would emit each unmatched left row once per split.
+        val canSplitStream = buildSide match {
+          case BuildRight => canSplitLeftSide(joinType)
+          case BuildLeft => canSplitRightSide(joinType)
+        }
+        val streamActualSizes = stream.partitionsWithSizes.map(_._2)
+        val streamTargetSize = targetSize(streamActualSizes, streamMedSize)
+        val streamSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+        var numSkewedStream = 0
+        for (partitionIndex <- 0 until numPartitions) {
+          val streamActualSize = streamActualSizes(partitionIndex)
+          val isStreamSkew = isSkewed(streamActualSize, streamMedSize) && canSplitStream
+          val streamPartSpec = stream.partitionsWithSizes(partitionIndex)._1
+          val isStreamCoalesced =
+            streamPartSpec.startReducerIndex + 1 < streamPartSpec.endReducerIndex
+          // A skewed partition should never be coalesced, but skip it here just to be safe.
+          val streamParts = if (isStreamSkew && !isStreamCoalesced) {
+            val reducerId = streamPartSpec.startReducerIndex
+            val skewSpecs = createSkewPartitionSpecs(
+              stream.mapStats.shuffleId, reducerId, streamTargetSize)
+            skewSpecs.foreach { specs =>
+              logDebug(s"Stream side partition $partitionIndex " +
+                s"(${FileUtils.byteCountToDisplaySize(streamActualSize)}) is skewed, " +
+                s"split it into ${specs.length} parts.")
+              numSkewedStream += 1
+            }
+            // If no split specs were produced, fall back to reading the partition whole.
+            skewSpecs.getOrElse(Seq(streamPartSpec))
+          } else {
+            Seq(streamPartSpec)
+          }
+          streamSidePartitions ++= streamParts
+        }
+
+        logDebug(s"number of skewed partitions: stream side $numSkewedStream")
+        if (numSkewedStream > 0) {
+          val newStream =
+            CustomShuffleReaderExec(stream.shuffleStage, streamSidePartitions.toSeq)
+          // Replace only the streamed child; the build child is kept as-is.
+          buildSide match {
+            case BuildRight => bnl.copy(left = newStream, isSkewJoin = true)
+            case BuildLeft => bnl.copy(right = newStream, isSkewJoin = true)
+          }
+        } else {
+          bnl
+        }
+      }
+
+      (leftChild, rightChild, buildSide) match {
+        case (ShuffleStage(left: ShuffleStageInfo), _, BuildRight) =>
+          resolveBroadcastNLJoinSkew(left, joinType, buildSide)
+        case (_, ShuffleStage(right: ShuffleStageInfo), BuildLeft) =>
+          resolveBroadcastNLJoinSkew(right, joinType, buildSide)

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to