Repository: spark
Updated Branches:
  refs/heads/master 91cdab51c -> f5aba6573


[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access 
released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
 may trigger a spill and cause `ShuffleInMemorySorter` to access the released 
`array`. Another task may then get the same memory page from the pool, so two 
tasks end up accessing the same memory page. When a task reads memory written 
by another task, many types of failures may happen. Here are some examples I 
have seen:

- JVM crash. (This is easy to reproduce in a unit test because we fill newly 
allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually 
produce an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its general 
contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632

This PR resets the state in `ShuffleInMemorySorter.reset` before calling 
`allocateArray` to fix the issue.
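The ordering fix can be illustrated with a minimal, self-contained Java sketch. The class and method names below are invented for illustration (this is not Spark code): `spill()` stands in for the spill that `allocateArray` may trigger re-entrantly, and the point is that clearing `pos`, `array`, and `usableCapacity` *before* reallocating makes that nested spill a safe no-op instead of a read of released memory.

```java
// Hypothetical sketch of the reset-before-allocate pattern used by the fix.
public class SketchSorter {
    private long[] array = new long[4]; // stands in for the pointer array page
    private int pos = 0;                // number of buffered records
    private int usableCapacity = 4;

    public void insert(long record) {
        array[pos++] = record;
    }

    // Simulates a spill that can be triggered re-entrantly while the
    // pointer array is being reallocated.
    public void spill() {
        if (array == null || pos == 0) {
            return; // state already cleared: safe no-op
        }
        for (int i = 0; i < pos; i++) {
            long ignored = array[i]; // pre-fix, this could touch a freed page
        }
        pos = 0;
    }

    public void reset() {
        // Clear all state BEFORE releasing/reallocating, so a spill
        // triggered by the allocation below cannot read the old buffer.
        pos = 0;
        array = null;
        usableCapacity = 0;
        spill();              // stands in for a spill during allocateArray
        array = new long[4];  // stands in for consumer.allocateArray(initialSize)
        usableCapacity = 4;
    }

    public int numRecords() { return pos; }
    public int capacity() { return usableCapacity; }
}
```

In the real patch the same idea appears as moving `pos = 0` to the top of `reset()` and nulling `array`/zeroing `usableCapacity` before `allocateArray`, so any code path reached during the allocation sees an empty sorter.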

## How was this patch tested?

The new unit test will make the JVM crash without the fix.

Closes #22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5aba657
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5aba657
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5aba657

Branch: refs/heads/master
Commit: f5aba657396bd4e2e03dd06491a2d169a99592a7
Parents: 91cdab5
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Aug 10 10:53:44 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Aug 10 10:53:44 2018 -0700

----------------------------------------------------------------------
 .../shuffle/sort/ShuffleInMemorySorter.java     |  12 +-
 .../sort/ShuffleExternalSorterSuite.scala       | 111 +++++++++++++++++++
 2 files changed, 121 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 8f49859..4b48599 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -65,7 +65,7 @@ final class ShuffleInMemorySorter {
    */
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean 
useRadixSort) {
     this.consumer = consumer;
@@ -94,12 +94,20 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` 
will be no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to  `null` to avoid 
accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid 
any codes writing
+      // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to 
access
+      // `ShuffleInMemorySorter` when `allocateArray` throws 
SparkOutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
new file mode 100644
index 0000000..b9f0e87
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext 
with MockitoSugar {
+
+  test("nested spill should be no-op") {
+    val conf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName("ShuffleExternalSorterSuite")
+      .set("spark.testing", "true")
+      .set("spark.testing.memory", "1600")
+      .set("spark.memory.fraction", "1")
+    sc = new SparkContext(conf)
+
+    val memoryManager = UnifiedMemoryManager(conf, 1)
+
+    var shouldAllocate = false
+
+    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` 
is true.
+    // This will trigger a nested spill and expose issues if we don't handle 
this case properly.
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+      override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+        // ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+        // So we leave 400 bytes for the task.
+        if (shouldAllocate &&
+          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 
400) {
+          val acquireExecutionMemoryMethod =
+            memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+          acquireExecutionMemoryMethod.invoke(
+            memoryManager,
+            JLong.valueOf(
+              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
- 400),
+            JLong.valueOf(1L), // taskAttemptId
+            MemoryMode.ON_HEAP
+          ).asInstanceOf[java.lang.Long]
+        }
+        super.acquireExecutionMemory(required, consumer)
+      }
+    }
+    val taskContext = mock[TaskContext]
+    val taskMetrics = new TaskMetrics
+    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    val sorter = new ShuffleExternalSorter(
+      taskMemoryManager,
+      sc.env.blockManager,
+      taskContext,
+      100, // initialSize - This will require ShuffleInMemorySorter to acquire 
at least 800 bytes
+      1, // numPartitions
+      conf,
+      new ShuffleWriteMetrics)
+    val inMemSorter = {
+      val field = sorter.getClass.getDeclaredField("inMemSorter")
+      field.setAccessible(true)
+      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+    }
+    // Allocate memory to make the next "insertRecord" call triggers a spill.
+    val bytes = new Array[Byte](1)
+    while (inMemSorter.hasSpaceForAnotherRecord) {
+      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
+    }
+
+    // This flag will make the mocked TaskMemoryManager acquire free memory 
released by spill to
+    // trigger a nested spill.
+    shouldAllocate = true
+
+    // Should throw `SparkOutOfMemoryError` as there is no enough memory: 
`ShuffleInMemorySorter`
+    // will try to acquire 800 bytes but there are only 400 bytes available.
+    //
+    // Before the fix, a nested spill may use a released page and this causes 
two tasks access the
+    // same memory page. When a task reads memory written by another task, 
many types of failures
+    // may happen. Here are some examples we have seen:
+    //
+    // - JVM crash. (This is easy to reproduce in the unit test as we fill 
newly allocated and
+    //   deallocated memory with 0xa5 and 0x5a bytes which usually points to 
an invalid memory
+    //   address)
+    // - java.lang.IllegalArgumentException: Comparison method violates its 
general contract!
+    // - java.lang.NullPointerException
+    //     at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
+    // - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by 
size -536870912
+    //     because the size after growing exceeds size limitation 2147483632
+    intercept[SparkOutOfMemoryError] {
+      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
+    }
+  }
+}

