Lin created SPARK-21505:
---------------------------

             Summary: A dynamic join operator to improve the join reliability
                 Key: SPARK-21505
                 URL: https://issues.apache.org/jira/browse/SPARK-21505
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.2.0, 2.3.0, 3.0.0
            Reporter: Lin


Hash join is generally more efficient than sort-merge join, yet it is not widely 
used today because it can fail with an OutOfMemory (OOM) error due to limited 
memory, data skew, statistics mis-estimation, and so on. For example, if we apply 
shuffle hash join to an unevenly distributed dataset, some partitions may be so 
large that a hash table cannot be built for them, causing an OOM error. When OOM 
happens, Spark currently throws an exception and the job fails. On the other hand, 
if sort-merge join is used, the shuffle, sorting, and extra spill degrade join 
performance. Given the efficiency of hash join, we propose a fallback mechanism 
that dynamically chooses between hash join and sort-merge join at runtime, at the 
task level, to provide a more reliable join operation.

This new dynamic join operator internally implements the logic of HashJoin, 
Iterator Reconstruct, Sort, and MergeJoin. We describe the process of this dynamic 
join method as follows:

HashJoin: We start by building a hash table on one side of the join partitions. If 
the hash table is built successfully, the behavior is the same as the current 
ShuffledHashJoin operator.
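
A minimal sketch of this hash-build attempt with a fallback signal (not the actual 
operator; Row, memoryBudgetRows, and the Either result shape are illustrative, and 
a simulated row budget stands in for real memory accounting):

{code:scala}
import scala.collection.mutable

// Illustrative row type reused by the other sketches below.
final case class Row(key: Int, value: String)

object HashJoinStep {
  // Build a hash table from the build-side iterator, stopping if the
  // (simulated) memory budget is exceeded. On failure, return the partial
  // table plus the not-yet-consumed remainder of the iterator, which the
  // Sort step needs to rebuild the partition.
  def tryBuild(build: Iterator[Row], memoryBudgetRows: Int)
      : Either[(mutable.HashMap[Int, mutable.ArrayBuffer[Row]], Iterator[Row]),
               mutable.HashMap[Int, mutable.ArrayBuffer[Row]]] = {
    val table = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Row]]
    var rows = 0
    while (build.hasNext) {
      if (rows >= memoryBudgetRows) {
        // Simulated OOM: hand back the partial table and the untouched rest.
        return Left((table, build))
      }
      val r = build.next()
      table.getOrElseUpdate(r.key, mutable.ArrayBuffer.empty) += r
      rows += 1
    }
    Right(table)
  }
}
{code}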

Sort: If we fail to build the hash table because the partition is too large, we 
fall back to sort-merge join for this partition only. To do so, we first need to 
rebuild the partition. When OOM happens, a hash table covering part of the 
partition has already been built (e.g. the first 4000 rows of the RDD), and the 
partition's iterator now points to the 4001st row. We reuse this hash table to 
reconstruct an iterator over the first 4000 rows and concatenate it with the 
remaining rows so that the partition is rebuilt completely. We then sort the 
rebuilt partition by the join keys.
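
A sketch of this rebuild-and-sort step, reusing the Row type and the partial-table 
shape from the sketch above (an in-memory sort stands in for the external, 
spillable sort a real operator would use):

{code:scala}
import scala.collection.mutable

object SortStep {
  def rebuildAndSort(partialTable: mutable.HashMap[Int, mutable.ArrayBuffer[Row]],
                     remaining: Iterator[Row]): Iterator[Row] = {
    // Rows already hashed (e.g. the first 4000) come back out of the table...
    val alreadyHashed: Iterator[Row] = partialTable.valuesIterator.flatten
    // ...and are concatenated with the rows that were never consumed.
    val rebuilt = alreadyHashed ++ remaining
    // Sort the rebuilt partition by the join key.
    rebuilt.toArray.sortBy(_.key).iterator
  }
}
{code}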

MergeJoin: After obtaining two sorted iterators, we perform a regular merge join 
on them and emit the records to downstream operators.
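
A sketch of an inner merge join over two key-sorted sequences (illustrative only; 
a real operator streams with buffered iterators rather than indexed sequences):

{code:scala}
import scala.collection.mutable

object MergeJoinStep {
  def innerJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] = {
    val out = mutable.ArrayBuffer.empty[(Row, Row)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i).key
      val rk = right(j).key
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Emit all right rows sharing this key for the current left row;
        // j stays put so the next left row with the same key also matches.
        var jj = j
        while (jj < right.length && right(jj).key == lk) {
          out += ((left(i), right(jj)))
          jj += 1
        }
        i += 1
      }
    }
    out.toSeq
  }
}
{code}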

Iterator Reconstruct: The BytesToBytesMap has to be spilled to disk to release 
memory for other operators, such as Sort and Join. In addition, it has to be 
converted to an iterator so that it can be concatenated with the remaining items 
of the original iterator that was used to build the hash table.
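
A sketch of the spill-and-reread idea using a plain temp file (purely 
illustrative; it does not use BytesToBytesMap or Spark's memory manager):

{code:scala}
import java.io.{DataInputStream, DataOutputStream, EOFException, FileInputStream, FileOutputStream}
import java.nio.file.Files

object IteratorReconstruct {
  // Spill the rows held by the partial hash table to disk so their memory can
  // be released, then expose the spilled data again as an Iterator[Row] that
  // can be concatenated with the untouched remainder of the partition.
  def spillAndReread(rows: Iterator[Row]): Iterator[Row] = {
    val file = Files.createTempFile("spill-", ".bin").toFile
    file.deleteOnExit()
    val out = new DataOutputStream(new FileOutputStream(file))
    try rows.foreach { r => out.writeInt(r.key); out.writeUTF(r.value) }
    finally out.close()

    val in = new DataInputStream(new FileInputStream(file))
    new Iterator[Row] {
      private var nextRow: Option[Row] = read()
      private def read(): Option[Row] =
        try Some(Row(in.readInt(), in.readUTF()))
        catch { case _: EOFException => in.close(); None }
      def hasNext: Boolean = nextRow.isDefined
      def next(): Row = { val r = nextRow.get; nextRow = read(); r }
    }
  }
}
{code}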

Metadata Population: Necessary metadata, such as the sort keys and join type, has 
to be populated so that it can be used by the subsequent Sort and MergeJoin 
operators.
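
A sketch of the metadata the fallback path might carry; all field names are 
illustrative, not a proposed API:

{code:scala}
final case class DynamicJoinMeta(
  joinType: String,          // e.g. "inner", "left_outer"
  sortKeys: Seq[String],     // join key columns used for ordering
  buildSideIsLeft: Boolean,  // which side the hash table was built on
  memoryBudgetRows: Int      // threshold used by the hash-build attempt
)
{code}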



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
