Repository: spark Updated Branches: refs/heads/master 833eab2c9 -> 97a1aa2c7
[SPARK-21315][SQL] Skip some spill files when generateIterator(startIndex) in ExternalAppendOnlyUnsafeRowArray. ## What changes were proposed in this pull request? In the current code, it is expensive to use `UnboundedFollowingWindowFunctionFrame`, because it iterates from the start to the lower bound every time the `write` method is called. When traversing the iterator, it is possible to skip some spilled files and thus save some time. ## How was this patch tested? Added a unit test. Also ran a small benchmark: put 2000200 rows into `UnsafeExternalSorter` -- 2 spill files (each containing 1000000 rows) and an inMemSorter containing 200 rows -- then move the iterator forward to index=2000001. *With this change*: `getIterator(2000001)` costs almost 0ms~1ms; *Without this change*: `for(int i=0; i<2000001; i++)getIterator().loadNext()` costs 300ms. Author: jinxing <jinxing6...@126.com> Closes #18541 from jinxing64/SPARK-21315. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a1aa2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a1aa2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a1aa2c Branch: refs/heads/master Commit: 97a1aa2c70b1bf726d5f572789e150d168ac61e5 Parents: 833eab2 Author: jinxing <jinxing6...@126.com> Authored: Tue Jul 11 11:47:47 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jul 11 11:47:47 2017 +0800 ---------------------------------------------------------------------- .../unsafe/sort/UnsafeExternalSorter.java | 35 +++++++++++++++++--- .../unsafe/sort/UnsafeSorterSpillWriter.java | 4 +++ .../unsafe/sort/UnsafeExternalSorterSuite.java | 34 ++++++++++++++++++- .../ExternalAppendOnlyUnsafeRowArray.scala | 22 ++---------- ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala | 2 +- 5 files changed, 70 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 82d03e3..a6e858c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -589,29 +589,54 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } /** - * Returns a iterator, which will return the rows in the order as inserted. + * Returns an iterator starts from startIndex, which will return the rows in the order as + * inserted. * * It is the caller's responsibility to call `cleanupResources()` * after consuming this iterator. 
* * TODO: support forced spilling */ - public UnsafeSorterIterator getIterator() throws IOException { + public UnsafeSorterIterator getIterator(int startIndex) throws IOException { if (spillWriters.isEmpty()) { assert(inMemSorter != null); - return inMemSorter.getSortedIterator(); + UnsafeSorterIterator iter = inMemSorter.getSortedIterator(); + moveOver(iter, startIndex); + return iter; } else { LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); + int i = 0; for (UnsafeSorterSpillWriter spillWriter : spillWriters) { - queue.add(spillWriter.getReader(serializerManager)); + if (i + spillWriter.recordsSpilled() > startIndex) { + UnsafeSorterIterator iter = spillWriter.getReader(serializerManager); + moveOver(iter, startIndex - i); + queue.add(iter); + } + i += spillWriter.recordsSpilled(); } if (inMemSorter != null) { - queue.add(inMemSorter.getSortedIterator()); + UnsafeSorterIterator iter = inMemSorter.getSortedIterator(); + moveOver(iter, startIndex - i); + queue.add(iter); } return new ChainedIterator(queue); } } + private void moveOver(UnsafeSorterIterator iter, int steps) + throws IOException { + if (steps > 0) { + for (int i = 0; i < steps; i++) { + if (iter.hasNext()) { + iter.loadNext(); + } else { + throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps + + " steps forward"); + } + } + } + } + /** * Chain multiple UnsafeSorterIterator together as single one. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java index f9b5493..850f247 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java @@ -155,4 +155,8 @@ public final class UnsafeSorterSpillWriter { public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException { return new UnsafeSorterSpillReader(serializerManager, file, blockId); } + + public int recordsSpilled() { + return numRecordsSpilled; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index d31d7c1..cd5db1a 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -395,7 +395,7 @@ public class UnsafeExternalSorterSuite { sorter.spill(); } } - UnsafeSorterIterator iter = sorter.getIterator(); + UnsafeSorterIterator iter = sorter.getIterator(0); for (int i = 0; i < n; i++) { iter.hasNext(); iter.loadNext(); @@ -479,5 +479,37 @@ public class UnsafeExternalSorterSuite { } } + @Test + public void testGetIterator() throws Exception { + 
final UnsafeExternalSorter sorter = newSorter(); + for (int i = 0; i < 100; i++) { + insertNumber(sorter, i); + } + verifyIntIterator(sorter.getIterator(0), 0, 100); + verifyIntIterator(sorter.getIterator(79), 79, 100); + + sorter.spill(); + for (int i = 100; i < 200; i++) { + insertNumber(sorter, i); + } + sorter.spill(); + verifyIntIterator(sorter.getIterator(79), 79, 200); + + for (int i = 200; i < 300; i++) { + insertNumber(sorter, i); + } + verifyIntIterator(sorter.getIterator(79), 79, 300); + verifyIntIterator(sorter.getIterator(139), 139, 300); + verifyIntIterator(sorter.getIterator(279), 279, 300); + } + + private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end) + throws IOException { + for (int i = start; i < end; i++) { + assert (iter.hasNext()); + iter.loadNext(); + assert (Platform.getInt(iter.getBaseObject(), iter.getBaseOffset()) == i); + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 458ac4b..c4d3834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -166,7 +166,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( if (spillableArray == null) { new InMemoryBufferIterator(startIndex) } else { - new SpillableArrayIterator(spillableArray.getIterator, numFieldsPerRow, startIndex) + new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow) } } @@ -204,29 +204,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( private[this] class SpillableArrayIterator( 
iterator: UnsafeSorterIterator, - numFieldPerRow: Int, - startIndex: Int) + numFieldPerRow: Int) extends ExternalAppendOnlyUnsafeRowArrayIterator { private val currentRow = new UnsafeRow(numFieldPerRow) - def init(): Unit = { - var i = 0 - while (i < startIndex) { - if (iterator.hasNext) { - iterator.loadNext() - } else { - throw new ArrayIndexOutOfBoundsException( - "Invalid `startIndex` provided for generating iterator over the array. " + - s"Total elements: $numRows, requested `startIndex`: $startIndex") - } - i += 1 - } - } - - // Traverse upto the given [[startIndex]] - init() - override def hasNext(): Boolean = !isModified() && iterator.hasNext override def next(): UnsafeRow = { http://git-wip-us.apache.org/repos/asf/spark/blob/97a1aa2c/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 00c5f25..031ac38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -130,7 +130,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { false)) val unsafeRow = new UnsafeRow(1) - val iter = array.getIterator + val iter = array.getIterator(0) while (iter.hasNext) { iter.loadNext() unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org