GitHub user tejasapatil opened a pull request:

    https://github.com/apache/spark/pull/16909

    [SPARK-13450] Introduce UnsafeRowExternalArray. Change SortMergeJoin and 
WindowExec to use it

    ## What issue does this PR address?
    
    Jira: https://issues.apache.org/jira/browse/SPARK-13450
    
    In `SortMergeJoinExec`, rows of the right relation that have the same value for a join key are buffered in memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). A heap dump from a failed job confirms this: https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png. While it's possible to increase the heap size as a workaround, Spark should be resilient to such issues, since skew can happen arbitrarily.
    
    ## Changes proposed in this pull request
    
    - Introduces `ExternalAppendOnlyUnsafeRowArray` (a short usage sketch follows this list)
      - It holds `UnsafeRow`s in memory up to a certain threshold.
      - After the threshold is hit, it switches to `UnsafeExternalSorter`, which enables spilling the rows to disk. It does NOT sort the data.
      - Allows iterating over the array multiple times. However, any alteration to the array (using `add` or `clear`) will invalidate the existing iterator(s).
    - `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array.
    - Changed `SortMergeJoinExec` to use the new array implementation.
      - NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery, so I would rather put up a separate PR for that once this one gets in.
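    
    To make the intended access pattern concrete, here is a minimal sketch. The constructor parameter and the method names `add`, `clear` and `generateIterator` are inferred from the description above, so treat the exact signatures as assumptions (the diff has the real ones); `matchedRows` is a hypothetical source of `UnsafeRow`s.
    
    ```
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    
    // Holds rows in memory up to the threshold; past it, the backing store
    // switches to UnsafeExternalSorter and rows spill to disk (unsorted).
    val buffer = new ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold = 10000)
    matchedRows.foreach((row: UnsafeRow) => buffer.add(row))
    
    // The array can be iterated any number of times ...
    val firstScan = buffer.generateIterator()
    val secondScan = buffer.generateIterator()
    
    // ... but any mutation invalidates iterators handed out earlier:
    buffer.clear()  // firstScan / secondScan must not be used after this
    ```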
    
    Note for reviewers: The diff can be divided into 3 (or more) parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing the whole thing as 3 separate PRs would help, I am happy to make the split.
    
    ## How was this patch tested?
    
    #### Unit testing
    - Added unit tests for `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns.
    - Added unit tests for `SortMergeJoinExec`:
      - with and without spill, for inner join, left outer join and right outer join, to confirm that the spill threshold config behaves as expected and the output is correct (a hedged sketch of this kind of test follows this list).
      - This PR touches the scanning logic in `SortMergeJoinExec` for _all_ joins (except FULL OUTER JOIN). However, I expect existing test cases to cover that there is no regression in correctness.
    - Added unit tests for `WindowExec` to check the behavior of spilling and the correctness of results.
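    
    As referenced above, a sketch of what such a spill test can look like (ScalaTest style, using Spark's `withSQLConf` test helper; the exact suites and assertions in this PR may differ, so this is an illustration rather than the actual test):
    
    ```
    test("sort merge join with rows spilled past the buffer threshold") {
      withSQLConf(
          "spark.sql.autoBroadcastJoinThreshold" -> "1",
          "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "64") {
        // One key on the left matches many rows on the right, pushing the
        // matched-row buffer past the threshold so it spills to disk.
        val left = spark.range(1).selectExpr("10 AS i")
        val right = spark.range(1000).selectExpr("10 AS i", "id AS v")
        assert(left.join(right, "i").count() === 1000)
      }
    }
    ```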
    
    #### Stress testing
    - Confirmed that the OOM is gone by running against a production job which used to OOM.
    - Since I cannot share details about the prod workload externally, I created synthetic data to mimic the issue, and ran it before and after the fix to demonstrate the failure and the query's success with this PR.
    
    Generating the synthetic data:
    
    ```
    ./bin/spark-shell --driver-memory=6G
    
    import org.apache.spark.sql._
    val hc = SparkSession.builder.master("local").getOrCreate()
    
    hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect
    hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect
    
    // Single-row table: the non-skewed side of the join.
    val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
    df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table")
    
    // 3M rows, all sharing the same join key ("10", "100"): the skewed side.
    val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
    df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table")
    ```
    
    Ran this against trunk vs. a local build with this PR. The OOM repros with trunk, and with the fix the query runs fine.
    
    ```
    ./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof"
    
    import org.apache.spark.sql._
    val hc = SparkSession.builder.master("local").getOrCreate()
    hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
    hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000")
    
    hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect
    hc.sql("""
      CREATE TABLE spark_13450_result
      AS
      SELECT
        a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2,
        b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2
      FROM
        spark_13450_one_row_table a 
      JOIN
        spark_13450_large_table b 
      ON
        a.i=b.i AND
        a.j=b.j
    """)
    ```


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark SPARK-13450_smb_buffer_oom

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16909
    
----
commit e9cdd30252bce12d34f52cc31f95adb271ef2209
Author: Tejas Patil <tej...@fb.com>
Date:   2017-02-04T02:14:33Z

    [SPARK-13450] Introduce UnsafeRowExternalArray. Make SortMergeJoin and 
WindowExec use it

----

