cloud-fan commented on a change in pull request #32210:
URL: https://github.com/apache/spark/pull/32210#discussion_r619309141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -298,9 +311,83 @@ case class ShuffledHashJoinExec(
     streamResultIter ++ buildResultIter
   }
 
+  private val enableShuffledHashJoinFallback: Boolean =
+    sqlContext.conf.enableShuffledHashJoinFallback
+
+  /**
+   * This is for testing only. We force shuffled hash join to fall back to sort merge join,
+   * once number of rows in build relation exceeds this limit.
+   */
+  private val testFallbackStartsAt: Option[Int] = {
+    sqlContext.getConf("spark.sql.ShuffledHashJoin.testFallbackStartsAt", 
null) match {
+      case null | "" => None
+      case fallbackStartsAt => Some(fallbackStartsAt.toInt)
+    }
+  }
+
+  /**
+   * Get [[SortExec]] plan for stream side on join keys. This is used for sort-based fallback.
+   * The return value is `Option` as the stream side may be already ordered on join keys.
+   */
+  private def getStreamSortPlan: Option[SortExec] = {
+    val requiredStreamOrdering = requiredOrders(streamedKeys)
+    if (!SortOrder.orderingSatisfies(streamedPlan.outputOrdering, requiredStreamOrdering)) {
+      Some(SortExec(requiredStreamOrdering, global = false, child = streamedPlan))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Get [[SortExec]] plan for build side on join keys. This is used for sort-based fallback.
+   * The return value is not `Option` as some of build rows may be already read and put into
+   * [[HashedRelation]], so the ordering for build side needs to be forced again.
+   */
+  private def getBuildSortPlan: SortExec = {
+    val requiredBuildOrdering = requiredOrders(buildKeys)
+    SortExec(requiredBuildOrdering, global = false, child = buildPlan)
+  }
+
+  /**
+   * Fallback to sort merge join, from shuffled hash join.
+   *
+   * Sort stream and build side on join keys if necessary, as sort merge join requires both
+   * children to be sorted on join keys. See [[SortMergeJoinExec.requiredChildOrdering]].
+   * After that execute sort merge join on sorted children.
+   */
+  private def joinWithSortFallback(
+      streamIter: Iterator[InternalRow],
+      buildIter: Iterator[InternalRow],
+      relationIter: Iterator[InternalRow],
+      streamSortPlan: Option[SortExec],
+      buildSortPlan: SortExec,
+      sortMergeJoinPlan: SortMergeJoinExec,
+      numOutputRows: SQLMetric,
+      spillThreshold: Int,
+      inMemoryThreshold: Int): Iterator[InternalRow] = {
+
+    // Sort stream and build side on join keys if necessary
+    val streamSortIter = streamSortPlan match {
+      case Some(plan) =>
+        val sorter = plan.createSorter()
+        sorter.sort(streamIter.asInstanceOf[Iterator[UnsafeRow]])
+      case _ => streamIter
+    }
+    val buildSortIter = buildSortPlan.createSorter().sort(
+      (relationIter ++ buildIter).asInstanceOf[Iterator[UnsafeRow]])
+
+    // Fallback to sort merge join
+    buildSide match {
+      case BuildLeft => sortMergeJoinPlan.executeJoinWithIterators(

Review comment:
       It's a bit weird to instantiate a plan and call its method to reuse code. Can we put the shareable code in the base class `ShuffledJoin`?
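
One possible shape for that, sketched loosely: the `ShuffledJoin` trait (which both `ShuffledHashJoinExec` and `SortMergeJoinExec` already extend) could host a `requiredOrders` helper plus the sort-plan builders, so neither exec needs to instantiate the other. The helper names and signatures below are illustrative only, not code from the PR:

```scala
// Illustrative sketch only, not code from the PR: hoist the shareable
// sort-fallback helpers into the base trait that both join execs extend.
package org.apache.spark.sql.execution.joins

import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

trait ShuffledJoin extends BaseJoinExec {
  // (existing members of the trait omitted)

  // Ascending ordering on the join keys; SortMergeJoinExec keeps a private
  // version of this today.
  protected def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
    keys.map(SortOrder(_, Ascending))

  // Sort plan for the streamed side, or None when its output ordering already
  // satisfies the join-key ordering.
  protected def streamedSortPlan(
      streamedKeys: Seq[Expression],
      streamedPlan: SparkPlan): Option[SortExec] = {
    val ordering = requiredOrders(streamedKeys)
    if (SortOrder.orderingSatisfies(streamedPlan.outputOrdering, ordering)) {
      None
    } else {
      Some(SortExec(ordering, global = false, child = streamedPlan))
    }
  }

  // The build side is always re-sorted, because some build rows may already
  // have been consumed into the HashedRelation before the fallback triggers.
  protected def buildSortPlan(
      buildKeys: Seq[Expression],
      buildPlan: SparkPlan): SortExec =
    SortExec(requiredOrders(buildKeys), global = false, child = buildPlan)
}
```

With helpers like these in the trait, `ShuffledHashJoinExec` could build the sort plans and feed the sorted iterators into the shared sort-merge logic without constructing a throwaway `SortMergeJoinExec` node just to call its methods.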

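Separately, as a rough sketch of how the test-only fallback knob quoted in the diff above might be exercised from a `spark-shell` session (only the conf key is taken from the diff; the hint and query are illustrative):

```scala
// Hypothetical spark-shell session: only the conf key comes from the diff above.
// Per the pattern match in the diff, a null or empty value leaves the hook disabled.
spark.conf.set("spark.sql.ShuffledHashJoin.testFallbackStartsAt", "2")

val left = spark.range(0, 100)
val right = spark.range(0, 100)

// The SHUFFLE_HASH hint (Spark 3.0+) steers the planner toward shuffled hash
// join, so the fallback path is the one actually exercised here.
val joined = left.hint("shuffle_hash").join(right, "id")

// The build relation has 100 rows, well past the limit of 2, so execution
// would fall back from shuffled hash join to sort merge join.
joined.collect()
```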
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
