cloud-fan commented on a change in pull request #32210: URL: https://github.com/apache/spark/pull/32210#discussion_r619309141
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala ########## @@ -298,9 +311,83 @@ case class ShuffledHashJoinExec( streamResultIter ++ buildResultIter } + private val enableShuffledHashJoinFallback: Boolean = + sqlContext.conf.enableShuffledHashJoinFallback + + /** + * This is for testing only. We force shuffled hash join to fall back to sort merge join, + * once number of rows in build relation exceeds this limit. + */ + private val testFallbackStartsAt: Option[Int] = { + sqlContext.getConf("spark.sql.ShuffledHashJoin.testFallbackStartsAt", null) match { + case null | "" => None + case fallbackStartsAt => Some(fallbackStartsAt.toInt) + } + } + + /** + * Get [[SortExec]] plan for stream side on join keys. This is used for sort-based fallback. + * The return value is `Option` as the stream side may be already ordered on join keys. + */ + private def getStreamSortPlan: Option[SortExec] = { + val requiredStreamOrdering = requiredOrders(streamedKeys) + if (!SortOrder.orderingSatisfies(streamedPlan.outputOrdering, requiredStreamOrdering)) { + Some(SortExec(requiredStreamOrdering, global = false, child = streamedPlan)) + } else { + None + } + } + + /** + * Get [[SortExec]] plan for build side on join keys. This is used for sort-based fallback. + * The return value is not `Option` as some of build rows may be already read and put into + * [[HashedRelation]], so the ordering for build side needs to be forced again. + */ + private def getBuildSortPlan: SortExec = { + val requiredBuildOrdering = requiredOrders(buildKeys) + SortExec(requiredBuildOrdering, global = false, child = buildPlan) + } + + /** + * Fallback to sort merge join, from shuffled hash join. + * + * Sort stream and build side on join keys if necessary, as sort merge join requires both + * children to be sorted on join keys. See [[SortMergeJoinExec.requiredChildOrdering]]. + * After that execute sort merge join on sorted children. 
+ */ + private def joinWithSortFallback( + streamIter: Iterator[InternalRow], + buildIter: Iterator[InternalRow], + relationIter: Iterator[InternalRow], + streamSortPlan: Option[SortExec], + buildSortPlan: SortExec, + sortMergeJoinPlan: SortMergeJoinExec, + numOutputRows: SQLMetric, + spillThreshold: Int, + inMemoryThreshold: Int): Iterator[InternalRow] = { + + // Sort stream and build side on join keys if necessary + val streamSortIter = streamSortPlan match { + case Some(plan) => + val sorter = plan.createSorter() + sorter.sort(streamIter.asInstanceOf[Iterator[UnsafeRow]]) + case _ => streamIter + } + val buildSortIter = buildSortPlan.createSorter().sort( + (relationIter ++ buildIter).asInstanceOf[Iterator[UnsafeRow]]) + + // Fallback to sort merge join + buildSide match { + case BuildLeft => sortMergeJoinPlan.executeJoinWithIterators( Review comment: It's a bit weird to instantiate a plan and call its method to reuse code. Can we put the shareable code in the base class `ShuffledJoin`? ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala ########## @@ -298,9 +311,83 @@ case class ShuffledHashJoinExec( streamResultIter ++ buildResultIter } + private val enableShuffledHashJoinFallback: Boolean = + sqlContext.conf.enableShuffledHashJoinFallback + + /** + * This is for testing only. We force shuffled hash join to fall back to sort merge join, + * once number of rows in build relation exceeds this limit. + */ + private val testFallbackStartsAt: Option[Int] = { + sqlContext.getConf("spark.sql.ShuffledHashJoin.testFallbackStartsAt", null) match { + case null | "" => None + case fallbackStartsAt => Some(fallbackStartsAt.toInt) + } + } + + /** + * Get [[SortExec]] plan for stream side on join keys. This is used for sort-based fallback. + * The return value is `Option` as the stream side may be already ordered on join keys. 
+ */ + private def getStreamSortPlan: Option[SortExec] = { + val requiredStreamOrdering = requiredOrders(streamedKeys) + if (!SortOrder.orderingSatisfies(streamedPlan.outputOrdering, requiredStreamOrdering)) { + Some(SortExec(requiredStreamOrdering, global = false, child = streamedPlan)) + } else { + None + } + } + + /** + * Get [[SortExec]] plan for build side on join keys. This is used for sort-based fallback. + * The return value is not `Option` as some of build rows may be already read and put into + * [[HashedRelation]], so the ordering for build side needs to be forced again. + */ + private def getBuildSortPlan: SortExec = { + val requiredBuildOrdering = requiredOrders(buildKeys) + SortExec(requiredBuildOrdering, global = false, child = buildPlan) + } + + /** + * Fallback to sort merge join, from shuffled hash join. + * + * Sort stream and build side on join keys if necessary, as sort merge join requires both + * children to be sorted on join keys. See [[SortMergeJoinExec.requiredChildOrdering]]. + * After that execute sort merge join on sorted children. 
+ */ + private def joinWithSortFallback( + streamIter: Iterator[InternalRow], + buildIter: Iterator[InternalRow], + relationIter: Iterator[InternalRow], + streamSortPlan: Option[SortExec], + buildSortPlan: SortExec, + sortMergeJoinPlan: SortMergeJoinExec, + numOutputRows: SQLMetric, + spillThreshold: Int, + inMemoryThreshold: Int): Iterator[InternalRow] = { + + // Sort stream and build side on join keys if necessary + val streamSortIter = streamSortPlan match { + case Some(plan) => + val sorter = plan.createSorter() + sorter.sort(streamIter.asInstanceOf[Iterator[UnsafeRow]]) + case _ => streamIter + } + val buildSortIter = buildSortPlan.createSorter().sort( + (relationIter ++ buildIter).asInstanceOf[Iterator[UnsafeRow]]) + + // Fallback to sort merge join + buildSide match { + case BuildLeft => sortMergeJoinPlan.executeJoinWithIterators( Review comment: It's a bit weird to instantiate a plan and call its method to reuse code. Can we put the shareable code in the base class `ShuffledJoin`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org