[spark] branch branch-2.4 updated: [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 2fa68a6 [SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

2fa68a6 is described below

commit 2fa68a669cc83521c7257d844202790933ae9771
Author: Tom van Bussel
AuthorDate: Thu Sep 17 12:35:40 2020 +0200

[SPARK-32900][CORE] Allow UnsafeExternalSorter to spill when there are nulls

### What changes were proposed in this pull request?

This PR changes how `UnsafeExternalSorter.SpillableIterator` checks whether it has already spilled: it now checks whether `inMemSorter` is null. It also allows it to spill `UnsafeSorterIterator`s other than `UnsafeInMemorySorter.SortedIterator`.

### Why are the changes needed?

Before this PR, `UnsafeExternalSorter.SpillableIterator` could not spill when the input contained NULLs and radix sorting was used. Without this patch, Spark determines whether `UnsafeExternalSorter.SpillableIterator` has not spilled yet by checking whether `upstream` is an instance of `UnsafeInMemorySorter.SortedIterator`. When radix sorting is used and there are NULLs in the input, however, `upstream` will be an instance of `UnsafeExternalSorter.ChainedIterator` instead, and Spark will assume [...]

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A test was added to `UnsafeExternalSorterSuite` (and therefore also to `UnsafeExternalSorterRadixSortSuite`). I manually confirmed that the test failed in `UnsafeExternalSorterRadixSortSuite` without this patch.

Closes #29772 from tomvanbussel/SPARK-32900.
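The guard change described above can be illustrated with a toy model. In the sketch below, `SpillGuardDemo`, `RecordIter`, `SortedIter`, `ChainedIter`, `oldCanSpill` and `newCanSpill` are hypothetical simplifications, not Spark classes. The sketch assumes only what the message states, namely that radix sorting with NULLs makes `upstream` a chained iterator rather than a `SortedIterator`, and it drops the old `nextUpstream == null` condition for brevity.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: every type here is a hypothetical simplification, not a Spark class.
class SpillGuardDemo {
  // Stand-in for UnsafeSorterIterator.
  interface RecordIter {
    int numRecords();
  }

  // Stand-in for UnsafeInMemorySorter.SortedIterator.
  static final class SortedIter implements RecordIter {
    private final int n;
    SortedIter(int n) { this.n = n; }
    public int numRecords() { return n; }
  }

  // Stand-in for UnsafeExternalSorter.ChainedIterator: several runs read back to back,
  // e.g. a run of NULL keys chained with the run of non-NULL keys.
  static final class ChainedIter implements RecordIter {
    private final List<? extends RecordIter> parts;
    ChainedIter(List<? extends RecordIter> parts) { this.parts = parts; }
    public int numRecords() {
      int total = 0;
      for (RecordIter p : parts) {
        total += p.numRecords();
      }
      return total;
    }
  }

  // Old guard (simplified): only a SortedIter counts as "not yet spilled".
  static boolean oldCanSpill(RecordIter upstream, int numRecords) {
    return upstream instanceof SortedIter && numRecords > 0;
  }

  // New guard: spilled state is tracked by whether the in-memory sorter is still present.
  static boolean newCanSpill(Object inMemSorter, int numRecords) {
    return inMemSorter != null && numRecords > 0;
  }

  public static void main(String[] args) {
    RecordIter plain = new SortedIter(10);
    RecordIter withNulls = new ChainedIter(Arrays.asList(new SortedIter(3), new SortedIter(7)));
    Object inMemSorter = new Object(); // hypothetical: sorter not yet released
    System.out.println(oldCanSpill(plain, plain.numRecords()));           // true
    System.out.println(oldCanSpill(withNulls, withNulls.numRecords()));   // false: spill refused
    System.out.println(newCanSpill(inMemSorter, withNulls.numRecords())); // true
  }
}
```

Because the new guard does not depend on the concrete type of `upstream`, the chained case is treated the same as the plain sorted case.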
Authored-by: Tom van Bussel
Signed-off-by: herman
(cherry picked from commit e5e54a3614ffd2a9150921e84e5b813d5cbf285a)
Signed-off-by: herman
---
 .../unsafe/sort/UnsafeExternalSorter.java | 69 +-
 .../unsafe/sort/UnsafeInMemorySorter.java | 1 +
 .../unsafe/sort/UnsafeSorterIterator.java | 2 +
 .../unsafe/sort/UnsafeSorterSpillMerger.java | 5 ++
 .../unsafe/sort/UnsafeSorterSpillReader.java | 5 ++
 .../unsafe/sort/UnsafeExternalSorterSuite.java | 33 +++
 6 files changed, 88 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6a2076..f720ccd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -505,11 +505,15 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
    */
   class SpillableIterator extends UnsafeSorterIterator {
     private UnsafeSorterIterator upstream;
-    private UnsafeSorterIterator nextUpstream = null;
     private MemoryBlock lastPage = null;
     private boolean loaded = false;
     private int numRecords = 0;

+    private Object currentBaseObject;
+    private long currentBaseOffset;
+    private int currentRecordLength;
+    private long currentKeyPrefix;
+
     SpillableIterator(UnsafeSorterIterator inMemIterator) {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
@@ -520,23 +524,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       return numRecords;
     }

+    @Override
+    public long getCurrentPageNumber() {
+      throw new UnsupportedOperationException();
+    }
+
     public long spill() throws IOException {
       synchronized (this) {
-        if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
-          && numRecords > 0)) {
+        if (inMemSorter == null || numRecords <= 0) {
           return 0L;
         }

-        UnsafeInMemorySorter.SortedIterator inMemIterator =
-          ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+        long currentPageNumber = upstream.getCurrentPageNumber();

-       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-        spillIterator(inMemIterator, spillWriter);
+        spillIterator(upstream, spillWriter);
         spillWriters.add(spillWriter);
-        nextUpstream = spillWriter.getReader(serializerManager);
+        upstream = spillWriter.getReader(serializerManager);

         long released = 0L;
         synchronized
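The reworked `spill()` in the diff can be modeled in a few lines. This is a minimal sketch under stated assumptions, not Spark's implementation: `SpillableIteratorSketch` is hypothetical, a `List<Long>` stands in for both the in-memory sorter and a spill file, and the `current` field only mimics what the new `currentBaseObject`/`currentKeyPrefix` fields appear to be for, namely keeping the record already handed to the caller readable after `upstream` is swapped. Page accounting (`getCurrentPageNumber()`, `lastPage`) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the patched SpillableIterator; not Spark code.
class SpillableIteratorSketch {
  private List<Long> inMemSorter;          // stand-in for the in-memory sorter; null once spilled
  private Iterator<Long> upstream;         // sorted or chained iterator, later the spill reader
  private final List<List<Long>> spillFiles = new ArrayList<>(); // stand-in for spillWriters
  private int numRecords;                  // records not yet returned to the caller
  private long current;                    // copy of the current record (cf. currentKeyPrefix)

  SpillableIteratorSketch(List<Long> sortedRecords) {
    this.inMemSorter = new ArrayList<>(sortedRecords);
    this.upstream = inMemSorter.iterator();
    this.numRecords = sortedRecords.size();
  }

  boolean hasNext() {
    return numRecords > 0;
  }

  synchronized void loadNext() {
    current = upstream.next(); // copy the record out so it survives an upstream swap
    numRecords--;
  }

  long getCurrent() {
    return current;
  }

  /** Moves the unread records out of memory; returns how many moved (a stand-in for bytes). */
  synchronized long spill() {
    if (inMemSorter == null || numRecords <= 0) {
      return 0L; // already spilled, or nothing left to spill
    }
    // Drain upstream through the common iterator interface, whatever its concrete type.
    List<Long> spillFile = new ArrayList<>();
    while (upstream.hasNext()) {
      spillFile.add(upstream.next());
    }
    spillFiles.add(spillFile);
    upstream = spillFile.iterator(); // keep reading from the spill, like upstream = getReader(...)
    inMemSorter = null;              // release in-memory state; later spill() calls return 0
    return spillFile.size();
  }

  public static void main(String[] args) {
    SpillableIteratorSketch it = new SpillableIteratorSketch(Arrays.asList(1L, 2L, 3L, 4L));
    it.loadNext();                       // the caller now holds record 1
    System.out.println(it.spill());      // 3
    System.out.println(it.getCurrent()); // 1
    System.out.println(it.spill());      // 0
    while (it.hasNext()) {
      it.loadNext();
      System.out.println(it.getCurrent()); // 2, then 3, then 4
    }
  }
}
```

Replacing `upstream` with the reader, instead of keeping a separate `nextUpstream`, means later `loadNext()` calls need no special case, and nulling the sorter gives a single, type-independent signal that a spill has already happened.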