siknezevic commented on a change in pull request #27246:
URL: https://github.com/apache/spark/pull/27246#discussion_r445299048
########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala ##########

@@ -182,6 +182,47 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
     }
   }
 
+  def testAgainstUnsafeSorterSpillReader(
+      numSpillThreshold: Int,
+      numRows: Int,
+      numIterators: Int,
+      iterations: Int): Unit = {
+    val rows = testRows(numRows)
+    val benchmark = new Benchmark(s"Spilling SpillReader with $numRows rows", iterations * numRows,
+      output = output)
+
+    benchmark.addCase("UnsafeSorterSpillReader_bufferSize1024") { _: Int =>
+      val array = UnsafeExternalSorter.create(
+        TaskContext.get().taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        TaskContext.get(),
+        null,
+        null,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        numSpillThreshold,
+        false)
+
+      rows.foreach(x =>
+        array.insertRecord(
+          x.getBaseObject,
+          x.getBaseOffset,
+          x.getSizeInBytes,
+          0,
+          false))
+
+      for (_ <- 0L until numIterators) {

Review comment:
   During execution of a sort-merge join (Left Semi Join), the "right matches" for each left-side row are found and stored in an ExternalAppendOnlyUnsafeRowArray object. That object is created when the first row on the left side of the join is processed, and it is reused as long as the next left-side row is the same as the previous one. In the case of Queries 14a/14b there are millions of "right matches" rows, which cannot fit into memory. To run these queries, spilling is enabled and the "right matches" data is moved from ExternalAppendOnlyUnsafeRowArray into UnsafeExternalSorter and then spilled to disk.

   To perform the join operation for a left-side row, an iterator has to be created on top of the "right matches" rows, and this creation is repeated for every processed left-side row. When millions of rows are processed on the left side of the join, an iterator over the spilled "right matches" is therefore created millions of times. The current Spark implementation does I/O on every such creation, because the constructor reads the number of rows stored in the spill file, yet the iterator is never actually iterated during the join: it is created, never used, and then discarded with each processed join row. This results in millions of I/Os, and one I/O takes 2 or 3 milliseconds. Hence this PR, which makes the iterator lazy (a "lazy" constructor for UnsafeSorterSpillReader) so that no I/O is done at creation time. My micro-benchmark accordingly simulates the creation of iterators on top of spill files that contain the "right matches".

   Sorry for the long explanation; I am not sure I can make it simpler. I hope it clarifies why I built the micro-benchmark this way.
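   To make the "lazy constructor" idea concrete, here is a minimal, self-contained sketch. It is not the PR's actual UnsafeSorterSpillReader change: the LazySpillReader class and its simple length-prefixed record format are hypothetical stand-ins. The point is only that the file open and the record-count read are deferred until the iterator is first consumed, so merely constructing and discarding the reader costs no I/O.

   ```scala
   import java.io.{DataInputStream, File, FileInputStream}

   // Hypothetical stand-in for a lazily-constructed spill reader.
   // Assumed file layout: an Int record count, then length-prefixed records.
   final class LazySpillReader(spillFile: File) extends Iterator[Array[Byte]] {

     // Nothing is opened in the constructor, so creating the reader is free.
     private var in: DataInputStream = _
     private var remaining: Int = -1

     // Opens the stream and reads the record count on first access only.
     private def ensureOpen(): Unit = {
       if (in == null) {
         in = new DataInputStream(new FileInputStream(spillFile))
         remaining = in.readInt() // the read the eager code paid per creation
       }
     }

     override def hasNext: Boolean = {
       ensureOpen()
       remaining > 0
     }

     override def next(): Array[Byte] = {
       ensureOpen()
       val len = in.readInt()
       val record = new Array[Byte](len)
       in.readFully(record)
       remaining -= 1
       if (remaining == 0) in.close()
       record
     }
   }
   ```

   With this shape, creating one reader per left-side row is effectively free, and the open-plus-readInt cost is paid only for the rows whose matches are actually consumed, which is exactly the behavior the micro-benchmark above is designed to measure.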