Repository: spark Updated Branches: refs/heads/master 633ffd816 -> 2028e5a82
[SPARK-21907][CORE] oom during spill ## What changes were proposed in this pull request? 1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907) 2. a fix for the root cause of the issue. `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset` which may trigger another spill, when this happens the `array` member is already de-allocated but still referenced by the code, this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`. This patch introduces a reproduction in a test case and a fix, the fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array. ## How was this patch tested? introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`. Author: Eyal Farago <e...@nrgene.com> Closes #19181 from eyalfa/SPARK-21907__oom_during_spill. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2028e5a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2028e5a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2028e5a8 Branch: refs/heads/master Commit: 2028e5a82bc3e9a79f9b84f376bdf606b8c9bb0f Parents: 633ffd8 Author: Eyal Farago <e...@nrgene.com> Authored: Tue Oct 10 22:49:47 2017 +0200 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Tue Oct 10 22:49:47 2017 +0200 ---------------------------------------------------------------------- .../unsafe/sort/UnsafeExternalSorter.java | 4 ++ .../unsafe/sort/UnsafeInMemorySorter.java | 12 ++++- .../unsafe/sort/UnsafeExternalSorterSuite.java | 33 ++++++++++++++ .../unsafe/sort/UnsafeInMemorySorterSuite.java | 46 ++++++++++++++++++++ .../apache/spark/memory/TestMemoryManager.scala | 12 +++-- 5 files changed, 102 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2028e5a8/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 39eda00..e749f7b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -480,6 +480,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } } + @VisibleForTesting boolean hasSpaceForAnotherRecord() { + return inMemSorter.hasSpaceForAnotherRecord(); + } + private static void spillIterator(UnsafeSorterIterator inMemIterator, UnsafeSorterSpillWriter spillWriter) throws IOException { while (inMemIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/spark/blob/2028e5a8/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index c14c126..869ec90 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -162,7 +162,9 @@ public final class UnsafeInMemorySorter { */ public void free() { if (consumer != null) { - consumer.freeArray(array); + if (array != null) { + consumer.freeArray(array); + } array = null; } } @@ -170,6 +172,14 @@ public final class UnsafeInMemorySorter { public void reset() { if (consumer != null) { consumer.freeArray(array); + // the call to consumer.allocateArray may trigger a spill + // which in turn access this instance and eventually re-enter this method and try to free the array again. + // by setting the array to null and its length to 0 we effectively make the spill code-path a no-op. + // setting the array to null also indicates that it has already been de-allocated which prevents a double de-allocation in free(). + array = null; + usableCapacity = 0; + pos = 0; + nullBoundaryPos = 0; array = consumer.allocateArray(initialSize); usableCapacity = getUsableCapacity(); } http://git-wip-us.apache.org/repos/asf/spark/blob/2028e5a8/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 5330a68..6c5451d 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.UUID; +import org.hamcrest.Matchers; import scala.Tuple2$; import org.junit.After; @@ -503,6 +504,38 @@ public class UnsafeExternalSorterSuite { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + // we assume that given default configuration, + // the size of the data we insert to the sorter (ints) + // and assuming we shouldn't spill before pointers array is exhausted + // (memory manager is not configured to throw at this point) + // - so this loop runs a reasonable number of iterations (<2000). + // test indeed completed within <30ms (on a quad i7 laptop). + for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); + } + // we expect the next insert to attempt growing the pointerssArray + // first allocation is expected to fail, then a spill is triggered which attempts another allocation + // which also fails and we expect to see this OOM here. + // the original code messed with a released array within the spill code + // and ended up with a failed assertion. + // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset + memoryManager.markconsequentOOM(2); + try { + insertNumber(sorter, 1024); + fail("expected OutOfMmoryError but it seems operation surprisingly succeeded"); + } + // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure) + catch (OutOfMemoryError oom){ + String oomStackTrace = Utils.exceptionString(oom); + assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset", + oomStackTrace, + Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset")); + } + } + private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end) throws IOException { for (int i = start; i < end; i++) { http://git-wip-us.apache.org/repos/asf/spark/blob/2028e5a8/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index bd89085..1a3e11e 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -35,6 +35,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; public class UnsafeInMemorySorterSuite { @@ -139,4 +140,49 @@ public class UnsafeInMemorySorterSuite { } assertEquals(dataToSort.length, iterLength); } + + @Test + public void freeAfterOOM() { + final SparkConf sparkConf = new SparkConf(); + sparkConf.set("spark.memory.offHeap.enabled", "false"); + + final TestMemoryManager testMemoryManager = + new TestMemoryManager(sparkConf); + final TaskMemoryManager memoryManager = new TaskMemoryManager( + testMemoryManager, 0); + final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); + final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); + final Object baseObject = dataPage.getBaseObject(); + // Write the records into the data page: + long position = dataPage.getBaseOffset(); + + final HashPartitioner hashPartitioner = new HashPartitioner(4); + // Use integer comparison for comparing prefixes (which are partition ids, in this case) + final PrefixComparator prefixComparator = PrefixComparators.LONG; + final RecordComparator recordComparator = new RecordComparator() { + @Override + public int compare( + Object leftBaseObject, + long leftBaseOffset, + Object rightBaseObject, + long rightBaseOffset) { + return 0; + } + }; + UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, + recordComparator, prefixComparator, 100, shouldUseRadixSort()); + + testMemoryManager.markExecutionAsOutOfMemoryOnce(); + try { + sorter.reset(); + fail("expected OutOfMmoryError but it seems operation surprisingly succeeded"); + } catch (OutOfMemoryError oom) { + // as expected + } + // [SPARK-21907] this failed on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108) + sorter.free(); + // simulate a 'back to back' free. + sorter.free(); + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/2028e5a8/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 5f699df..c26945f 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -27,8 +27,8 @@ class TestMemoryManager(conf: SparkConf) numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = { - if (oomOnce) { - oomOnce = false + if (consequentOOM > 0) { + consequentOOM -= 1 0 } else if (available >= numBytes) { available -= numBytes @@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf) override def maxOffHeapStorageMemory: Long = 0L - private var oomOnce = false + private var consequentOOM = 0 private var available = Long.MaxValue def markExecutionAsOutOfMemoryOnce(): Unit = { - oomOnce = true + markconsequentOOM(1) + } + + def markconsequentOOM(n : Int) : Unit = { + consequentOOM += n } def limit(avail: Long): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org