This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2fa68a6  [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
2fa68a6 is described below

commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel <tom.vanbus...@databricks.com>
AuthorDate: Thu Sep 17 12:35:40 2020 +0200

    [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

    ### What changes were proposed in this pull request?

    This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks whether it has
    spilled already, by checking whether `inMemSorter` is null. It also allows it to spill
    `UnsafeSorterIterator`s other than `UnsafeInMemorySorter.SortedIterator`.

    ### Why are the changes needed?

    Before this PR `UnsafeExternalSorter.SpillableIterator` could not spill when there are
    NULLs in the input and radix sorting is used. Currently, Spark determines whether
    `UnsafeExternalSorter.SpillableIterator` has not spilled yet by checking whether
    `upstream` is an instance of `UnsafeInMemorySorter.SortedIterator`. When radix sorting
    is used and there are NULLs in the input, however, `upstream` will be an instance of
    `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume [...]

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    A test was added to `UnsafeExternalSorterSuite` (and therefore also to
    `UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test failed in
    `UnsafeExternalSorterRadixSortSuite` without this patch.

    Closes #29772 from tomvanbussel/SPARK-32900.

    Authored-by: Tom van Bussel <tom.vanbus...@databricks.com>
    Signed-off-by: herman <her...@databricks.com>
    (cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
    Signed-off-by: herman <her...@databricks.com>
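As a rough illustration of the behavioral change described above, the sketch below contrasts the old and new spill-eligibility checks. It is a simplified stand-in, not code from the patch: the real condition lives in `UnsafeExternalSorter.SpillableIterator.spill()` (see the diff below), and the class and fields here are hypothetical placeholders for `upstream`, `nextUpstream`, `inMemSorter`, and `numRecords`.

```java
// Minimal, self-contained sketch of the spill-eligibility check changed by this commit.
// Illustration only: the sorter/iterator types are stand-ins, and locking, page
// bookkeeping, and the spill writer itself are elided.
final class SpillCheckSketch {
  private Object inMemSorter;               // stand-in for the sorter's UnsafeInMemorySorter field
  private boolean upstreamIsSortedIterator; // was: upstream instanceof UnsafeInMemorySorter.SortedIterator
  private boolean alreadySpilled;           // was: nextUpstream != null
  private int numRecords;

  // Before SPARK-32900: spilling was gated on the concrete iterator type, so a
  // ChainedIterator upstream (produced when the input contains NULLs under radix sort)
  // could never spill.
  boolean canSpillOld() {
    return upstreamIsSortedIterator && !alreadySpilled && numRecords > 0;
  }

  // After SPARK-32900: "has not spilled yet" is tracked by inMemSorter still being
  // non-null, independent of the iterator's concrete type.
  boolean canSpillNew() {
    return inMemSorter != null && numRecords > 0;
  }
}
```

To make this work for any iterator type, the diff below also caches the current record's base object, offset, length, and key prefix in `loadNext()`, so the record just returned stays readable after a spill swaps `upstream` for the spill reader.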
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 69 +++++++++++++---------
 .../unsafe/sort/UnsafeInMemorySorter.java          |  1 +
 .../unsafe/sort/UnsafeSorterIterator.java          |  2 +
 .../unsafe/sort/UnsafeSorterSpillMerger.java       |  5 ++
 .../unsafe/sort/UnsafeSorterSpillReader.java       |  5 ++
 .../unsafe/sort/UnsafeExternalSorterSuite.java     | 33 +++++++++++
 6 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
    */
   class SpillableIterator extends UnsafeSorterIterator {
     private UnsafeSorterIterator upstream;
-    private UnsafeSorterIterator nextUpstream = null;
     private MemoryBlock lastPage = null;
     private boolean loaded = false;
     private int numRecords = 0;
 
+    private Object currentBaseObject;
+    private long currentBaseOffset;
+    private int currentRecordLength;
+    private long currentKeyPrefix;
+
     SpillableIterator(UnsafeSorterIterator inMemIterator) {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       return numRecords;
     }
 
+    @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
     public long spill() throws IOException {
       synchronized (this) {
-        if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
-          && numRecords > 0)) {
+        if (inMemSorter == null || numRecords <= 0) {
           return 0L;
         }
 
-        UnsafeInMemorySorter.SortedIterator inMemIterator =
-          ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+        long currentPageNumber = upstream.getCurrentPageNumber();
 
-       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-        spillIterator(inMemIterator, spillWriter);
+        spillIterator(upstream, spillWriter);
 
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(serializerManager);
+        upstream = spillWriter.getReader(serializerManager);
 
         long released = 0L;
         synchronized (UnsafeExternalSorter.this) {
@@ -544,8 +551,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           // is accessing the current record. We free this page in that caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber !=
-                    ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
+            if (!loaded || page.pageNumber != currentPageNumber) {
              released += page.size();
              freePage(page);
            } else {
@@ -579,22 +585,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       try {
         synchronized (this) {
           loaded = true;
-          if (nextUpstream != null) {
-            // Just consumed the last record from in memory iterator
-            if(lastPage != null) {
-              // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
-              // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
-              // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
-              // `SpillableIterator` in sequence, which may happen in
-              // `TaskMemoryManager.acquireExecutionMemory`.
-              pageToFree = lastPage;
-              lastPage = null;
-            }
-            upstream = nextUpstream;
-            nextUpstream = null;
+          // Just consumed the last record from in memory iterator
+          if (lastPage != null) {
+            // Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
+            // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
+            // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
+            // `SpillableIterator` in sequence, which may happen in
+            // `TaskMemoryManager.acquireExecutionMemory`.
+            pageToFree = lastPage;
+            lastPage = null;
           }
           numRecords--;
           upstream.loadNext();
+
+          // Keep track of the current base object, base offset, record length, and key prefix,
+          // so that the current record can still be read in case a spill is triggered and we
+          // switch to the spill writer's iterator.
+          currentBaseObject = upstream.getBaseObject();
+          currentBaseOffset = upstream.getBaseOffset();
+          currentRecordLength = upstream.getRecordLength();
+          currentKeyPrefix = upstream.getKeyPrefix();
         }
       } finally {
         if (pageToFree != null) {
@@ -605,22 +615,22 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
     @Override
     public Object getBaseObject() {
-      return upstream.getBaseObject();
+      return currentBaseObject;
     }
 
     @Override
     public long getBaseOffset() {
-      return upstream.getBaseOffset();
+      return currentBaseOffset;
     }
 
     @Override
     public int getRecordLength() {
-      return upstream.getRecordLength();
+      return currentRecordLength;
     }
 
     @Override
     public long getKeyPrefix() {
-      return upstream.getKeyPrefix();
+      return currentKeyPrefix;
     }
   }
 
@@ -698,6 +708,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     }
 
     @Override
+    public long getCurrentPageNumber() {
+      return current.getCurrentPageNumber();
+    }
+
+    @Override
     public boolean hasNext() {
       while (!current.hasNext() && !iterators.isEmpty()) {
         current = iterators.remove();
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index aedc7ec..9aaa370 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -328,6 +328,7 @@ public final class UnsafeInMemorySorter {
     @Override
     public long getBaseOffset() { return baseOffset; }
 
+    @Override
     public long getCurrentPageNumber() {
       return currentPageNumber;
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 1b3167f..d9f2231 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -34,4 +34,6 @@ public abstract class UnsafeSorterIterator {
   public abstract long getKeyPrefix();
 
   public abstract int getNumRecords();
+
+  public abstract long getCurrentPageNumber();
 }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index ab80028..f8603c5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -71,6 +71,11 @@ final class UnsafeSorterSpillMerger {
     }
 
     @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
     public boolean hasNext() {
       return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index bfca670..84907bd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -97,6 +97,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   }
 
   @Override
+  public long getCurrentPageNumber() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public boolean hasNext() {
     return (numRecordsRemaining > 0);
   }
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5c..41813dd 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -355,6 +355,39 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
+  public void forcedSpillingNullsWithReadIterator() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long[] record = new long[100];
+    final int recordSize = record.length * 8;
+    final int n = (int) pageSizeBytes / recordSize * 3;
+    for (int i = 0; i < n; i++) {
+      boolean isNull = i % 2 == 0;
+      sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, isNull);
+    }
+    assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+
+    UnsafeExternalSorter.SpillableIterator iter =
+      (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+    final int numRecordsToReadBeforeSpilling = n / 3;
+    for (int i = 0; i < numRecordsToReadBeforeSpilling; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+
+    assertTrue(iter.spill() > 0);
+    assertEquals(0, iter.spill());
+
+    for (int i = numRecordsToReadBeforeSpilling; i < n; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+    assertFalse(iter.hasNext());
+
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
+  }
+
+  @Test
   public void forcedSpillingWithNotReadIterator() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();
     long[] record = new long[100];

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org