[spark] branch branch-2.4 updated: [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

2020-09-17 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 2fa68a6  [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
2fa68a6 is described below

commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel 
AuthorDate: Thu Sep 17 12:35:40 2020 +0200

[SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

### What changes were proposed in this pull request?

This PR changes how `UnsafeExternalSorter.SpillableIterator` checks whether it 
has already spilled: it now checks whether `inMemSorter` is null. It also allows 
it to spill `UnsafeSorterIterator`s other than `UnsafeInMemorySorter.SortedIterator`.
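
To make the new rule concrete, here is a minimal Java sketch, not Spark's implementation. It models `SpillableIterator` with plain `long` records, uses an in-memory `List` as a stand-in for the spill file, and assumes (as the description implies) that `inMemSorter` becomes null once the in-memory data has been spilled and released. The class `SpillSwapSketch` and its members are hypothetical. The point is that `spill()` no longer cares about the concrete type of `upstream`: it drains whatever records remain, then swaps `upstream` for a reader over the spilled records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of the patched SpillableIterator.
// Records are plain longs and a List<Long> stands in for a spill file.
public class SpillSwapSketch {

  static class SpillableIterator implements Iterator<Long> {
    private Iterator<Long> upstream;   // any iterator: sorted, chained, or a spill reader
    private Object inMemSorter;        // becomes null once in-memory data is released
    private int numRecords;            // records not yet returned to the caller
    final List<List<Long>> spillFiles = new ArrayList<>();

    SpillableIterator(Iterator<Long> upstream, int numRecords) {
      this.upstream = upstream;
      this.numRecords = numRecords;
      this.inMemSorter = new Object();
    }

    @Override
    public boolean hasNext() {
      return numRecords > 0;
    }

    @Override
    public Long next() {
      numRecords--;
      return upstream.next();
    }

    // Returns how many records were written out (a stand-in for bytes released).
    long spill() {
      // Post-patch-style guard: no instanceof check on the type of `upstream`.
      if (inMemSorter == null || numRecords <= 0) {
        return 0L;
      }
      List<Long> spillFile = new ArrayList<>();
      while (upstream.hasNext()) {
        spillFile.add(upstream.next());  // write out the records not yet returned
      }
      spillFiles.add(spillFile);
      upstream = spillFile.iterator();   // keep reading from the "spill file"
      inMemSorter = null;                // in-memory data is now released
      return spillFile.size();
    }
  }

  public static void main(String[] args) {
    List<Long> sorted = List.of(1L, 2L, 3L, 4L, 5L);
    SpillableIterator it = new SpillableIterator(sorted.iterator(), sorted.size());
    List<Long> out = new ArrayList<>();
    out.add(it.next());
    out.add(it.next());
    long first = it.spill();
    long second = it.spill();
    while (it.hasNext()) {
      out.add(it.next());
    }
    System.out.println(out + " first=" + first + " second=" + second);
  }
}
```

Running `main` prints `[1, 2, 3, 4, 5] first=3 second=0`: the two records already returned are not spilled again, iteration continues seamlessly from the spilled copy, and a second `spill()` is a no-op because the in-memory data is already gone.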

### Why are the changes needed?

Before this PR, `UnsafeExternalSorter.SpillableIterator` could not spill when 
the input contained NULLs and radix sorting was used. Spark determined whether 
`UnsafeExternalSorter.SpillableIterator` had not yet spilled by checking whether 
`upstream` was an instance of `UnsafeInMemorySorter.SortedIterator`. However, 
when radix sorting is used and the input contains NULLs, `upstream` is an 
instance of `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume  [...]
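
The difference between the two checks can be sketched with hypothetical stand-in types; these are not the real Spark classes, and modeling the radix-sort-with-NULLs case as a chain of two sorted runs (NULL keys and the rest) is an assumption made for illustration. A pre-patch-style predicate rejects a `ChainedIterator` even though nothing has been spilled, while a post-patch-style predicate only asks whether in-memory data still exists.

```java
// Hypothetical stand-ins for the real Spark iterator classes.
public class SpillCheckSketch {
  interface UnsafeSorterIterator {}

  static final class SortedIterator implements UnsafeSorterIterator {}

  static final class ChainedIterator implements UnsafeSorterIterator {
    ChainedIterator(UnsafeSorterIterator... parts) {}
  }

  // Pre-patch-style rule: only a plain SortedIterator counts as "not yet spilled".
  static boolean oldCanSpill(UnsafeSorterIterator upstream,
                             UnsafeSorterIterator nextUpstream, int numRecords) {
    return upstream instanceof SortedIterator && nextUpstream == null && numRecords > 0;
  }

  // Post-patch-style rule: spilling depends only on whether in-memory data remains.
  static boolean newCanSpill(Object inMemSorter, int numRecords) {
    return inMemSorter != null && numRecords > 0;
  }

  public static void main(String[] args) {
    // Assumed shape for radix sort with NULL keys: two sorted runs chained together.
    UnsafeSorterIterator upstream =
        new ChainedIterator(new SortedIterator(), new SortedIterator());
    Object inMemSorter = new Object(); // nothing has been spilled yet
    System.out.println("old=" + oldCanSpill(upstream, null, 10)
        + " new=" + newCanSpill(inMemSorter, 10));
  }
}
```

Here `main` prints `old=false new=true`, which is the situation the PR describes: the old rule refuses to spill a chained iterator whose data is still entirely in memory.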

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A test was added to `UnsafeExternalSorterSuite` (and therefore also to 
`UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test 
failed in `UnsafeExternalSorterRadixSortSuite` without this patch.

Closes #29772 from tomvanbussel/SPARK-32900.

Authored-by: Tom van Bussel 
Signed-off-by: herman 
(cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
Signed-off-by: herman 
---
 .../unsafe/sort/UnsafeExternalSorter.java  | 69 +-
 .../unsafe/sort/UnsafeInMemorySorter.java  |  1 +
 .../unsafe/sort/UnsafeSorterIterator.java  |  2 +
 .../unsafe/sort/UnsafeSorterSpillMerger.java   |  5 ++
 .../unsafe/sort/UnsafeSorterSpillReader.java   |  5 ++
 .../unsafe/sort/UnsafeExternalSorterSuite.java | 33 +++
 6 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
    */
   class SpillableIterator extends UnsafeSorterIterator {
     private UnsafeSorterIterator upstream;
-    private UnsafeSorterIterator nextUpstream = null;
     private MemoryBlock lastPage = null;
     private boolean loaded = false;
     private int numRecords = 0;
 
+    private Object currentBaseObject;
+    private long currentBaseOffset;
+    private int currentRecordLength;
+    private long currentKeyPrefix;
+
     SpillableIterator(UnsafeSorterIterator inMemIterator) {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       return numRecords;
     }
 
+    @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
     public long spill() throws IOException {
       synchronized (this) {
-        if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
-          && numRecords > 0)) {
+        if (inMemSorter == null || numRecords <= 0) {
           return 0L;
         }
 
-        UnsafeInMemorySorter.SortedIterator inMemIterator =
-          ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+        long currentPageNumber = upstream.getCurrentPageNumber();
 
-       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-        spillIterator(inMemIterator, spillWriter);
+        spillIterator(upstream, spillWriter);
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(serializerManager);
+        upstream = spillWriter.getReader(serializerManager);
 
         long released = 0L;
         synchronized
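
The excerpt above is cut off before the hunk that uses the new `currentBaseObject`, `currentBaseOffset`, `currentRecordLength` and `currentKeyPrefix` fields. A plausible reading, which is an assumption not confirmed by the diff shown here, is that `loadNext()` copies the current record's location into these fields so that the record already returned to the caller stays readable after `spill()` replaces `upstream` with the spill reader. The Java sketch below models only that caching idea with hypothetical `Record` and `CachingIterator` types; it does not model page bookkeeping such as `getCurrentPageNumber()`.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model: on every loadNext() the iterator copies the current record's
// location into its own fields, so replacing `upstream` (e.g. after a spill) does
// not change what the accessors report for the record already handed out.
public class CurrentRecordSketch {

  static final class Record {
    final Object baseObject;
    final long baseOffset;
    final int recordLength;
    final long keyPrefix;

    Record(Object baseObject, long baseOffset, int recordLength, long keyPrefix) {
      this.baseObject = baseObject;
      this.baseOffset = baseOffset;
      this.recordLength = recordLength;
      this.keyPrefix = keyPrefix;
    }
  }

  static final class CachingIterator {
    private Iterator<Record> upstream;
    private Object currentBaseObject;
    private long currentBaseOffset;
    private int currentRecordLength;
    private long currentKeyPrefix;

    CachingIterator(Iterator<Record> upstream) {
      this.upstream = upstream;
    }

    void loadNext() {
      Record r = upstream.next();
      // Snapshot the current record instead of delegating accessors to `upstream`.
      currentBaseObject = r.baseObject;
      currentBaseOffset = r.baseOffset;
      currentRecordLength = r.recordLength;
      currentKeyPrefix = r.keyPrefix;
    }

    // Stand-in for spill(): the remaining records now come from a different source.
    void replaceUpstream(Iterator<Record> replacement) {
      upstream = replacement;
    }

    Object getBaseObject() { return currentBaseObject; }
    long getBaseOffset() { return currentBaseOffset; }
    int getRecordLength() { return currentRecordLength; }
    long getKeyPrefix() { return currentKeyPrefix; }
  }

  public static void main(String[] args) {
    byte[] page = new byte[64];
    CachingIterator it = new CachingIterator(List.of(
        new Record(page, 0L, 8, 10L), new Record(page, 8L, 8, 20L)).iterator());
    it.loadNext();
    // Simulate a spill: the unread record is now served from a "spill reader".
    byte[] spillBuffer = new byte[8];
    it.replaceUpstream(List.of(new Record(spillBuffer, 0L, 8, 20L)).iterator());
    System.out.println("prefix=" + it.getKeyPrefix() + " samePage=" + (it.getBaseObject() == page));
    it.loadNext();
    System.out.println("prefix=" + it.getKeyPrefix() + " samePage=" + (it.getBaseObject() == page));
  }
}
```

In this sketch `main` prints `prefix=10 samePage=true` and then `prefix=20 samePage=false`: the first record's fields survive the swap, and the next `loadNext()` reads from the replacement source.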