This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2fa68a6  [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when 
there are nulls
2fa68a6 is described below

commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel <tom.vanbus...@databricks.com>
AuthorDate: Thu Sep 17 12:35:40 2020 +0200

    [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
    
    ### What changes were proposed in this pull request?
    
    This PR changes the way `UnsafeExternalSorter.SpillableIterator` checks 
whether it has spilled already, by checking whether `inMemSorter` is null. It 
also allows it to spill other `UnsafeSorterIterator`s than 
`UnsafeInMemorySorter.SortedIterator`.
    
    ### Why are the changes needed?
    
    Before this PR `UnsafeExternalSorter.SpillableIterator` could not spill 
when there are NULLs in the input and radix sorting is used. Currently, Spark 
determines whether UnsafeExternalSorter.SpillableIterator has not spilled yet 
by checking whether `upstream` is an instance of 
`UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and there are 
NULLs in the input however, `upstream` will be an instance of 
`UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume  [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    A test was added to `UnsafeExternalSorterSuite` (and therefore also to 
`UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test 
failed in `UnsafeExternalSorterRadixSortSuite` without this patch.
    
    Closes #29772 from tomvanbussel/SPARK-32900.
    
    Authored-by: Tom van Bussel <tom.vanbus...@databricks.com>
    Signed-off-by: herman <her...@databricks.com>
    (cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
    Signed-off-by: herman <her...@databricks.com>
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 69 +++++++++++++---------
 .../unsafe/sort/UnsafeInMemorySorter.java          |  1 +
 .../unsafe/sort/UnsafeSorterIterator.java          |  2 +
 .../unsafe/sort/UnsafeSorterSpillMerger.java       |  5 ++
 .../unsafe/sort/UnsafeSorterSpillReader.java       |  5 ++
 .../unsafe/sort/UnsafeExternalSorterSuite.java     | 33 +++++++++++
 6 files changed, 88 insertions(+), 27 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
    */
   class SpillableIterator extends UnsafeSorterIterator {
     private UnsafeSorterIterator upstream;
-    private UnsafeSorterIterator nextUpstream = null;
     private MemoryBlock lastPage = null;
     private boolean loaded = false;
     private int numRecords = 0;
 
+    private Object currentBaseObject;
+    private long currentBaseOffset;
+    private int currentRecordLength;
+    private long currentKeyPrefix;
+
     SpillableIterator(UnsafeSorterIterator inMemIterator) {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       return numRecords;
     }
 
+    @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
     public long spill() throws IOException {
       synchronized (this) {
-        if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && 
nextUpstream == null
-          && numRecords > 0)) {
+        if (inMemSorter == null || numRecords <= 0) {
           return 0L;
         }
 
-        UnsafeInMemorySorter.SortedIterator inMemIterator =
-          ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+        long currentPageNumber = upstream.getCurrentPageNumber();
 
-       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, 
writeMetrics, numRecords);
-        spillIterator(inMemIterator, spillWriter);
+        spillIterator(upstream, spillWriter);
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(serializerManager);
+        upstream = spillWriter.getReader(serializerManager);
 
         long released = 0L;
         synchronized (UnsafeExternalSorter.this) {
@@ -544,8 +551,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
           // is accessing the current record. We free this page in that 
caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber !=
-                    
((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
+            if (!loaded || page.pageNumber != currentPageNumber) {
               released += page.size();
               freePage(page);
             } else {
@@ -579,22 +585,26 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       try {
         synchronized (this) {
           loaded = true;
-          if (nextUpstream != null) {
-            // Just consumed the last record from in memory iterator
-            if(lastPage != null) {
-              // Do not free the page here, while we are locking 
`SpillableIterator`. The `freePage`
-              // method locks the `TaskMemoryManager`, and it's a bad idea to 
lock 2 objects in
-              // sequence. We may hit dead lock if another thread locks 
`TaskMemoryManager` and
-              // `SpillableIterator` in sequence, which may happen in
-              // `TaskMemoryManager.acquireExecutionMemory`.
-              pageToFree = lastPage;
-              lastPage = null;
-            }
-            upstream = nextUpstream;
-            nextUpstream = null;
+          // Just consumed the last record from in memory iterator
+          if (lastPage != null) {
+            // Do not free the page here, while we are locking 
`SpillableIterator`. The `freePage`
+            // method locks the `TaskMemoryManager`, and it's a bad idea to 
lock 2 objects in
+            // sequence. We may hit dead lock if another thread locks 
`TaskMemoryManager` and
+            // `SpillableIterator` in sequence, which may happen in
+            // `TaskMemoryManager.acquireExecutionMemory`.
+            pageToFree = lastPage;
+            lastPage = null;
           }
           numRecords--;
           upstream.loadNext();
+
+          // Keep track of the current base object, base offset, record 
length, and key prefix,
+          // so that the current record can still be read in case a spill is 
triggered and we
+          // switch to the spill writer's iterator.
+          currentBaseObject = upstream.getBaseObject();
+          currentBaseOffset = upstream.getBaseOffset();
+          currentRecordLength = upstream.getRecordLength();
+          currentKeyPrefix = upstream.getKeyPrefix();
         }
       } finally {
         if (pageToFree != null) {
@@ -605,22 +615,22 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
 
     @Override
     public Object getBaseObject() {
-      return upstream.getBaseObject();
+      return currentBaseObject;
     }
 
     @Override
     public long getBaseOffset() {
-      return upstream.getBaseOffset();
+      return currentBaseOffset;
     }
 
     @Override
     public int getRecordLength() {
-      return upstream.getRecordLength();
+      return currentRecordLength;
     }
 
     @Override
     public long getKeyPrefix() {
-      return upstream.getKeyPrefix();
+      return currentKeyPrefix;
     }
   }
 
@@ -698,6 +708,11 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
     }
 
     @Override
+    public long getCurrentPageNumber() {
+      return current.getCurrentPageNumber();
+    }
+
+    @Override
     public boolean hasNext() {
       while (!current.hasNext() && !iterators.isEmpty()) {
         current = iterators.remove();
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index aedc7ec..9aaa370 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -328,6 +328,7 @@ public final class UnsafeInMemorySorter {
     @Override
     public long getBaseOffset() { return baseOffset; }
 
+    @Override
     public long getCurrentPageNumber() {
       return currentPageNumber;
     }
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 1b3167f..d9f2231 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -34,4 +34,6 @@ public abstract class UnsafeSorterIterator {
   public abstract long getKeyPrefix();
 
   public abstract int getNumRecords();
+
+  public abstract long getCurrentPageNumber();
 }
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index ab80028..f8603c5 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -71,6 +71,11 @@ final class UnsafeSorterSpillMerger {
       }
 
       @Override
+      public long getCurrentPageNumber() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
       public boolean hasNext() {
         return !priorityQueue.isEmpty() || (spillReader != null && 
spillReader.hasNext());
       }
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index bfca670..84907bd 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -97,6 +97,11 @@ public final class UnsafeSorterSpillReader extends 
UnsafeSorterIterator implemen
   }
 
   @Override
+  public long getCurrentPageNumber() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public boolean hasNext() {
     return (numRecordsRemaining > 0);
   }
diff --git 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5c..41813dd 100644
--- 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -355,6 +355,39 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
+  public void forcedSpillingNullsWithReadIterator() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long[] record = new long[100];
+    final int recordSize = record.length * 8;
+    final int n = (int) pageSizeBytes / recordSize * 3;
+    for (int i = 0; i < n; i++) {
+      boolean isNull = i % 2 == 0;
+      sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, 
isNull);
+    }
+    assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+
+    UnsafeExternalSorter.SpillableIterator iter =
+            (UnsafeExternalSorter.SpillableIterator) 
sorter.getSortedIterator();
+    final int numRecordsToReadBeforeSpilling = n / 3;
+    for (int i = 0; i < numRecordsToReadBeforeSpilling; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+
+    assertTrue(iter.spill() > 0);
+    assertEquals(0, iter.spill());
+
+    for (int i = numRecordsToReadBeforeSpilling; i < n; i++) {
+      assertTrue(iter.hasNext());
+      iter.loadNext();
+    }
+    assertFalse(iter.hasNext());
+
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
+  }
+
+  @Test
   public void forcedSpillingWithNotReadIterator() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();
     long[] record = new long[100];


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to