spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks

2018-01-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 2db523959 -> 60d4d79bb


[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that 
`free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap 
case) or null out references to backing `long[]` arrays (in the on-heap case). 
The goal of this change is to add an extra layer of defense against 
use-after-free bugs because currently it's hard to detect corruption caused by 
blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformUtilSuite`, including new tests for existing 
functionality because we did not have sufficient mutation coverage of the 
on-heap memory allocator's pooling logic.

Author: Josh Rosen 

Closes #20191 from 
JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.

(cherry picked from commit f340b6b3066033d40b7e163fd5fb68e9820adfb1)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60d4d79b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60d4d79b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60d4d79b

Branch: refs/heads/branch-2.3
Commit: 60d4d79bb40f13c68773a0224f2003cdca28c138
Parents: 2db5239
Author: Josh Rosen 
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen 
Committed: Wed Jan 10 00:46:27 2018 -0800

--
 .../unsafe/memory/HeapMemoryAllocator.java  | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java| 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java| 29 
 6 files changed, 146 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60d4d79b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {
 
   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> 
bufferPoolsBySize =
-new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize 
= new HashMap<>();
 
   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
 
@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
 if (shouldPool(size)) {
   synchronized (this) {
-final LinkedList<WeakReference<MemoryBlock>> pool = 
bufferPoolsBySize.get(size);
+final LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(size);
 if (pool != null) {
   while (!pool.isEmpty()) {
-final WeakReference<MemoryBlock> blockReference = pool.pop();
-final MemoryBlock memory = blockReference.get();
-if (memory != null) {
-  assert (memory.size() == size);
+final WeakReference<long[]> arrayReference = pool.pop();
+final long[] array = arrayReference.get();
+if (array != null) {
+  assert (array.length * 8L >= size);
+  MemoryBlock memory = new MemoryBlock(array, 
Platform.LONG_ARRAY_OFFSET, size);
   if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
   }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
 
   @Override
   public void free(MemoryBlock memory) {
+assert (memory.obj != null) :
+  "baseObject was null; are you trying to use the on-heap allocator to 
free off-heap memory?";
+assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+  "page has already been freed";
+assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+  "TMM-allocated pages must first be freed via TMM.freePage(), not 
directly in allocator free()";
+
 final long size = memory.size();
 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
   memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
 }
+
+// Mark the page a

spark git commit: [SPARK-22997] Add additional defenses against use of freed MemoryBlocks

2018-01-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 70bcc9d5a -> f340b6b30


[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that 
`free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap 
case) or null out references to backing `long[]` arrays (in the on-heap case). 
The goal of this change is to add an extra layer of defense against 
use-after-free bugs because currently it's hard to detect corruption caused by 
blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformUtilSuite`, including new tests for existing 
functionality because we did not have sufficient mutation coverage of the 
on-heap memory allocator's pooling logic.

Author: Josh Rosen 

Closes #20191 from 
JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f340b6b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f340b6b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f340b6b3

Branch: refs/heads/master
Commit: f340b6b3066033d40b7e163fd5fb68e9820adfb1
Parents: 70bcc9d
Author: Josh Rosen 
Authored: Wed Jan 10 00:45:47 2018 -0800
Committer: Josh Rosen 
Committed: Wed Jan 10 00:45:47 2018 -0800

--
 .../unsafe/memory/HeapMemoryAllocator.java  | 35 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java | 21 +++-
 .../unsafe/memory/UnsafeMemoryAllocator.java| 11 +
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 50 +++-
 .../apache/spark/memory/TaskMemoryManager.java  | 13 -
 .../spark/memory/TaskMemoryManagerSuite.java| 29 
 6 files changed, 146 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f340b6b3/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index cc9cc42..3acfe36 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -31,8 +31,7 @@ import org.apache.spark.unsafe.Platform;
 public class HeapMemoryAllocator implements MemoryAllocator {
 
   @GuardedBy("this")
-  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> 
bufferPoolsBySize =
-new HashMap<>();
+  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize 
= new HashMap<>();
 
   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
 
@@ -49,13 +48,14 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
 if (shouldPool(size)) {
   synchronized (this) {
-final LinkedList<WeakReference<MemoryBlock>> pool = 
bufferPoolsBySize.get(size);
+final LinkedList<WeakReference<long[]>> pool = 
bufferPoolsBySize.get(size);
 if (pool != null) {
   while (!pool.isEmpty()) {
-final WeakReference<MemoryBlock> blockReference = pool.pop();
-final MemoryBlock memory = blockReference.get();
-if (memory != null) {
-  assert (memory.size() == size);
+final WeakReference<long[]> arrayReference = pool.pop();
+final long[] array = arrayReference.get();
+if (array != null) {
+  assert (array.length * 8L >= size);
+  MemoryBlock memory = new MemoryBlock(array, 
Platform.LONG_ARRAY_OFFSET, size);
   if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
   }
@@ -76,18 +76,35 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
 
   @Override
   public void free(MemoryBlock memory) {
+assert (memory.obj != null) :
+  "baseObject was null; are you trying to use the on-heap allocator to 
free off-heap memory?";
+assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+  "page has already been freed";
+assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+  "TMM-allocated pages must first be freed via TMM.freePage(), not 
directly in allocator free()";
+
 final long size = memory.size();
 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
   memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
 }
+
+// Mark the page as freed (so we can detect double-frees).
+memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUM