[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19181


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r143773051
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +504,41 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // we assume that given default configuration, the size of
--- End diff --

NIT NIT NIT NIT NIT NIT This comment is pretty good, but it is kind of hard
to read because of the weird whitespace usage. Can you just use lines of up to
100 characters?


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r143771647
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,49 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final SparkConf sparkConf =
+        new SparkConf()
--- End diff --

NIT: style.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r143771458
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,49 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final SparkConf sparkConf =
+        new SparkConf()
+            .set("spark.memory.offHeap.enabled",
+                 "false");
+    final TestMemoryManager testMemoryManager =
+        new TestMemoryManager(sparkConf);
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch (OutOfMemoryError oom) {
+      // as expected
--- End diff --

Make sure you fail on an unexpected success
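One way to follow this suggestion, sketched in plain Java so it runs without JUnit on the classpath: run the action, return the `OutOfMemoryError` if one is thrown, and throw an `AssertionError` if the action completes normally. `ExpectOom.expectOutOfMemory` is a hypothetical helper, not part of Spark's test utilities; inside a JUnit test the same shape is usually written with `fail(...)` right after the call in the `try` block, since `fail` throws an `AssertionError` that a `catch (OutOfMemoryError ...)` does not swallow.

```java
// Hypothetical helper: expect an OutOfMemoryError and treat normal completion as a failure.
final class ExpectOom {
  private ExpectOom() {}

  /** Runs the action and returns the OutOfMemoryError it throws. */
  static OutOfMemoryError expectOutOfMemory(Runnable action) {
    try {
      action.run();
    } catch (OutOfMemoryError oom) {
      // Expected path: hand the error back so the caller can assert on it.
      return oom;
    }
    // Reaching this line means the action succeeded unexpectedly.
    // Any other throwable (e.g. a NullPointerException) is not caught and fails the caller too.
    throw new AssertionError("expected OutOfMemoryError, but the operation succeeded");
  }
}
```

Because `Runnable.run` cannot throw checked exceptions, an action that calls a method declared `throws Exception` would need its own functional interface in place of `Runnable`.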


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r143770712
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +504,41 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // we assume that given default configuration, the size of
+    // the data we insert to the sorter (ints) and assuming we shouldn't
+    // spill before pointers array is exhausted (memory manager is not configured to throw at this point)
+    // - so this loop run a reasonable number of loops (<2000)
+    // test test indeed completed within <30ms (on a quad i7 laptop).
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markconsequentOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch (OutOfMemoryError oom ){
+      expectedOOM = oom;
+    }
+
+    assertNotNull("expected OutOfMmoryError but it seems operation surprisingly succeeded"
--- End diff --

NIT: just move this into the catch. Just fail if you reach this point, or 
fail in the try statement.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r143770192
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +504,41 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // we assume that given default configuration, the size of
+    // the data we insert to the sorter (ints) and assuming we shouldn't
+    // spill before pointers array is exhausted (memory manager is not configured to throw at this point)
+    // - so this loop run a reasonable number of loops (<2000)
+    // test test indeed completed within <30ms (on a quad i7 laptop).
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markconsequentOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch (OutOfMemoryError oom ){
--- End diff --

Nit: weird spacing.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-30 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r142005517
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
--- End diff --

Well, it's a tricky one...
As far as I can trace the construction and logic of the relevant classes
here, and given that we're sorting ints, we're not supposed to spill before
expandPointersArray requests additional memory. It is, however, very difficult
and 'involved' to actually check this during the test.
I can set the memoryManager to choke **before** the first loop; this way, if
my assumption breaks and the sorter attempts to allocate memory, we'd get an
OOM sooner than expected, effectively failing the test.

@juliuszsompolski, do you find this approach better? It would still need a
comment describing the assumption about memory usage by the sorter.
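The approach described above can be modeled with toy classes: arm an allocator before the fill loop, so that any allocation the loop performs fails immediately instead of silently changing what the test measures. `Allocator`, `ToySorter`, and `fillWhileArmed` are hypothetical stand-ins, not Spark's `TestMemoryManager` or `UnsafeExternalSorter`; the model assumes, as the comment does, that inserting only allocates when the pointer array has to grow.

```java
// Toy model (hypothetical, not Spark's classes) of "arm the allocator before the fill loop".
final class ArmedAllocatorDemo {
  private ArmedAllocatorDemo() {}

  /** Allocator that throws once armed, so any allocation after arming is detected. */
  static final class Allocator {
    private boolean failAll = false;
    int allocations = 0;

    void arm() { failAll = true; }

    long[] allocate(int size) {
      if (failAll) {
        throw new OutOfMemoryError("allocator armed: unexpected allocation of " + size);
      }
      allocations++;
      return new long[size];
    }
  }

  /** Sorter stand-in: inserting only allocates when the pointer array must grow. */
  static final class ToySorter {
    private final Allocator allocator;
    private long[] pointers;
    private int pos = 0;

    ToySorter(Allocator allocator, int initialCapacity) {
      this.allocator = allocator;
      this.pointers = allocator.allocate(initialCapacity);
    }

    boolean hasSpaceForAnotherRecord() { return pos < pointers.length; }

    void insert(long value) {
      if (!hasSpaceForAnotherRecord()) {
        // Growing the array is the only allocation on the insert path in this model.
        long[] grown = allocator.allocate(pointers.length * 2);
        System.arraycopy(pointers, 0, grown, 0, pos);
        pointers = grown;
      }
      pointers[pos++] = value;
    }

    int size() { return pos; }
  }

  /** Fills the sorter with the allocator armed; returns how many records fit without allocating. */
  static int fillWhileArmed(ToySorter sorter, Allocator allocator) {
    allocator.arm();
    int i = 0;
    for (; sorter.hasSpaceForAnotherRecord(); ++i) {
      sorter.insert(i); // would throw if the "no allocation before full" assumption broke
    }
    return i;
  }
}
```

If the assumption holds, the loop finishes without any new allocation, and the very next insert, which must grow the array, is the first one to hit the armed allocator.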


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141892238
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
--- End diff --

nit: Would it be better to wrap this line, since it is too long?


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843276
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
+    }
+    // this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+    sorter.free();
+    //simulate a 'back to back' free.
--- End diff --

nit: ws: `// simulate ...`


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843447
  
--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue
 
   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
--- End diff --

nit: markConsequentOOM
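A minimal Java model of the counter this diff introduces in place of the `oomOnce` flag: each refused request decrements the counter, so `markConsequentOOM(2)` fails both the first allocation and the retry after a spill, which is the sequence the `testOOMDuringSpill` comments describe. The class and method bodies are a hypothetical sketch, not the Scala `TestMemoryManager`; returning 0 to signal a refused request, and adding to (rather than overwriting) the counter on repeated calls, are assumptions of this sketch.

```java
// Hypothetical Java model of a test memory manager that refuses the next n requests.
final class ConsequentOomCounter {
  private int consequentOOM = 0;
  private long available = Long.MAX_VALUE;

  /** The old "OOM once" behaviour, expressed through the counter. */
  void markExecutionAsOutOfMemoryOnce() {
    markConsequentOOM(1);
  }

  /** Refuse the next n requests (this sketch adds to any refusals still pending). */
  void markConsequentOOM(int n) {
    consequentOOM += n;
  }

  /** Returns the number of bytes granted; 0 means the request was refused. */
  long acquireExecutionMemory(long numBytes) {
    if (consequentOOM > 0) {
      consequentOOM--;
      return 0L;
    }
    long granted = Math.min(numBytes, available);
    available -= granted;
    return granted;
  }
}
```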


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843414
  
--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
--- End diff --

nit: conseq -> consequent


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843193
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
+    }
+    // this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
--- End diff --

nit: tense: "this currently fails" -> "[SPARK-21907] this failed ..."
By the time anyone reads it, it will hopefully no longer fail :-)


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842424
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new LinkedList<>();
 
   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  private @Nullable volatile UnsafeInMemorySorter inMemSorter;
--- End diff --

nit: unnecessary change.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842918
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
--- End diff --

If this might not actually be zero, maybe don't make this assertion?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842522
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (null != array) {
--- End diff --

nit: put the literal on the right-hand side (`array != null`).
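A sketch of why the null check matters, using hypothetical stand-ins: a consumer whose `freeArray` throws on `null` (the test comment reports an NPE in `MemoryConsumer.freeArray`), and a `free()` that guards the call and then clears the field, so a second, back-to-back `free()` is a no-op. The classes below are illustrative only, not Spark's `UnsafeInMemorySorter` or `MemoryConsumer`.

```java
// Hypothetical stand-ins for a memory consumer and an in-memory sorter's free().
final class IdempotentFree {
  private IdempotentFree() {}

  /** Consumer that, like a strict allocator, rejects a null array. */
  static final class Consumer {
    int freed = 0;

    void freeArray(long[] array) {
      if (array == null) {
        throw new NullPointerException("freeArray(null)");
      }
      freed++;
    }
  }

  static final class Sorter {
    private final Consumer consumer;
    long[] array;

    Sorter(Consumer consumer, long[] array) {
      this.consumer = consumer;
      this.array = array;
    }

    void free() {
      if (consumer != null) {
        // Literal on the right-hand side, per the review nit.
        if (array != null) {
          consumer.freeArray(array);
        }
        // Clear the field so a second free() is a no-op instead of a double free.
        array = null;
      }
    }
  }
}
```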


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842730
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (null != array) {
+        consumer.freeArray(array);
+      }
       array = null;
     }
   }
 
   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      // this is needed to prevent a 'nested' spill,
--- End diff --

nit: it doesn't prevent a nested spill; it only renders it harmless.
Remove this line; the rest of the comment is true.
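The ordering in `reset()` can be modeled as follows (hypothetical classes, not Spark's code): free the old array, clear the field, and only then allocate. If the allocation throws, the field is already `null`, so a later `free()`, or a nested spill that reaches the same object, finds nothing to release instead of freeing the old array a second time. Unlike the quoted diff, this sketch also guards the free inside `reset()` against `null`, so calling `reset()` again after a failed one is safe too.

```java
import java.util.function.IntFunction;

// Hypothetical model of reset(): free the old array, clear the field, then allocate.
final class ResetAfterFree {
  private ResetAfterFree() {}

  static final class Sorter {
    private final IntFunction<long[]> allocateArray; // may throw OutOfMemoryError
    private final int initialSize;
    long[] array;
    int freedArrays = 0;

    Sorter(IntFunction<long[]> allocateArray, int initialSize) {
      this.allocateArray = allocateArray;
      this.initialSize = initialSize;
      this.array = allocateArray.apply(initialSize);
    }

    /** Stand-in for a consumer's freeArray, which rejects null. */
    private void freeArray(long[] a) {
      if (a == null) {
        throw new NullPointerException("freeArray(null)");
      }
      freedArrays++;
    }

    void free() {
      if (array != null) {
        freeArray(array);
      }
      array = null;
    }

    void reset() {
      if (array != null) {
        freeArray(array);
      }
      // Clear the reference before allocating: if the allocation below throws, or triggers a
      // nested spill that calls back into this object, nothing frees the released array again.
      array = null;
      array = allocateArray.apply(initialSize);
    }
  }
}
```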


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843914
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -19,10 +19,18 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
+import jodd.io.StringOutputStream;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
--- End diff --

nit: I think you don't use most of these imports anymore.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843227
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
--- End diff --

nit: ws: `// as expected`


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843046
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch( OutOfMemoryError oom ){
+      expectedOOM = oom;
+    }
+
+    assertNotNull("expected OutOfMmoryError but it seems operation surprisingly succeeded"
+        ,expectedOOM);
+    String oomStackTrace = Utils.exceptionString(expectedOOM);
+    assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"
+        , oomStackTrace
+        , Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
--- End diff --

nit: move the commas to the end of the preceding line (3x)
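As an alternative to matching on the formatted stack-trace string from `Utils.exceptionString`, the frames of the caught error can be checked directly with `Throwable.getStackTrace()`. `StackFrames.thrownThrough` is a hypothetical helper, and `failingReset` exists only so the sketch can run without Spark on the classpath.

```java
// Hypothetical helper: check stack frames instead of searching a formatted stack-trace string.
final class StackFrames {
  private StackFrames() {}

  /** True if any frame of t belongs to the given class and method. */
  static boolean thrownThrough(Throwable t, String className, String methodName) {
    for (StackTraceElement frame : t.getStackTrace()) {
      if (frame.getClassName().equals(className) && frame.getMethodName().equals(methodName)) {
        return true;
      }
    }
    return false;
  }

  /** Demo target only, so the helper can be exercised without Spark on the classpath. */
  static void failingReset() {
    throw new OutOfMemoryError("simulated");
  }
}
```

In the test above, the check would read `thrownThrough(expectedOOM, "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter", "reset")`. Comparing class and method names exactly avoids accidental substring matches (for example, inside an error message), at the cost of relying on the error carrying a stack trace, which explicitly constructed errors do by default.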


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842948
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch( OutOfMemoryError oom ){
--- End diff --

nit: ws: catch (OutOfMemoryError oom) {


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843494
  
--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue
 
   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
--- End diff --

nit: ws: `(n: Int): Unit`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-18 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r139594335
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
--- End diff --

according to
[Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81782/testReport/org.apache.spark.util.collection.unsafe.sort/UnsafeExternalSorterSuite/testOOMDuringSpill/):
7ms, so I guess we're OK then :sunglasses:


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-18 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r139570073
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
--- End diff --

Will this make the test case take quite a long time to finish?


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138751839
  
--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
@@ -503,6 +511,47 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter;
+    for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    exceptions.expect(OutOfMemoryError.class);
--- End diff --

I'm used to ScalaTest's Matchers; things would be so much easier if this
test were written in Scala...
Will rewrite...


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138751518
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new LinkedList<>();
 
   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter;
--- End diff --

@hvanhovell, is it acceptable to add a method indicating whether the next insert is going to spill? I can make it @VisibleForTesting and prefix it with something like testOnly.





[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138735379
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,47 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter;
+    for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0,  sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    exceptions.expect(OutOfMemoryError.class);
--- End diff --

Why not just catch the error? Seems a bit easier.
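Here is a minimal sketch of this suggestion. It uses hypothetical stand-ins rather than Spark's classes. `ToySorter.insert` plays the role of the insert that grows the pointer array. Its `reset()` always throws, which mimics the consecutive OOMs injected by `markConseqOOM(2)`. The triggering call is wrapped in try/catch, and the check fails if nothing is thrown. The caught error can then be inspected, for example to confirm that `reset` appears on its stack trace, as the test comment expects. In a real JUnit test, `org.junit.Assert.fail` would normally replace the hand-thrown `AssertionError`.

```java
// Hypothetical stand-in: insert() grows its buffer via reset(), which fails with an
// OutOfMemoryError, mimicking the consecutive allocation failures injected in the test.
class ToySorter {
  void insert(long value) {
    reset();
  }

  void reset() {
    throw new OutOfMemoryError("simulated allocation failure");
  }
}

class CatchInsteadOfRule {
  /** Returns the caught error, or throws AssertionError if nothing was raised. */
  static OutOfMemoryError expectOOM(ToySorter sorter) {
    try {
      sorter.insert(42L);
    } catch (OutOfMemoryError expected) {
      return expected;
    }
    throw new AssertionError("expected an OutOfMemoryError");
  }

  /** True if any frame of the error's stack trace belongs to a method with this name. */
  static boolean raisedWithin(Throwable t, String methodName) {
    for (StackTraceElement frame : t.getStackTrace()) {
      if (frame.getMethodName().equals(methodName)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    OutOfMemoryError oom = expectOOM(new ToySorter());
    System.out.println("raised within reset(): " + raisedWithin(oom, "reset"));
  }
}
```

Compared with the `ExpectedException` rule, the explicit try/catch keeps the expectation next to the call that should fail. It also lets the test examine the error object directly.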





[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138733992
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -162,14 +162,20 @@ private int getUsableCapacity() {
*/
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (null != array) {
+        consumer.freeArray(array);
+      }
       array = null;
     }
   }
 
   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      array = null;
--- End diff --

Please document why this is needed.





[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138733553
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 ---
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new LinkedList<>();
 
   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter;
--- End diff --

This is slightly hair-raising. Can you try to find a different, less tricky path to test this?





[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-12 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138373142
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -170,6 +170,10 @@ public void free() {
   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      array = LongArray.empty;
--- End diff --

@hvanhovell,
I'm starting to have second thoughts about the special `empty` instance here; I'm afraid the nested call might trigger `freeArray`, or something similar, on it.
Perhaps using null is a better option here?
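The concern can be illustrated with a toy model. It is not Spark's `MemoryConsumer` or `LongArray`. In this model, the consumer tracks the arrays it has handed out and refuses to free anything it does not currently own. A shared `EMPTY` sentinel was never allocated by the consumer. A nested spill that runs after `reset()` and calls `free()` would therefore hand the sentinel to `freeArray`. With `null` plus a null check in `free()`, the nested call does nothing. `ToyArray`, `ToyConsumer`, `EMPTY` and `SentinelVsNull` are all hypothetical names.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical stand-in for an array handed out by a memory consumer.
final class ToyArray {
  static final ToyArray EMPTY = new ToyArray(0); // shared sentinel, never allocated
  final int size;

  ToyArray(int size) {
    this.size = size;
  }
}

// Hypothetical consumer: only arrays it allocated (and has not freed yet) may be freed.
final class ToyConsumer {
  private final Set<ToyArray> live = Collections.newSetFromMap(new IdentityHashMap<>());

  ToyArray allocateArray(int size) {
    ToyArray a = new ToyArray(size);
    live.add(a);
    return a;
  }

  void freeArray(ToyArray a) {
    if (!live.remove(a)) {
      throw new IllegalStateException("freeing an array this consumer does not own");
    }
  }
}

final class SentinelVsNull {
  /**
   * Simulates reset() releasing the array, followed by a nested spill calling free()
   * before a replacement has been allocated. useSentinel=true parks EMPTY in the field,
   * useSentinel=false parks null, which the null-checking free() skips.
   */
  static String resetThenNestedFree(boolean useSentinel) {
    ToyConsumer consumer = new ToyConsumer();
    ToyArray array = consumer.allocateArray(1024);

    // reset(): release the current array and park a placeholder in the field
    consumer.freeArray(array);
    array = useSentinel ? ToyArray.EMPTY : null;

    // the replacement allocation fails and triggers a spill, which calls free()
    try {
      if (array != null) {
        consumer.freeArray(array);
      }
      return "ok";
    } catch (IllegalStateException e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println("sentinel: " + resetThenNestedFree(true));
    System.out.println("null:     " + resetThenNestedFree(false));
  }
}
```

With the sentinel, the null check does not help, because `EMPTY != null`. With `null`, the guard is enough, which matches the `null != array` check added to `free()` in the later diff.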





[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-10 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/19181

[SPARK-21907][CORE]  oom during spill

## What changes were proposed in this pull request?
1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill. When this happens, the `array` member has already been de-allocated but is still referenced by the code, so the nested spill fails with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a test case that reproduces the issue, plus a fix. The fix simply sets the in-memory sorter's `array` member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array.

## How was this patch tested?
introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19181


commit c9cbe1a3e3e794e9c1ee54d498301c3f332c3f6c
Author: Eyal Farago 
Date:   2017-09-10T10:05:06Z

SPARK-21907__oom_during_spill: introduce a reproducing test.

commit cc8ccfd3f3956a7652ec82e9748ec56609b19800
Author: Eyal Farago 
Date:   2017-09-10T14:29:32Z

SPARK-21907__oom_during_spill: fix the root cause of this bug, improve test by requiring OOM exception thrown from the reset() method.



