[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209372979
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
--- End diff --

lol


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22062


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209337943
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
--- End diff --

> We can also create a TaskContextImpl by hand, right?

I can. Just to save several lines :)


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209338026
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
+    val taskMetrics = new TaskMetrics
+    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    val sorter = new ShuffleExternalSorter(
+      taskMemoryManager,
+      sc.env.blockManager,
+      taskContext,
+      100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
+      1, // numPartitions
+      conf,
+      new ShuffleWriteMetrics)
+    val inMemSorter = {
+      val field = sorter.getClass.getDeclaredField("inMemSorter")
+      field.setAccessible(true)
+      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+    }
+    // Allocate memory to make the next "insertRecord" call trigger a spill.
+    val bytes = new Array[Byte](1)
+    while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --

> Access to the hasSpaceForAnotherRecord is the only reason why we need reflection, right?

Yes.


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209337484
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be a no-op.
+    pos = 0;
--- End diff --

We also need to set `usableCapacity` to `0`. Otherwise, 
https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343
will not rethrow SparkOutOfMemoryError; ShuffleExternalSorter will keep running and eventually touch `array`.

Setting `array` to `null` is just for safety, so that any incorrect use will fail with an NPE.


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209292284
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
--- End diff --

Do we need mockito here? We can also create a `TaskContextImpl` by hand, right?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209291439
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+    // This will trigger a nested spill and expose issues if we don't handle this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
+    val taskMetrics = new TaskMetrics
+    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    val sorter = new ShuffleExternalSorter(
+      taskMemoryManager,
+      sc.env.blockManager,
+      taskContext,
+      100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
+      1, // numPartitions
+      conf,
+      new ShuffleWriteMetrics)
+    val inMemSorter = {
+      val field = sorter.getClass.getDeclaredField("inMemSorter")
+      field.setAccessible(true)
+      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+    }
+    // Allocate memory to make the next "insertRecord" call trigger a spill.
+    val bytes = new Array[Byte](1)
+    while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --

Access to the `hasSpaceForAnotherRecord` is the only reason why we need reflection, right?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209262151
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be a no-op.
+    pos = 0;
--- End diff --

For my understanding: this is enough to fix the actual issue here, right?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-09 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22062

[SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). 

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. Another task may then get the same memory page from the pool, so two tasks end up accessing the same page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen:

- JVM crash. (This is easy to reproduce in a unit test, as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually yields an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its 
general contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632

This PR resets the sorter's state in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.
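
To make the failure mode concrete, here is a self-contained sketch of the re-entrancy pattern (hypothetical class and field names, with plain on-heap arrays standing in for Spark's pooled memory pages): an allocation inside `reset` re-enters `spill`, which must not observe the already-released buffer.

    final class NestedSpillSketch {
      private long[] array = {1, 2, 3, 4};
      private int pos = 4; // number of records currently in `array`

      // Called re-entrantly by the memory manager when an allocation needs to free memory.
      void spill() {
        for (int i = 0; i < pos; i++) {
          System.out.println("spilling record " + array[i]); // use-after-free on real pooled pages
        }
      }

      void reset() {
        pos = 0;             // a nested spill now sees zero records and is a no-op
        array = null;        // any incorrect use fails fast with an NPE
        array = allocate(4); // may trigger spill() before it returns
      }

      private long[] allocate(int size) {
        spill(); // simulate the nested spill triggered by memory pressure
        return new long[size];
      }

      public static void main(String[] args) {
        new NestedSpillSketch().reset(); // completes without touching stale records
      }
    }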

## How was this patch tested?

The new unit test will make the JVM crash without the fix.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22062.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22062


commit 54799cae8ef0727988bbb863d326ea61b4d9ae72
Author: Shixiong Zhu 
Date:   2018-08-10T00:02:33Z

Nested spill in ShuffleExternalSorter should not access released memory page




---
