[ https://issues.apache.org/jira/browse/SPARK-30536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
shanyu zhao updated SPARK-30536:
--------------------------------
    Attachment: spark-30536-explained.pdf

Sort-merge join operator spilling performance improvements
----------------------------------------------------------

                Key: SPARK-30536
                URL: https://issues.apache.org/jira/browse/SPARK-30536
            Project: Spark
         Issue Type: Improvement
         Components: Spark Core, SQL
   Affects Versions: 3.0.0
           Reporter: Sinisa Knezevic
           Priority: Major
        Attachments: spark-30536-explained.pdf

Testing with the TPC-DS 100 TB benchmark data set showed that some SQLs (for example, query 14) cannot run even with extremely large Spark executor memory. Spark's spilling feature must be enabled in order to process these SQLs, but once spilling is enabled their processing becomes extremely slow.

Spilling is enabled via two parameters:

“spark.sql.sortMergeJoinExec.buffer.in.memory.threshold” – when this threshold is reached, rows are moved from the ExternalAppendOnlyUnsafeRowArray object into an UnsafeExternalSorter object.

“spark.sql.sortMergeJoinExec.buffer.spill.threshold” – when this threshold is reached, rows are spilled from the UnsafeExternalSorter object onto disk.

During execution of a sort-merge join (Left Semi Join), the “right matches” for each left-side row are found and stored in an ExternalAppendOnlyUnsafeRowArray object. In the case of query 14 there are millions of rows of right matches, so spilling must be enabled to run the query: the rows are moved from ExternalAppendOnlyUnsafeRowArray into UnsafeExternalSorter and then spilled onto disk. As millions of left-side rows are processed, an iterator over the spilled right matches is created anew for each of them. The current Spark implementation performs I/O each time such an iterator is created, so processing millions of left-side rows produces millions of I/O operations.
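For context, both settings are ordinary Spark configuration entries. A minimal Scala sketch of enabling them follows; the numeric values are illustrative assumptions, not tuning recommendations:

{code:scala}
// Minimal sketch: setting the two spill thresholds discussed above.
// The numeric values are illustrative assumptions, not recommendations.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("smj-spill-example")
  // Hand rows from ExternalAppendOnlyUnsafeRowArray to UnsafeExternalSorter
  // once 10,000 right-match rows are buffered in memory.
  .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", 10000L)
  // Spill UnsafeExternalSorter to disk once 1,000,000 rows accumulate.
  .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", 1000000L)
  .getOrCreate()
{code}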
To avoid this performance bottleneck, this JIRA introduces the following solution:

1. Implement lazy initialization of UnsafeSorterSpillReader, the iterator on top of spilled rows:
… During SortMergeJoin (Left Semi Join) execution, the iterator on the spilled data is created, but no iteration over the data is done.
… Lazy initialization of UnsafeSorterSpillReader enables efficient processing of SortMergeJoin even when data is spilled onto disk, because the unnecessary I/O is avoided (see the sketch at the end of this description).

2. Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1 MB to 1 KB:
… The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1 MB memory read buffer.
… The code already has logic to grow the read buffer when it cannot fit the data, so decreasing the initial size to 1 KB is safe and has a positive performance impact.

3. Improve memory utilization in ExternalAppendOnlyUnsafeRowArray when spilling is enabled:
… In the current implementation, when spilling is enabled, an UnsafeExternalSorter object is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and the ExternalAppendOnlyUnsafeRowArray object is then emptied. Just before it is emptied, both objects hold the same data in memory, which doubles the memory requirement and duplicates the data. This can be avoided.
… In the proposed solution, when spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new rows into the ExternalAppendOnlyUnsafeRowArray object stops; an UnsafeExternalSorter object is created and new rows are added into it, while the ExternalAppendOnlyUnsafeRowArray object retains all rows already added. This approach enables better memory utilization and avoids the unnecessary movement of data from one object into another.

Testing this solution with query 14 and spilling to disk enabled showed a 500x performance improvement, and it did not degrade the performance of the other SQLs in the TPC-DS benchmark.
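To make item 1 concrete, here is a minimal, hypothetical Scala sketch of the lazy-initialization pattern. The class name and on-disk record format below are illustrative stand-ins, not Spark's actual UnsafeSorterSpillReader code:

{code:scala}
// Hypothetical sketch of item 1: defer opening the spill file (and
// allocating the read buffer) until a row is actually requested.
// Class name and record format are illustrative; the real change
// lives in Spark's UnsafeSorterSpillReader. Stream close handling
// is omitted for brevity.
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream}

class LazySpillRowIterator(spillFile: File) extends Iterator[Array[Byte]] {
  // No I/O happens at construction: the stream is opened on first access.
  private lazy val in = new DataInputStream(
    new BufferedInputStream(new FileInputStream(spillFile), 1024)) // 1 KB buffer (item 2)
  private lazy val numRows = in.readInt() // assumed header: total row count
  private var rowsRead = 0

  override def hasNext: Boolean = rowsRead < numRows

  override def next(): Array[Byte] = {
    val len = in.readInt()            // assumed record layout: length-prefixed bytes
    val row = new Array[Byte](len)
    in.readFully(row)
    rowsRead += 1
    row
  }
}
{code}

With this shape, constructing the iterator costs nothing: the file is opened, and the (initially small) read buffer allocated, only on the first call to hasNext or next. Iterators that are created but never consumed, as in the Left Semi Join case above, therefore trigger no disk reads.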