[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19181 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r143773051

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +504,41 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    // we assume that given default configuration, the size of
    --- End diff --

    NIT: This comment is pretty good, but it is hard to read because of the odd whitespace usage. Can you wrap the lines at 100 characters?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r143771647 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java --- @@ -139,4 +139,49 @@ public int compare( } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { +final SparkConf sparkConf = +new SparkConf() --- End diff -- NIT style --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r143771458 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java --- @@ -139,4 +139,49 @@ public int compare( } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { +final SparkConf sparkConf = +new SparkConf() +.set("spark.memory.offHeap.enabled", + "false"); +final TestMemoryManager testMemoryManager = +new TestMemoryManager(sparkConf); +final TaskMemoryManager memoryManager = new TaskMemoryManager( +testMemoryManager, 0); +final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); +final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); +final Object baseObject = dataPage.getBaseObject(); +// Write the records into the data page: +long position = dataPage.getBaseOffset(); + +final HashPartitioner hashPartitioner = new HashPartitioner(4); +// Use integer comparison for comparing prefixes (which are partition ids, in this case) +final PrefixComparator prefixComparator = PrefixComparators.LONG; +final RecordComparator recordComparator = new RecordComparator() { + @Override + public int compare( + Object leftBaseObject, + long leftBaseOffset, + Object rightBaseObject, + long rightBaseOffset) { +return 0; + } +}; +UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, +recordComparator, prefixComparator, 100, shouldUseRadixSort()); + +testMemoryManager.markExecutionAsOutOfMemoryOnce(); +try { + sorter.reset(); +} catch (OutOfMemoryError oom) { + // as expected --- End diff -- Make sure you fail on an unexpected success --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r143770712 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +504,41 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +// we assume that given default configuration, the size of +// the data we insert to the sorter (ints) and assuming we shouldn't +// spill before pointers array is exhausted (memory manager is not configured to throw at this point) +// - so this loop run a reasonable number of loops (<2000) +// test test indeed completed within <30ms (on a quad i7 laptop). +for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markconsequentOOM(2); +OutOfMemoryError expectedOOM = null; +try { + insertNumber(sorter, 1024); +} +// we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure) +catch (OutOfMemoryError oom ){ + expectedOOM = oom; +} + +assertNotNull("expected OutOfMmoryError but it seems operation surprisingly succeeded" --- End diff -- NIT: just move this into the catch. Just fail if you reach this point, or fail in the try statement. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
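The "fail in the try statement" suggestion above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `ExpectOomSketch`, `insert` and `expectOom` are hypothetical names, and a thrown `AssertionError` stands in for JUnit's `fail()` so the snippet has no dependencies.

```java
// Sketch only: hypothetical names, no Spark or JUnit dependency.
final class ExpectOomSketch {

  /** Hypothetical operation under test; throws when asked to simulate memory exhaustion. */
  static void insert(boolean simulateOom) {
    if (simulateOom) {
      throw new OutOfMemoryError("simulated: unable to acquire memory");
    }
  }

  /** Returns the caught error, or fails loudly if the operation unexpectedly succeeds. */
  static OutOfMemoryError expectOom(boolean simulateOom) {
    try {
      insert(simulateOom);
      // Reaching this line means nothing was thrown: fail instead of passing silently.
      // AssertionError is not an OutOfMemoryError, so the catch below does not swallow it.
      throw new AssertionError("expected OutOfMemoryError but the operation succeeded");
    } catch (OutOfMemoryError oom) {
      // As expected; further assertions about the error can live right here.
      return oom;
    }
  }

  public static void main(String[] args) {
    System.out.println(expectOom(true).getMessage());
  }
}
```

With this shape there is no need for a nullable `expectedOOM` variable and a separate `assertNotNull` after the block; an unexpected success fails at the exact point where it happens.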
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r143770192 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +504,41 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +// we assume that given default configuration, the size of +// the data we insert to the sorter (ints) and assuming we shouldn't +// spill before pointers array is exhausted (memory manager is not configured to throw at this point) +// - so this loop run a reasonable number of loops (<2000) +// test test indeed completed within <30ms (on a quad i7 laptop). +for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markconsequentOOM(2); +OutOfMemoryError expectedOOM = null; +try { + insertNumber(sorter, 1024); +} +// we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure) +catch (OutOfMemoryError oom ){ --- End diff -- Nit weird spacing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r142005517

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
    +      insertNumber(sorter, i);
    +    }
    +    // todo: this might actually not be zero if pageSize is somehow configured differently,
    +    // so we actually have to compute the expected spill size according to the configured page size
    +    assertEquals(0, sorter.getSpillSize());
    --- End diff --

    Well, it's a tricky one... As far as I can track the construction and logic of the relevant classes here, and given that we're sorting ints, we're not supposed to spill before expandPointersArray requests additional memory. It is, however, very difficult and 'involved' to actually check this during the test.
    I can set the memoryManager to choke **before** the first loop; this way, if my assumption breaks and the sorter attempts to allocate memory, we'd get an OOM sooner than expected, effectively failing the test.
    @juliuszsompolski, do you find this approach better? It'd still need a comment describing the assumption about memory usage by the sorter.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141892238

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
    @@ -139,4 +139,44 @@ public int compare(
         }
         assertEquals(dataToSort.length, iterLength);
       }
    +
    +  @Test
    +  public void freeAfterOOM() {
    +    final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
    --- End diff --

    nit: Would it be better to wrap this line? It is too long.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843276 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java --- @@ -139,4 +139,44 @@ public int compare( } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { +final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); +final TaskMemoryManager memoryManager = new TaskMemoryManager( +testMemoryManager, 0); +final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); +final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); +final Object baseObject = dataPage.getBaseObject(); +// Write the records into the data page: +long position = dataPage.getBaseOffset(); + +final HashPartitioner hashPartitioner = new HashPartitioner(4); +// Use integer comparison for comparing prefixes (which are partition ids, in this case) +final PrefixComparator prefixComparator = PrefixComparators.LONG; +final RecordComparator recordComparator = new RecordComparator() { + @Override + public int compare( + Object leftBaseObject, + long leftBaseOffset, + Object rightBaseObject, + long rightBaseOffset) { +return 0; + } +}; +UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, +recordComparator, prefixComparator, 100, shouldUseRadixSort()); + +testMemoryManager.markExecutionAsOutOfMemoryOnce(); +try { + sorter.reset(); +} catch( OutOfMemoryError oom ) { + //as expected +} +// this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108) +sorter.free(); +//simulate a 'back to back' free. --- End diff -- nit: ws: `// simulate ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141843447

    --- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
    @@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
       override def maxOffHeapStorageMemory: Long = 0L
    -  private var oomOnce = false
    +  private var conseqOOM = 0
       private var available = Long.MaxValue
       def markExecutionAsOutOfMemoryOnce(): Unit = {
    -    oomOnce = true
    +    markConseqOOM(1)
    +  }
    +
    +  def markConseqOOM( n : Int) : Unit = {
    --- End diff --

    nit: markConsequentOOM
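To make the idea behind replacing the `oomOnce` flag with a counter concrete, here is a minimal Java sketch. It is not Spark's `TestMemoryManager` (which is written in Scala): the class `FailingMemoryManagerSketch`, the `acquire` method, and the choice to add `n` to the counter are all assumptions made for illustration, since the body of the real method is not shown in the diff above.

```java
// Sketch only: a hypothetical stand-in, not Spark's TestMemoryManager.
final class FailingMemoryManagerSketch {
  // Number of upcoming acquire() calls that will be refused.
  private int consequentOOM = 0;
  private long available = Long.MAX_VALUE;

  /** Refuse exactly the next request (the behaviour of the old boolean flag). */
  void markExecutionAsOutOfMemoryOnce() {
    markConsequentOOM(1);
  }

  /** Refuse the next n requests; adding to the counter is an assumption of this sketch. */
  void markConsequentOOM(int n) {
    consequentOOM += n;
  }

  /** Returns the number of bytes granted; 0 means the request was refused. */
  long acquire(long numBytes) {
    if (consequentOOM > 0) {
      consequentOOM--;
      return 0L;
    }
    long granted = Math.min(available, numBytes);
    available -= granted;
    return granted;
  }

  public static void main(String[] args) {
    FailingMemoryManagerSketch manager = new FailingMemoryManagerSketch();
    manager.markConsequentOOM(2);
    // The first two requests are refused, the third one is granted.
    System.out.println(manager.acquire(64) + " " + manager.acquire(64) + " " + manager.acquire(64));
  }
}
```

A counter is what lets the test discussed in this thread fail both the initial allocation and the retry that follows the spill, which a one-shot flag cannot express.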
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141843414

    --- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
    @@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
       override def maxOffHeapStorageMemory: Long = 0L
    -  private var oomOnce = false
    +  private var conseqOOM = 0
    --- End diff --

    nit: conseq -> consequent
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843193 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java --- @@ -139,4 +139,44 @@ public int compare( } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { +final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); +final TaskMemoryManager memoryManager = new TaskMemoryManager( +testMemoryManager, 0); +final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); +final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); +final Object baseObject = dataPage.getBaseObject(); +// Write the records into the data page: +long position = dataPage.getBaseOffset(); + +final HashPartitioner hashPartitioner = new HashPartitioner(4); +// Use integer comparison for comparing prefixes (which are partition ids, in this case) +final PrefixComparator prefixComparator = PrefixComparators.LONG; +final RecordComparator recordComparator = new RecordComparator() { + @Override + public int compare( + Object leftBaseObject, + long leftBaseOffset, + Object rightBaseObject, + long rightBaseOffset) { +return 0; + } +}; +UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, +recordComparator, prefixComparator, 100, shouldUseRadixSort()); + +testMemoryManager.markExecutionAsOutOfMemoryOnce(); +try { + sorter.reset(); +} catch( OutOfMemoryError oom ) { + //as expected +} +// this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108) --- End diff -- nit: tense: "this currently fails" -> "[SPARK-21907] this failed ..." At the point when anyone reads it, it will hopefully not fail :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141842424

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
    @@ -85,7 +85,7 @@
       private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
       // These variables are reset after spilling:
    -  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
    +  private @Nullable volatile UnsafeInMemorySorter inMemSorter;
    --- End diff --

    nit: unnecessary change.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141842918

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
    +      insertNumber(sorter, i);
    +    }
    +    // todo: this might actually not be zero if pageSize is somehow configured differently,
    +    // so we actually have to compute the expected spill size according to the configured page size
    +    assertEquals(0, sorter.getSpillSize());
    --- End diff --

    If this might actually not be zero, maybe don't test this assertion?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141842522

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
    @@ -162,14 +162,25 @@ private int getUsableCapacity() {
        */
       public void free() {
         if (consumer != null) {
    -      consumer.freeArray(array);
    +      if (null != array) {
    --- End diff --

    nit: RHS literal (array != null)
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141842730

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
    @@ -162,14 +162,25 @@ private int getUsableCapacity() {
        */
       public void free() {
         if (consumer != null) {
    -      consumer.freeArray(array);
    +      if (null != array) {
    +        consumer.freeArray(array);
    +      }
           array = null;
         }
       }
       public void reset() {
         if (consumer != null) {
           consumer.freeArray(array);
    +      // this is needed to prevent a 'nested' spill,
    --- End diff --

    nit: it doesn't prevent a nested spill, it only renders it harmless. Remove this line; the rest of the comment is true.
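The point that clearing the reference "renders the nested spill harmless" can be illustrated with a small self-contained Java sketch. Everything here is hypothetical (`InMemorySorterSketch`, its `Allocator` hook, the `freeCalls` counter); it only mimics the shape of `free()` and `reset()` from the diff above, with a plain `long[]` standing in for the pointer array.

```java
// Sketch only: hypothetical stand-in for the free()/reset() interplay.
final class InMemorySorterSketch {

  /** Hypothetical allocation hook; may throw OutOfMemoryError. */
  interface Allocator {
    long[] allocate(int size);
  }

  private final Allocator allocator;
  private long[] array = new long[16];
  int freeCalls = 0; // how many times a live array was released

  InMemorySorterSketch(Allocator allocator) {
    this.allocator = allocator;
  }

  private void freeArray(long[] released) {
    if (released == null) {
      throw new NullPointerException("attempted to free an already released array");
    }
    freeCalls++;
  }

  void free() {
    // The null guard makes back-to-back calls, or a call after a failed reset(), a no-op.
    if (array != null) {
      freeArray(array);
    }
    array = null;
  }

  void reset() {
    freeArray(array);
    // Drop the reference before allocating: if the allocation below throws,
    // a later free() must not touch the array that was just released.
    array = null;
    array = allocator.allocate(16);
  }

  public static void main(String[] args) {
    InMemorySorterSketch sorter =
        new InMemorySorterSketch(size -> { throw new OutOfMemoryError("simulated"); });
    try {
      sorter.reset();
    } catch (OutOfMemoryError expected) {
      // The allocation inside reset() failed after the old array was released.
    }
    sorter.free();
    sorter.free(); // back-to-back free is harmless
    System.out.println("arrays released: " + sorter.freeCalls);
  }
}
```

Without the `array = null` assignment in `reset()`, the first `free()` after the failed allocation would release the same array a second time, which is the kind of failure the tests in this thread are meant to catch.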
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843914 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -19,10 +19,18 @@ import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.util.Arrays; import java.util.LinkedList; import java.util.UUID; +import jodd.io.StringOutputStream; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Rule; +import org.junit.rules.ExpectedException; --- End diff -- nit: I think you don't use most of these imports anymore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843227 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java --- @@ -139,4 +139,44 @@ public int compare( } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { +final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); +final TaskMemoryManager memoryManager = new TaskMemoryManager( +testMemoryManager, 0); +final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); +final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); +final Object baseObject = dataPage.getBaseObject(); +// Write the records into the data page: +long position = dataPage.getBaseOffset(); + +final HashPartitioner hashPartitioner = new HashPartitioner(4); +// Use integer comparison for comparing prefixes (which are partition ids, in this case) +final PrefixComparator prefixComparator = PrefixComparators.LONG; +final RecordComparator recordComparator = new RecordComparator() { + @Override + public int compare( + Object leftBaseObject, + long leftBaseOffset, + Object rightBaseObject, + long rightBaseOffset) { +return 0; + } +}; +UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, +recordComparator, prefixComparator, 100, shouldUseRadixSort()); + +testMemoryManager.markExecutionAsOutOfMemoryOnce(); +try { + sorter.reset(); +} catch( OutOfMemoryError oom ) { + //as expected --- End diff -- nit: ws: `// as expected` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141843046

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
    +      insertNumber(sorter, i);
    +    }
    +    // todo: this might actually not be zero if pageSize is somehow configured differently,
    +    // so we actually have to compute the expected spill size according to the configured page size
    +    assertEquals(0, sorter.getSpillSize());
    +    // we expect the next insert to attempt growing the pointerssArray
    +    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
    +    // which also fails and we expect to see this OOM here.
    +    // the original code messed with a released array within the spill code
    +    // and ended up with a failed assertion.
    +    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
    +    memoryManager.markConseqOOM(2);
    +    OutOfMemoryError expectedOOM = null;
    +    try {
    +      insertNumber(sorter, 1024);
    +    }
    +    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
    +    catch( OutOfMemoryError oom ){
    +      expectedOOM = oom;
    +    }
    +
    +    assertNotNull("expected OutOfMmoryError but it seems operation surprisingly succeeded"
    +            ,expectedOOM);
    +    String oomStackTrace = Utils.exceptionString(expectedOOM);
    +    assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"
    +            , oomStackTrace
    +            , Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
    --- End diff --

    nit: move commas to end of line (3x)
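The last assertion in the quoted test checks where the error was thrown by searching the rendered stack trace for a method name. A dependency-free Java sketch of that technique is shown below; `StackTraceSketch`, `stackTraceOf` and the local `reset` method are hypothetical stand-ins (the test itself uses Spark's `Utils.exceptionString` and Hamcrest's `containsString`).

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Sketch only: hypothetical names, standard library calls only.
final class StackTraceSketch {

  /** Renders a throwable's stack trace into a string. */
  static String stackTraceOf(Throwable t) {
    StringWriter buffer = new StringWriter();
    try (PrintWriter writer = new PrintWriter(buffer)) {
      t.printStackTrace(writer);
    }
    return buffer.toString();
  }

  /** Hypothetical method standing in for the frame we expect to find in the trace. */
  static void reset() {
    throw new OutOfMemoryError("simulated");
  }

  /** True only if an OutOfMemoryError was thrown and its trace contains the reset() frame. */
  static boolean thrownFromReset() {
    try {
      reset();
      return false; // unexpected success
    } catch (OutOfMemoryError oom) {
      return stackTraceOf(oom).contains("StackTraceSketch.reset");
    }
  }

  public static void main(String[] args) {
    System.out.println(thrownFromReset());
  }
}
```

Matching on the frame name rather than on a line number keeps the check stable when unrelated lines in the file move around.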
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842948 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// todo: this might actually not be zero if pageSize is somehow configured differently, +// so we actually have to compute the expected spill size according to the configured page size +assertEquals(0, sorter.getSpillSize()); +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markConseqOOM(2); +OutOfMemoryError expectedOOM = null; +try { + insertNumber(sorter, 1024); +} +// we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure) +catch( OutOfMemoryError oom ){ --- End diff -- nit: ws: catch (OutOfMemoryError oom) { --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r141843494

    --- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
    @@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
       override def maxOffHeapStorageMemory: Long = 0L
    -  private var oomOnce = false
    +  private var conseqOOM = 0
       private var available = Long.MaxValue
       def markExecutionAsOutOfMemoryOnce(): Unit = {
    -    oomOnce = true
    +    markConseqOOM(1)
    +  }
    +
    +  def markConseqOOM( n : Int) : Unit = {
    --- End diff --

    nit: ws: `(n: Int): Unit`
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r139594335

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
    --- End diff --

    According to [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81782/testReport/org.apache.spark.util.collection.unsafe.sort/UnsafeExternalSorterSuite/testOOMDuringSpill/): 7ms. I guess we're OK then :sunglasses:
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r139570073

    --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
    @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
         verifyIntIterator(sorter.getIterator(279), 279, 300);
       }
    +  @Test
    +  public void testOOMDuringSpill() throws Exception {
    +    final UnsafeExternalSorter sorter = newSorter();
    +    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
    --- End diff --

    Will this make the test case take quite a long time to finish?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r138751839 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,47 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Rule + public ExpectedException exceptions = ExpectedException.none(); + + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter; +for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// todo: this might actually not be zero if pageSize is somehow configured differently, +// so we actually have to compute the expected spill size according to the configured page size +assertEquals(0, sorter.getSpillSize()); +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markConseqOOM(2); +exceptions.expect(OutOfMemoryError.class); --- End diff -- I'm used to scalatest's Matchers, things could be so much easier if this test was written in Scala... Will rewrite... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r138751518

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
    @@ -85,7 +85,7 @@
       private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
       // These variables are reset after spilling:
    -  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
    +  @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter;
    --- End diff --

    @hvanhovell, is it acceptable to add a method indicating whether the next insert is going to spill? I can make it @VisibleForTesting and prefix its name with something like testOnly.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r138735379 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,47 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Rule + public ExpectedException exceptions = ExpectedException.none(); + + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter; +for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// todo: this might actually not be zero if pageSize is somehow configured differently, +// so we actually have to compute the expected spill size according to the configured page size +assertEquals(0, sorter.getSpillSize()); +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markConseqOOM(2); +exceptions.expect(OutOfMemoryError.class); --- End diff -- Why not just catch the error? Seems a bit easier. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r138733992

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
    @@ -162,14 +162,20 @@ private int getUsableCapacity() {
        */
       public void free() {
         if (consumer != null) {
    -      consumer.freeArray(array);
    +      if (null != array) {
    +        consumer.freeArray(array);
    +      }
           array = null;
         }
       }
       public void reset() {
         if (consumer != null) {
           consumer.freeArray(array);
    +      array = null;
    --- End diff --

    Please document why this is needed.
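A toy model may help show why the field is cleared before the new allocation. The pull request description attributes the bug to `reset()` triggering a nested spill while `array` still points at memory that was already released. The sketch below reproduces that shape under simplifying assumptions: `ToyAllocator`, `ToySorter`, `spillCallback`, and `runReset` are hypothetical names, the nested spill is reduced to a call to `free()`, and the failure is modeled as a double free rather than the NPE or failed assertion seen in Spark.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a simplified model of the re-entrancy problem, not Spark's code.
final class ResetDuringSpillSketch {

  /** Hypothetical allocator that asks its owner to spill when an allocation fails. */
  static final class ToyAllocator {
    int failuresLeft;                              // allocation attempts that will fail
    final List<long[]> freed = new ArrayList<>();  // every array released so far
    Runnable spillCallback = () -> {};

    long[] allocate(int size) {
      if (failuresLeft > 0) {
        failuresLeft--;
        spillCallback.run();                       // nested spill re-enters the sorter
        if (failuresLeft > 0) {
          failuresLeft--;
          throw new OutOfMemoryError("simulated: no memory even after spilling");
        }
      }
      return new long[size];
    }

    void free(long[] array) {
      for (long[] released : freed) {
        if (released == array) {
          throw new IllegalStateException("double free");
        }
      }
      freed.add(array);
    }
  }

  static final class ToySorter {
    private final ToyAllocator allocator;
    private final boolean clearBeforeAllocate;
    long[] array;

    ToySorter(ToyAllocator allocator, boolean clearBeforeAllocate) {
      this.allocator = allocator;
      this.clearBeforeAllocate = clearBeforeAllocate;
      this.array = allocator.allocate(8);
      allocator.spillCallback = this::free;        // a nested spill releases the array
    }

    void free() {
      if (array != null) {                         // guard: nothing to release after reset()
        allocator.free(array);
        array = null;
      }
    }

    void reset() {
      allocator.free(array);
      if (clearBeforeAllocate) {
        array = null;  // the released array must not be visible to a nested spill
      }
      array = allocator.allocate(8);
    }
  }

  /** Runs reset() under two consecutive allocation failures and reports the outcome. */
  static String runReset(boolean clearBeforeAllocate) {
    ToyAllocator allocator = new ToyAllocator();
    ToySorter sorter = new ToySorter(allocator, clearBeforeAllocate);
    allocator.failuresLeft = 2;
    try {
      sorter.reset();
      return "no error";
    } catch (OutOfMemoryError oom) {
      sorter.free();   // cleanup after the OOM must not release the array again
      return "OutOfMemoryError";
    } catch (IllegalStateException doubleFree) {
      return "double free";
    }
  }

  public static void main(String[] args) {
    System.out.println("cleared before allocate: " + runReset(true));
    System.out.println("left dangling:           " + runReset(false));
  }
}
```

In this model, clearing the field lets the second failure surface as a plain `OutOfMemoryError`, and the null guard in `free()` keeps later cleanup safe. Leaving the field set lets the nested call release the same array twice.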
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r138733553

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
    @@ -85,7 +85,7 @@
       private final LinkedList spillWriters = new LinkedList<>();
       // These variables are reset after spilling:
    -  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
    +  @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter;
    --- End diff --

    This is slightly hair raising. Can you try to find a different, less tricky way to test this?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19181#discussion_r138373142

    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
    @@ -170,6 +170,10 @@ public void free() {
       public void reset() {
         if (consumer != null) {
           consumer.freeArray(array);
    +      array = LongArray.empty;
    --- End diff --

    @hvanhovell, I'm starting to have second thoughts about the special `empty` instance here. I'm afraid that the nested call might trigger `freeArray` or something similar on it. Perhaps using null is a better option here?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
GitHub user eyalfa opened a pull request:

    https://github.com/apache/spark/pull/19181

    [SPARK-21907][CORE] oom during spill

    ## What changes were proposed in this pull request?

    1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
    2. a fix for the root cause of the issue.

    `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill. When this happens, the `array` member is already de-allocated but still referenced by the code, which causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.

    This patch introduces a reproduction in a test case and a fix. The fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation, which prevents the spilling code from 'touching' the de-allocated array.

    ## How was this patch tested?

    Introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19181

commit c9cbe1a3e3e794e9c1ee54d498301c3f332c3f6c
Author: Eyal Farago
Date:   2017-09-10T10:05:06Z

    SPARK-21907__oom_during_spill: introduce a reproducing test.

commit cc8ccfd3f3956a7652ec82e9748ec56609b19800
Author: Eyal Farago
Date:   2017-09-10T14:29:32Z

    SPARK-21907__oom_during_spill: fix the root cause of this bug, improve test by requiring OOM exception thrown from the reset() method.