[ https://issues.apache.org/jira/browse/SPARK-32634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316327#comment-17316327 ]
Lietong Liu commented on SPARK-32634:
-------------------------------------

[~chengsu] I have a question about implementing the fallback mechanism. The current `doConsume` of `ShuffledHashJoinExec` consumes one stream-side row at a time, unlike `SortMergeJoinExec`. So how do we sort the stream side when fallback is enabled? Looking forward to your reply!

> Introduce sort-based fallback mechanism for shuffled hash join
> ---------------------------------------------------------------
>
>                 Key: SPARK-32634
>                 URL: https://issues.apache.org/jira/browse/SPARK-32634
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor
>
> A major pain point that keeps Spark users away from shuffled hash join is
> out-of-memory (OOM) errors. Shuffled hash join tends to hit OOM because it
> allocates an in-memory hashed relation (`UnsafeHashedRelation` or
> `LongHashedRelation`) for the build side, and there is no recovery (e.g.
> fallback/spill) once the hashed relation grows too large to fit in memory.
> On the other hand, shuffled hash join is more CPU- and IO-efficient than
> sort merge join when joining a large table with a small table (one that is
> still too large to be broadcast), because SHJ does not sort the large table,
> whereas SMJ must.
> To improve the reliability of shuffled hash join, a fallback mechanism can be
> introduced to avoid shuffled hash join OOM issues completely. We already have
> a similar fallback to sort-based aggregation for hash aggregate. The idea is:
> (1) Build the hashed relation as today, but monitor its size when inserting
> each build-side row. If the hashed relation always stays smaller than a
> configurable threshold, go to (2.1); otherwise go to (2.2).
> (2.1) Current shuffled hash join logic: read stream-side rows and probe the
> hashed relation.
> (2.2) Fall back to sort merge join: sort the stream-side rows, and sort the
> build-side rows (iterate the rows already in the hashed relation, e.g. through
> `BytesToBytesMap.destructiveIterator`, then iterate the remaining unread
> build-side rows). Then do a sort merge join over the stream- and build-side
> rows.
>
> Note:
> (1) The fallback is dynamic and happens per task: task 0 can incur the
> fallback (e.g. if it has a big build side), while tasks 1 and 2 may not need
> it, depending on the size of their hashed relations.
> (2) There is no major code change for SHJ and SMJ. The major change is around
> HashedRelation, introducing some new methods, e.g.
> `HashedRelation.destructiveValues()` to return an Iterator of the build-side
> rows in the hashed relation while cleaning up the hashed relation along the way.
> (3) We have run this feature by default in our internal fork for more than 2
> years and have benefited a lot from it: users can choose SHJ, and we don't
> need to worry about SHJ reliability (see
> https://issues.apache.org/jira/browse/SPARK-21505 for our original proposal;
> I tweaked it here to make it less intrusive and more acceptable, e.g. not
> introducing a separate join operator, but doing the fallback automatically
> inside the SHJ operator itself).
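A minimal Scala sketch of steps (1), (2.1) and (2.2), assuming rows are plain `(key, payload)` pairs and using a row-count threshold as a stand-in for the hashed relation's memory size. `ShjFallbackSketch`, `join`, `mergeJoin` and `maxBuildRows` are hypothetical names for illustration, not Spark APIs. It also shows one possible answer to the question in the comment, not necessarily Spark's actual design: once the fallback triggers, the stream side is no longer consumed row by row but is buffered and sorted before the merge.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the proposed SHJ fallback (not Spark code).
// Rows are (key, payload) pairs; "memory size" is approximated by a row count.
object ShjFallbackSketch {
  type Row = (Int, String)
  type Joined = (Int, String, String) // (key, streamPayload, buildPayload)

  /** Inner join that starts as a hash join and falls back to a sort merge join
    * once the build side exceeds `maxBuildRows`. Returns (fellBack, output). */
  def join(build: Iterator[Row], stream: Iterator[Row], maxBuildRows: Int): (Boolean, Seq[Joined]) = {
    val hashed = mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
    var size = 0
    // Step (1): build the hashed relation, checking its size after every insert
    // and stopping as soon as it exceeds the threshold.
    while (build.hasNext && size <= maxBuildRows) {
      val (k, v) = build.next()
      hashed.getOrElseUpdate(k, mutable.ArrayBuffer.empty[String]) += v
      size += 1
    }
    if (size <= maxBuildRows) {
      // Step (2.1): normal hash join, probing with one stream row at a time.
      val out = stream.flatMap { case (k, sv) =>
        hashed.get(k).iterator.flatMap(_.iterator).map(bv => (k, sv, bv))
      }.toVector
      (false, out)
    } else {
      // Step (2.2): drain rows already in the hashed relation (a stand-in for a
      // destructive iterator), then append the build rows that were never read.
      val buildRows = mutable.ArrayBuffer.empty[Row]
      for ((k, vs) <- hashed; v <- vs) buildRows += ((k, v))
      hashed.clear()
      buildRows ++= build
      // The stream side is buffered and sorted here; an in-memory sort is an
      // assumption, a real engine would need a spillable external sorter.
      val sortedBuild = buildRows.sortBy(_._1).toVector
      val sortedStream = stream.toVector.sortBy(_._1)
      (true, mergeJoin(sortedStream, sortedBuild))
    }
  }

  /** Sort merge inner join over two inputs already sorted by key. */
  def mergeJoin(s: Vector[Row], b: Vector[Row]): Vector[Joined] = {
    val out = Vector.newBuilder[Joined]
    var i = 0
    var j = 0
    while (i < s.length && j < b.length) {
      val ks = s(i)._1
      val kb = b(j)._1
      if (ks < kb) i += 1
      else if (ks > kb) j += 1
      else {
        // Find the run of equal keys on the build side, then pair every
        // stream row with that key against the whole run.
        var jEnd = j
        while (jEnd < b.length && b(jEnd)._1 == kb) jEnd += 1
        while (i < s.length && s(i)._1 == ks) {
          for (t <- j until jEnd) out += ((ks, s(i)._2, b(t)._2))
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Both paths should return the same set of joined rows; only the output order differs (stream order for the hash path, key order after the fallback), which mirrors why the decision can safely be made per task.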