[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

In order to lay the groundwork for proper off-heap memory support in SQL / 
Tungsten, we need to extend our MemoryManager to perform bookkeeping for 
off-heap memory.

## User-facing changes

This PR introduces a new configuration, `spark.memory.offHeapSize` (name 
subject to change), which specifies the absolute amount of off-heap memory that 
Spark and Spark SQL can use. If Tungsten is configured to use off-heap 
execution memory for allocating data pages, then all data page allocations must 
fit within this size limit.

## Internals changes

This PR contains a lot of internal refactoring of the MemoryManager. The key 
change at the heart of this patch is the introduction of a `MemoryPool` class 
(name subject to change) to manage the bookkeeping for a particular category of 
memory (storage, on-heap execution, and off-heap execution). These MemoryPools 
are not fixed-size; they can be dynamically grown and shrunk according to the 
MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, 
proportional to the legacy `[storage|shuffle].memoryFraction`. In the new 
UnifiedMemoryManager, the sizes of these pools are dynamically adjusted 
according to its policies.

There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage 
memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager 
creates two execution pools, one for on-heap memory and one for off-heap. 
Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their 
pooled memory across running tasks (in other words, the 
ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed 
into these ExecutionMemoryPool instances).

I think that this design is substantially easier to understand and reason about 
than the previous design, where most of these responsibilities were handled by 
MemoryManager and its subclasses. To see this, take at look at how simple the 
logic in `UnifiedMemoryManager` has become: it's now very easy to see when 
memory is dynamically shifted between storage and execution.

## TODOs

- [x] Fix handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
  - [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.

Author: Josh Rosen <joshro...@databricks.com>

Closes #9344 from JoshRosen/offheap-memory-accounting.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30b706b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30b706b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30b706b7

Branch: refs/heads/master
Commit: 30b706b7b36482921ec04145a0121ca147984fa8
Parents: 105732d
Author: Josh Rosen <joshro...@databricks.com>
Authored: Fri Nov 6 18:17:34 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Nov 6 18:17:34 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryConsumer.java |   7 +-
 .../org/apache/spark/memory/MemoryMode.java     |  26 ++
 .../apache/spark/memory/TaskMemoryManager.java  |  72 ++++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/memory/ExecutionMemoryPool.scala      | 153 ++++++++++++
 .../org/apache/spark/memory/MemoryManager.scala | 246 ++++++-------------
 .../org/apache/spark/memory/MemoryPool.scala    |  71 ++++++
 .../spark/memory/StaticMemoryManager.scala      |  75 +-----
 .../apache/spark/memory/StorageMemoryPool.scala | 138 +++++++++++
 .../spark/memory/UnifiedMemoryManager.scala     | 138 ++++++-----
 .../scala/org/apache/spark/memory/package.scala |  75 ++++++
 .../spark/util/collection/Spillable.scala       |   8 +-
 .../spark/memory/TaskMemoryManagerSuite.java    |   8 +-
 .../apache/spark/memory/TestMemoryConsumer.java |  10 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |   2 +-
 .../map/AbstractBytesToBytesMapSuite.java       |   4 +-
 .../spark/memory/MemoryManagerSuite.scala       | 104 +++++---
 .../spark/memory/StaticMemoryManagerSuite.scala |  39 +--
 .../apache/spark/memory/TestMemoryManager.scala |  20 +-
 .../memory/UnifiedMemoryManagerSuite.scala      |  93 +++----
 .../spark/storage/BlockManagerSuite.scala       |   2 +-
 21 files changed, 828 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72..36138cc 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,15 +17,15 @@
 
 package org.apache.spark.memory;
 
-
 import java.io.IOException;
 
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
-
 /**
  * An memory consumer of TaskMemoryManager, which support spilling.
+ *
+ * Note: this only supports allocation / spilling of Tungsten memory.
  */
 public abstract class MemoryConsumer {
 
@@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) 
{
     this.taskMemoryManager = taskMemoryManager;
     this.pageSize = pageSize;
-    this.used = 0;
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
@@ -67,6 +66,8 @@ public abstract class MemoryConsumer {
    *
    * Note: In order to avoid possible deadlock, should not call 
acquireMemory() from spill().
    *
+   * Note: today, this only frees Tungsten-managed pages.
+   *
    * @param size the amount of memory should be released
    * @param trigger the MemoryConsumer that trigger this spilling
    * @return the amount of released memory in bytes

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/java/org/apache/spark/memory/MemoryMode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java 
b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
new file mode 100644
index 0000000..3a5e72d
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+@Private
+public enum MemoryMode {
+  ON_HEAP,
+  OFF_HEAP
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 6440f9c..5f743b2 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -103,10 +103,10 @@ public class TaskMemoryManager {
    * without doing any masking or lookups. Since this branching should be 
well-predicted by the JIT,
    * this extra layer of indirection / abstraction hopefully shouldn't be too 
expensive.
    */
-  private final boolean inHeap;
+  final MemoryMode tungstenMemoryMode;
 
   /**
-   * The size of memory granted to each consumer.
+   * Tracks spillable memory consumers.
    */
   @GuardedBy("this")
   private final HashSet<MemoryConsumer> consumers;
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
    * Construct a new TaskMemoryManager.
    */
   public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
-    this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
+    this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
     this.memoryManager = memoryManager;
     this.taskAttemptId = taskAttemptId;
     this.consumers = new HashSet<>();
@@ -127,12 +127,19 @@ public class TaskMemoryManager {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(
+      long required,
+      MemoryMode mode,
+      MemoryConsumer consumer) {
     assert(required >= 0);
+    // If we are allocating Tungsten pages off-heap and receive a request to 
allocate on-heap
+    // memory here, then it may not make sense to spill since that would only 
end up freeing
+    // off-heap memory. This is subject to change, though, so it may be risky 
to make this
+    // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
-      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+      long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, 
mode);
 
-      // try to release memory from other consumers first, then we can reduce 
the frequency of
+      // Try to release memory from other consumers first, then we can reduce 
the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
@@ -140,10 +147,10 @@ public class TaskMemoryManager {
           if (c != consumer && c.getUsed() > 0) {
             try {
               long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.info("Task {} released {} from {} for {}", 
taskAttemptId,
+              if (released > 0 && mode == tungstenMemoryMode) {
+                logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
                   Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId);
+                got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
                 if (got >= required) {
                   break;
                 }
@@ -161,10 +168,10 @@ public class TaskMemoryManager {
       if (got < required && consumer != null) {
         try {
           long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.info("Task {} released {} from itself ({})", taskAttemptId,
+          if (released > 0 && mode == tungstenMemoryMode) {
+            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
               Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId);
+            got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
           }
         } catch (IOException e) {
           logger.error("error while calling spill() on " + consumer, e);
@@ -184,9 +191,9 @@ public class TaskMemoryManager {
   /**
    * Release N bytes of execution memory for a MemoryConsumer.
    */
-  public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+  public void releaseExecutionMemory(long size, MemoryMode mode, 
MemoryConsumer consumer) {
     logger.debug("Task {} release {} from {}", taskAttemptId, 
Utils.bytesToString(size), consumer);
-    memoryManager.releaseExecutionMemory(size, taskAttemptId);
+    memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
   }
 
   /**
@@ -195,11 +202,19 @@ public class TaskMemoryManager {
   public void showMemoryUsage() {
     logger.info("Memory used in task " + taskAttemptId);
     synchronized (this) {
+      long memoryAccountedForByConsumers = 0;
       for (MemoryConsumer c: consumers) {
-        if (c.getUsed() > 0) {
-          logger.info("Acquired by " + c + ": " + 
Utils.bytesToString(c.getUsed()));
+        long totalMemUsage = c.getUsed();
+        memoryAccountedForByConsumers += totalMemUsage;
+        if (totalMemUsage > 0) {
+          logger.info("Acquired by " + c + ": " + 
Utils.bytesToString(totalMemUsage));
         }
       }
+      long memoryNotAccountedFor =
+        memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - 
memoryAccountedForByConsumers;
+      logger.info(
+        "{} bytes of memory were used by task {} but are not associated with 
specific consumers",
+        memoryNotAccountedFor, taskAttemptId);
     }
   }
 
@@ -214,7 +229,8 @@ public class TaskMemoryManager {
    * Allocate a block of memory that will be tracked in the MemoryManager's 
page table; this is
    * intended for allocating large blocks of Tungsten memory that will be 
shared between operators.
    *
-   * Returns `null` if there was not enough memory to allocate the page.
+   * Returns `null` if there was not enough memory to allocate the page. May 
return a page that
+   * contains fewer bytes than requested, so callers should verify the size of 
returned pages.
    */
   public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
@@ -222,7 +238,7 @@ public class TaskMemoryManager {
         "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " 
bytes");
     }
 
-    long acquired = acquireExecutionMemory(size, consumer);
+    long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
     if (acquired <= 0) {
       return null;
     }
@@ -231,7 +247,7 @@ public class TaskMemoryManager {
     synchronized (this) {
       pageNumber = allocatedPages.nextClearBit(0);
       if (pageNumber >= PAGE_TABLE_SIZE) {
-        releaseExecutionMemory(acquired, consumer);
+        releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
         throw new IllegalStateException(
           "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
       }
@@ -262,7 +278,7 @@ public class TaskMemoryManager {
     }
     long pageSize = page.size();
     memoryManager.tungstenMemoryAllocator().free(page);
-    releaseExecutionMemory(pageSize, consumer);
+    releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
   }
 
   /**
@@ -276,7 +292,7 @@ public class TaskMemoryManager {
    * @return an encoded page address.
    */
   public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
-    if (!inHeap) {
+    if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
       // In off-heap mode, an offset is an absolute address that may require a 
full 64 bits to
       // encode. Due to our page size limitation, though, we can convert this 
into an offset that's
       // relative to the page's base offset; this relative offset will fit in 
51 bits.
@@ -305,7 +321,7 @@ public class TaskMemoryManager {
    * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
    */
   public Object getPage(long pagePlusOffsetAddress) {
-    if (inHeap) {
+    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
       final MemoryBlock page = pageTable[pageNumber];
@@ -323,7 +339,7 @@ public class TaskMemoryManager {
    */
   public long getOffsetInPage(long pagePlusOffsetAddress) {
     final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
-    if (inHeap) {
+    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
       return offsetInPage;
     } else {
       // In off-heap mode, an offset is an absolute address. In 
encodePageNumberAndOffset, we
@@ -351,11 +367,19 @@ public class TaskMemoryManager {
       }
       consumers.clear();
     }
+
+    for (MemoryBlock page : pageTable) {
+      if (page != null) {
+        memoryManager.tungstenMemoryAllocator().free(page);
+      }
+    }
+    Arrays.fill(pageTable, null);
+
     return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
   }
 
   /**
-   * Returns the memory consumption, in bytes, for the current task
+   * Returns the memory consumption, in bytes, for the current task.
    */
   public long getMemoryConsumptionForThisTask() {
     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 23ae936..4474a83 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -341,7 +341,7 @@ object SparkEnv extends Logging {
       if (useLegacyMemoryManager) {
         new StaticMemoryManager(conf, numUsableCores)
       } else {
-        new UnifiedMemoryManager(conf, numUsableCores)
+        UnifiedMemoryManager(conf, numUsableCores)
       }
 
     val blockTransferService = new NettyBlockTransferService(conf, 
securityManager, numUsableCores)

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
new file mode 100644
index 0000000..7825bae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+
+/**
+ * Implements policies and bookkeeping for sharing a adjustable-sized pool of 
memory between tasks.
+ *
+ * Tries to ensure that each task gets a reasonable share of memory, instead 
of some task ramping up
+ * to a large amount first and then causing others to spill to disk repeatedly.
+ *
+ * If there are N tasks, it ensures that each task can acquire at least 1 / 2N 
of the memory
+ * before it has to spill, and at most 1 / N. Because N varies dynamically, we 
keep track of the
+ * set of active tasks and redo the calculations of 1 / 2N and 1 / N in 
waiting tasks whenever this
+ * set changes. This is all done by synchronizing access to mutable state and 
using wait() and
+ * notifyAll() to signal changes to callers. Prior to Spark 1.6, this 
arbitration of memory across
+ * tasks was performed by the ShuffleMemoryManager.
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ * @param poolName a human-readable name for this pool, for use in log messages
+ */
+class ExecutionMemoryPool(
+    lock: Object,
+    poolName: String
+  ) extends MemoryPool(lock) with Logging {
+
+  /**
+   * Map from taskAttemptId -> memory consumption in bytes
+   */
+  @GuardedBy("lock")
+  private val memoryForTask = new mutable.HashMap[Long, Long]()
+
+  override def memoryUsed: Long = lock.synchronized {
+    memoryForTask.values.sum
+  }
+
+  /**
+   * Returns the memory consumption, in bytes, for the given task.
+   */
+  def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
+    memoryForTask.getOrElse(taskAttemptId, 0L)
+  }
+
+  /**
+   * Try to acquire up to `numBytes` of memory for the given task and return 
the number of bytes
+   * obtained, or 0 if none can be allocated.
+   *
+   * This call may block until there is enough free memory in some situations, 
to make sure each
+   * task has a chance to ramp up to at least 1 / 2N of the total memory pool 
(where N is the # of
+   * active tasks) before it is forced to spill. This can happen if the number 
of tasks increase
+   * but an older task had a lot of memory already.
+   *
+   * @return the number of bytes granted to the task.
+   */
+  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = 
lock.synchronized {
+    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
+
+    // Add this task to the taskMemory map just so we can keep an accurate 
count of the number
+    // of active tasks, to let other tasks ramp down their memory in calls to 
`acquireMemory`
+    if (!memoryForTask.contains(taskAttemptId)) {
+      memoryForTask(taskAttemptId) = 0L
+      // This will later cause waiting tasks to wake up and check numTasks 
again
+      lock.notifyAll()
+    }
+
+    // Keep looping until we're either sure that we don't want to grant this 
request (because this
+    // task would have more than 1 / numActiveTasks of the memory) or we have 
enough free
+    // memory to give it (we always let each task get at least 1 / (2 * 
numActiveTasks)).
+    // TODO: simplify this to limit each task to its own slot
+    while (true) {
+      val numActiveTasks = memoryForTask.keys.size
+      val curMem = memoryForTask(taskAttemptId)
+
+      // How much we can grant this task; don't let it grow to more than 1 / 
numActiveTasks;
+      // don't let it be negative
+      val maxToGrant =
+        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+      // Only give it as much memory as is free, which might be none if it 
reached 1 / numTasks
+      val toGrant = math.min(maxToGrant, memoryFree)
+
+      if (curMem < poolSize / (2 * numActiveTasks)) {
+        // We want to let each task get at least 1 / (2 * numActiveTasks) 
before blocking;
+        // if we can't give it this much now, wait for other tasks to free up 
memory
+        // (this happens if older tasks allocated lots of memory before N grew)
+        if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) 
- curMem)) {
+          memoryForTask(taskAttemptId) += toGrant
+          return toGrant
+        } else {
+          logInfo(
+            s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool 
to be free")
+          lock.wait()
+        }
+      } else {
+        memoryForTask(taskAttemptId) += toGrant
+        return toGrant
+      }
+    }
+    0L  // Never reached
+  }
+
+  /**
+   * Release `numBytes` of memory acquired by the given task.
+   */
+  def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = 
lock.synchronized {
+    val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
+    var memoryToFree = if (curMem < numBytes) {
+      logWarning(
+        s"Internal error: release called on $numBytes bytes but task only has 
$curMem bytes " +
+          s"of memory from the $poolName pool")
+      curMem
+    } else {
+      numBytes
+    }
+    if (memoryForTask.contains(taskAttemptId)) {
+      memoryForTask(taskAttemptId) -= memoryToFree
+      if (memoryForTask(taskAttemptId) <= 0) {
+        memoryForTask.remove(taskAttemptId)
+      }
+    }
+    lock.notifyAll() // Notify waiters in acquireMemory() that memory has been 
freed
+  }
+
+  /**
+   * Release all memory for the given task and mark it as inactive (e.g. when 
a task ends).
+   * @return the number of bytes freed.
+   */
+  def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
+    val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
+    releaseMemory(numBytesToFree, taskAttemptId)
+    numBytesToFree
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b0cf269..ceb8ea4 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,12 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging}
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -36,53 +32,40 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
  * In this context, execution memory refers to that used for computation in 
shuffles, joins,
  * sorts and aggregations, while storage memory refers to that used for 
caching and propagating
  * internal data across the cluster. There exists one MemoryManager per JVM.
- *
- * The MemoryManager abstract base class itself implements policies for 
sharing execution memory
- * between tasks; it tries to ensure that each task gets a reasonable share of 
memory, instead of
- * some task ramping up to a large amount first and then causing others to 
spill to disk repeatedly.
- * If there are N tasks, it ensures that each task can acquire at least 1 / 2N 
of the memory
- * before it has to spill, and at most 1 / N. Because N varies dynamically, we 
keep track of the
- * set of active tasks and redo the calculations of 1 / 2N and 1 / N in 
waiting tasks whenever
- * this set changes. This is all done by synchronizing access to mutable state 
and using wait() and
- * notifyAll() to signal changes to callers. Prior to Spark 1.6, this 
arbitration of memory across
- * tasks was performed by the ShuffleMemoryManager.
  */
-private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) 
extends Logging {
+private[spark] abstract class MemoryManager(
+    conf: SparkConf,
+    numCores: Int,
+    storageMemory: Long,
+    onHeapExecutionMemory: Long) extends Logging {
 
   // -- Methods related to memory allocation policies and bookkeeping 
------------------------------
 
-  // The memory store used to evict cached blocks
-  private var _memoryStore: MemoryStore = _
-  protected def memoryStore: MemoryStore = {
-    if (_memoryStore == null) {
-      throw new IllegalArgumentException("memory store not initialized yet")
-    }
-    _memoryStore
-  }
+  @GuardedBy("this")
+  protected val storageMemoryPool = new StorageMemoryPool(this)
+  @GuardedBy("this")
+  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
"on-heap execution")
+  @GuardedBy("this")
+  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
"off-heap execution")
 
-  // Amount of execution/storage memory in use, accesses must be synchronized 
on `this`
-  @GuardedBy("this") protected var _executionMemoryUsed: Long = 0
-  @GuardedBy("this") protected var _storageMemoryUsed: Long = 0
-  // Map from taskAttemptId -> memory consumption in bytes
-  @GuardedBy("this") private val executionMemoryForTask = new 
mutable.HashMap[Long, Long]()
-
-  /**
-   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
-   * This must be set after construction due to initialization ordering 
constraints.
-   */
-  final def setMemoryStore(store: MemoryStore): Unit = {
-    _memoryStore = store
-  }
+  storageMemoryPool.incrementPoolSize(storageMemory)
+  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
+  
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize",
 0))
 
   /**
-   * Total available memory for execution, in bytes.
+   * Total available memory for storage, in bytes. This amount can vary over 
time, depending on
+   * the MemoryManager implementation.
+   * In this model, this is equivalent to the amount of memory not occupied by 
execution.
    */
-  def maxExecutionMemory: Long
+  def maxStorageMemory: Long
 
   /**
-   * Total available memory for storage, in bytes.
+   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+   * This must be set after construction due to initialization ordering 
constraints.
    */
-  def maxStorageMemory: Long
+  final def setMemoryStore(store: MemoryStore): Unit = synchronized {
+    storageMemoryPool.setMemoryStore(store)
+  }
 
   // TODO: avoid passing evicted blocks around to simplify method signatures 
(SPARK-10985)
 
@@ -94,7 +77,9 @@ private[spark] abstract class MemoryManager(conf: SparkConf, 
numCores: Int) exte
   def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+  }
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing 
ones if necessary.
@@ -109,103 +94,25 @@ private[spark] abstract class MemoryManager(conf: 
SparkConf, numCores: Int) exte
   def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    acquireStorageMemory(blockId, numBytes, evictedBlocks)
-  }
-
-  /**
-   * Acquire N bytes of memory for execution, evicting cached blocks if 
necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
-   * @return number of bytes successfully granted (<= N).
-   */
-  @VisibleForTesting
-  private[memory] def doAcquireExecutionMemory(
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
 
   /**
-   * Try to acquire up to `numBytes` of execution memory for the current task 
and return the number
-   * of bytes obtained, or 0 if none can be allocated.
+   * Try to acquire up to `numBytes` of execution memory for the current task 
and return the
+   * number of bytes obtained, or 0 if none can be allocated.
    *
    * This call may block until there is enough free memory in some situations, 
to make sure each
    * task has a chance to ramp up to at least 1 / 2N of the total memory pool 
(where N is the # of
    * active tasks) before it is forced to spill. This can happen if the number 
of tasks increase
    * but an older task had a lot of memory already.
-   *
-   * Subclasses should override `doAcquireExecutionMemory` in order to 
customize the policies
-   * that control global sharing of memory between execution and storage.
    */
   private[memory]
-  final def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long 
= synchronized {
-    assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
-
-    // Add this task to the taskMemory map just so we can keep an accurate 
count of the number
-    // of active tasks, to let other tasks ramp down their memory in calls to 
tryToAcquire
-    if (!executionMemoryForTask.contains(taskAttemptId)) {
-      executionMemoryForTask(taskAttemptId) = 0L
-      // This will later cause waiting tasks to wake up and check numTasks 
again
-      notifyAll()
-    }
-
-    // Once the cross-task memory allocation policy has decided to grant more 
memory to a task,
-    // this method is called in order to actually obtain that execution 
memory, potentially
-    // triggering eviction of storage memory:
-    def acquire(toGrant: Long): Long = synchronized {
-      val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      val acquired = doAcquireExecutionMemory(toGrant, evictedBlocks)
-      // Register evicted blocks, if any, with the active task metrics
-      Option(TaskContext.get()).foreach { tc =>
-        val metrics = tc.taskMetrics()
-        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, 
BlockStatus)]())
-        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
-      }
-      executionMemoryForTask(taskAttemptId) += acquired
-      acquired
-    }
-
-    // Keep looping until we're either sure that we don't want to grant this 
request (because this
-    // task would have more than 1 / numActiveTasks of the memory) or we have 
enough free
-    // memory to give it (we always let each task get at least 1 / (2 * 
numActiveTasks)).
-    // TODO: simplify this to limit each task to its own slot
-    while (true) {
-      val numActiveTasks = executionMemoryForTask.keys.size
-      val curMem = executionMemoryForTask(taskAttemptId)
-      val freeMemory = maxExecutionMemory - executionMemoryForTask.values.sum
-
-      // How much we can grant this task; don't let it grow to more than 1 / 
numActiveTasks;
-      // don't let it be negative
-      val maxToGrant =
-        math.min(numBytes, math.max(0, (maxExecutionMemory / numActiveTasks) - 
curMem))
-      // Only give it as much memory as is free, which might be none if it 
reached 1 / numTasks
-      val toGrant = math.min(maxToGrant, freeMemory)
-
-      if (curMem < maxExecutionMemory / (2 * numActiveTasks)) {
-        // We want to let each task get at least 1 / (2 * numActiveTasks) 
before blocking;
-        // if we can't give it this much now, wait for other tasks to free up 
memory
-        // (this happens if older tasks allocated lots of memory before N grew)
-        if (
-          freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * 
numActiveTasks) - curMem)) {
-          return acquire(toGrant)
-        } else {
-          logInfo(
-            s"TID $taskAttemptId waiting for at least 1/2N of execution memory 
pool to be free")
-          wait()
-        }
-      } else {
-        return acquire(toGrant)
-      }
-    }
-    0L  // Never reached
-  }
-
-  @VisibleForTesting
-  private[memory] def releaseExecutionMemory(numBytes: Long): Unit = 
synchronized {
-    if (numBytes > _executionMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of execution " +
-        s"memory when we only have ${_executionMemoryUsed} bytes")
-      _executionMemoryUsed = 0
-    } else {
-      _executionMemoryUsed -= numBytes
+  def acquireExecutionMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Long = synchronized {
+    memoryMode match {
+      case MemoryMode.ON_HEAP => 
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP => 
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
     }
   }
 
@@ -213,24 +120,14 @@ private[spark] abstract class MemoryManager(conf: 
SparkConf, numCores: Int) exte
    * Release numBytes of execution memory belonging to the given task.
    */
   private[memory]
-  final def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long): Unit 
= synchronized {
-    val curMem = executionMemoryForTask.getOrElse(taskAttemptId, 0L)
-    if (curMem < numBytes) {
-      if (Utils.isTesting) {
-        throw new SparkException(
-          s"Internal error: release called on $numBytes bytes but task only 
has $curMem")
-      } else {
-        logWarning(s"Internal error: release called on $numBytes bytes but 
task only has $curMem")
-      }
-    }
-    if (executionMemoryForTask.contains(taskAttemptId)) {
-      executionMemoryForTask(taskAttemptId) -= numBytes
-      if (executionMemoryForTask(taskAttemptId) <= 0) {
-        executionMemoryForTask.remove(taskAttemptId)
-      }
-      releaseExecutionMemory(numBytes)
+  def releaseExecutionMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Unit = synchronized {
+    memoryMode match {
+      case MemoryMode.ON_HEAP => 
onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP => 
offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
     }
-    notifyAll() // Notify waiters in acquireExecutionMemory() that memory has 
been freed
   }
 
   /**
@@ -238,35 +135,28 @@ private[spark] abstract class MemoryManager(conf: 
SparkConf, numCores: Int) exte
    * @return the number of bytes freed.
    */
   private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): 
Long = synchronized {
-    val numBytesToFree = getExecutionMemoryUsageForTask(taskAttemptId)
-    releaseExecutionMemory(numBytesToFree, taskAttemptId)
-    numBytesToFree
+    onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
+      offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
   }
 
   /**
    * Release N bytes of storage memory.
    */
   def releaseStorageMemory(numBytes: Long): Unit = synchronized {
-    if (numBytes > _storageMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of storage " +
-        s"memory when we only have ${_storageMemoryUsed} bytes")
-      _storageMemoryUsed = 0
-    } else {
-      _storageMemoryUsed -= numBytes
-    }
+    storageMemoryPool.releaseMemory(numBytes)
   }
 
   /**
    * Release all storage memory acquired.
    */
-  def releaseAllStorageMemory(): Unit = synchronized {
-    _storageMemoryUsed = 0
+  final def releaseAllStorageMemory(): Unit = synchronized {
+    storageMemoryPool.releaseAllMemory()
   }
 
   /**
    * Release N bytes of unroll memory.
    */
-  def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+  final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
     releaseStorageMemory(numBytes)
   }
 
@@ -274,26 +164,35 @@ private[spark] abstract class MemoryManager(conf: 
SparkConf, numCores: Int) exte
    * Execution memory currently in use, in bytes.
    */
   final def executionMemoryUsed: Long = synchronized {
-    _executionMemoryUsed
+    onHeapExecutionMemoryPool.memoryUsed + 
offHeapExecutionMemoryPool.memoryUsed
   }
 
   /**
    * Storage memory currently in use, in bytes.
    */
   final def storageMemoryUsed: Long = synchronized {
-    _storageMemoryUsed
+    storageMemoryPool.memoryUsed
   }
 
   /**
    * Returns the execution memory consumption, in bytes, for the given task.
    */
   private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): 
Long = synchronized {
-    executionMemoryForTask.getOrElse(taskAttemptId, 0L)
+    onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
+      offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
   }
 
   // -- Fields related to Tungsten managed memory 
-------------------------------------------------
 
   /**
+   * Tracks whether Tungsten memory will be allocated on the JVM heap or 
off-heap using
+   * sun.misc.Unsafe.
+   */
+  final val tungstenMemoryMode: MemoryMode = {
+    if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP 
else MemoryMode.ON_HEAP
+  }
+
+  /**
    * The default page size, in bytes.
    *
    * If user didn't explicitly set "spark.buffer.pageSize", we figure out the 
default value
@@ -306,21 +205,22 @@ private[spark] abstract class MemoryManager(conf: 
SparkConf, numCores: Int) exte
     val cores = if (numCores > 0) numCores else 
Runtime.getRuntime.availableProcessors()
     // Because of rounding to next power of 2, we may have safetyFactor as 8 
in worst case
     val safetyFactor = 16
-    val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / 
safetyFactor)
+    val maxTungstenMemory: Long = tungstenMemoryMode match {
+      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
+      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
+    }
+    val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / 
safetyFactor)
     val default = math.min(maxPageSize, math.max(minPageSize, size))
     conf.getSizeAsBytes("spark.buffer.pageSize", default)
   }
 
   /**
-   * Tracks whether Tungsten memory will be allocated on the JVM heap or 
off-heap using
-   * sun.misc.Unsafe.
-   */
-  final val tungstenMemoryIsAllocatedInHeap: Boolean =
-    !conf.getBoolean("spark.unsafe.offHeap", false)
-
-  /**
    * Allocates memory for use by Unsafe/Tungsten code.
    */
-  private[memory] final val tungstenMemoryAllocator: MemoryAllocator =
-    if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else 
MemoryAllocator.UNSAFE
+  private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
+    tungstenMemoryMode match {
+      case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
+      case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
new file mode 100644
index 0000000..bfeec47
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+/**
+ * Manages bookkeeping for an adjustable-sized region of memory. This class is 
internal to
+ * the [[MemoryManager]]. See subclasses for more details.
+ *
+ * @param lock a [[MemoryManager]] instance, used for synchronization. We 
purposely erase the type
+ *             to `Object` to avoid programming errors, since this object 
should only be used for
+ *             synchronization purposes.
+ */
+abstract class MemoryPool(lock: Object) {
+
+  @GuardedBy("lock")
+  private[this] var _poolSize: Long = 0
+
+  /**
+   * Returns the current size of the pool, in bytes.
+   */
+  final def poolSize: Long = lock.synchronized {
+    _poolSize
+  }
+
+  /**
+   * Returns the amount of free memory in the pool, in bytes.
+   */
+  final def memoryFree: Long = lock.synchronized {
+    _poolSize - memoryUsed
+  }
+
+  /**
+   * Expands the pool by `delta` bytes.
+   */
+  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
+    require(delta >= 0)
+    _poolSize += delta
+  }
+
+  /**
+   * Shrinks the pool by `delta` bytes.
+   */
+  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
+    require(delta >= 0)
+    require(delta <= _poolSize)
+    require(_poolSize - delta >= memoryUsed)
+    _poolSize -= delta
+  }
+
+  /**
+   * Returns the amount of used memory in this pool (in bytes).
+   */
+  def memoryUsed: Long
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 9c2c2e9..12a0943 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
-
 /**
  * A [[MemoryManager]] that statically partitions the heap space into disjoint 
regions.
  *
@@ -32,10 +31,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  */
 private[spark] class StaticMemoryManager(
     conf: SparkConf,
-    override val maxExecutionMemory: Long,
+    maxOnHeapExecutionMemory: Long,
     override val maxStorageMemory: Long,
     numCores: Int)
-  extends MemoryManager(conf, numCores) {
+  extends MemoryManager(
+    conf,
+    numCores,
+    maxStorageMemory,
+    maxOnHeapExecutionMemory) {
 
   def this(conf: SparkConf, numCores: Int) {
     this(
@@ -50,76 +53,15 @@ private[spark] class StaticMemoryManager(
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
   }
 
-  /**
-   * Acquire N bytes of memory for execution.
-   * @return number of bytes successfully granted (<= N).
-   */
-  override def doAcquireExecutionMemory(
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = 
synchronized {
-    assert(numBytes >= 0)
-    assert(_executionMemoryUsed <= maxExecutionMemory)
-    val bytesToGrant = math.min(numBytes, maxExecutionMemory - 
_executionMemoryUsed)
-    _executionMemoryUsed += bytesToGrant
-    bytesToGrant
-  }
-
-  /**
-   * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
-   * @return whether all N bytes were successfully granted.
-   */
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
-  }
-
-  /**
-   * Acquire N bytes of memory to unroll the given block, evicting existing 
ones if necessary.
-   *
-   * This evicts at most M bytes worth of existing blocks, where M is a 
fraction of the storage
-   * space specified by `spark.storage.unrollFraction`. Blocks evicted in the 
process, if any,
-   * are added to `evictedBlocks`.
-   *
-   * @return whether all N bytes were successfully granted.
-   */
   override def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    val currentUnrollMemory = memoryStore.currentUnrollMemory
+    val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
     val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - 
currentUnrollMemory)
     val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
-    acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, 
evictedBlocks)
   }
-
-  /**
-   * Acquire N bytes of storage memory for the given block, evicting existing 
ones if necessary.
-   *
-   * @param blockId the ID of the block we are acquiring storage memory for
-   * @param numBytesToAcquire the size of this block
-   * @param numBytesToFree the size of space to be freed through evicting 
blocks
-   * @param evictedBlocks a holder for blocks evicted in the process
-   * @return whether all N bytes were successfully granted.
-   */
-  private def acquireStorageMemory(
-      blockId: BlockId,
-      numBytesToAcquire: Long,
-      numBytesToFree: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    assert(numBytesToAcquire >= 0)
-    assert(numBytesToFree >= 0)
-    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    assert(_storageMemoryUsed <= maxStorageMemory)
-    val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= 
maxStorageMemory
-    if (enoughMemory) {
-      _storageMemoryUsed += numBytesToAcquire
-    }
-    enoughMemory
-  }
-
 }
 
 
@@ -135,7 +77,6 @@ private[spark] object StaticMemoryManager {
     (systemMaxMemory * memoryFraction * safetyFraction).toLong
   }
 
-
   /**
    * Return the total amount of memory available for the execution region, in 
bytes.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
new file mode 100644
index 0000000..6a322ea
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{TaskContext, Logging}
+import org.apache.spark.storage.{MemoryStore, BlockStatus, BlockId}
+
+/**
+ * Performs bookkeeping for managing an adjustable-size pool of memory that is 
used for storage
+ * (caching).
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ */
+class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
+
+  @GuardedBy("lock")
+  private[this] var _memoryUsed: Long = 0L
+
+  override def memoryUsed: Long = lock.synchronized {
+    _memoryUsed
+  }
+
+  private var _memoryStore: MemoryStore = _
+  def memoryStore: MemoryStore = {
+    if (_memoryStore == null) {
+      throw new IllegalStateException("memory store not initialized yet")
+    }
+    _memoryStore
+  }
+
+  /**
+   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+   * This must be set after construction due to initialization ordering 
constraints.
+   */
+  final def setMemoryStore(store: MemoryStore): Unit = {
+    _memoryStore = store
+  }
+
+  /**
+   * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+   * @return whether all N bytes were successfully granted.
+   */
+  def acquireMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
lock.synchronized {
+    acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+  }
+
+  /**
+   * Acquire N bytes of storage memory for the given block, evicting existing 
ones if necessary.
+   *
+   * @param blockId the ID of the block we are acquiring storage memory for
+   * @param numBytesToAcquire the size of this block
+   * @param numBytesToFree the size of space to be freed through evicting 
blocks
+   * @return whether all N bytes were successfully granted.
+   */
+  def acquireMemory(
+      blockId: BlockId,
+      numBytesToAcquire: Long,
+      numBytesToFree: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
lock.synchronized {
+    assert(numBytesToAcquire >= 0)
+    assert(numBytesToFree >= 0)
+    assert(memoryUsed <= poolSize)
+    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
+    // Register evicted blocks, if any, with the active task metrics
+    Option(TaskContext.get()).foreach { tc =>
+      val metrics = tc.taskMetrics()
+      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, 
BlockStatus)]())
+      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+    }
+    // NOTE: If the memory store evicts blocks, then those evictions will 
synchronously call
+    // back into this StorageMemoryPool in order to free. Therefore, these 
variables should have
+    // been updated.
+    val enoughMemory = numBytesToAcquire <= memoryFree
+    if (enoughMemory) {
+      _memoryUsed += numBytesToAcquire
+    }
+    enoughMemory
+  }
+
+  def releaseMemory(size: Long): Unit = lock.synchronized {
+    if (size > _memoryUsed) {
+      logWarning(s"Attempted to release $size bytes of storage " +
+        s"memory when we only have ${_memoryUsed} bytes")
+      _memoryUsed = 0
+    } else {
+      _memoryUsed -= size
+    }
+  }
+
+  def releaseAllMemory(): Unit = lock.synchronized {
+    _memoryUsed = 0
+  }
+
+  /**
+   * Try to shrink the size of this storage memory pool by `spaceToFree` 
bytes. Return the number
+   * of bytes removed from the pool's capacity.
+   */
+  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
+    // First, shrink the pool by reclaiming free memory:
+    val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
+    if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
+      spaceFreedByReleasingUnusedMemory
+    } else {
+      // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
+      val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+      memoryStore.ensureFreeSpace(spaceToFree - 
spaceFreedByReleasingUnusedMemory, evictedBlocks)
+      val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+      _memoryUsed -= spaceFreedByEviction
+      decrementPoolSize(spaceFreedByEviction)
+      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a309303..8be5b05 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockStatus, BlockId}
 
-
 /**
  * A [[MemoryManager]] that enforces a soft boundary between execution and 
storage such that
  * either side can borrow memory from the other.
@@ -41,98 +40,105 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  * The implication is that attempts to cache blocks may fail if execution has 
already eaten
  * up most of the storage space, in which case the new blocks will be evicted 
immediately
  * according to their respective storage levels.
+ *
+ * @param storageRegionSize Size of the storage region, in bytes.
+ *                          This region is not statically reserved; execution 
can borrow from
+ *                          it if necessary. Cached blocks can be evicted only 
if actual
+ *                          storage memory usage exceeds this region.
  */
-private[spark] class UnifiedMemoryManager(
+private[spark] class UnifiedMemoryManager private[memory] (
     conf: SparkConf,
     maxMemory: Long,
+    private val storageRegionSize: Long,
     numCores: Int)
-  extends MemoryManager(conf, numCores) {
-
-  def this(conf: SparkConf, numCores: Int) {
-    this(conf, UnifiedMemoryManager.getMaxMemory(conf), numCores)
-  }
-
-  /**
-   * Size of the storage region, in bytes.
-   *
-   * This region is not statically reserved; execution can borrow from it if 
necessary.
-   * Cached blocks can be evicted only if actual storage memory usage exceeds 
this region.
-   */
-  private val storageRegionSize: Long = {
-    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
-  }
-
-  /**
-   * Total amount of memory, in bytes, not currently occupied by either 
execution or storage.
-   */
-  private def totalFreeMemory: Long = synchronized {
-    assert(_executionMemoryUsed <= maxMemory)
-    assert(_storageMemoryUsed <= maxMemory)
-    assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
-    maxMemory - _executionMemoryUsed - _storageMemoryUsed
-  }
+  extends MemoryManager(
+    conf,
+    numCores,
+    storageRegionSize,
+    maxMemory - storageRegionSize) {
 
-  /**
-   * Total available memory for execution, in bytes.
-   * In this model, this is equivalent to the amount of memory not occupied by 
storage.
-   */
-  override def maxExecutionMemory: Long = synchronized {
-    maxMemory - _storageMemoryUsed
-  }
+  // We always maintain this invariant:
+  assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
 
-  /**
-   * Total available memory for storage, in bytes.
-   * In this model, this is equivalent to the amount of memory not occupied by 
execution.
-   */
   override def maxStorageMemory: Long = synchronized {
-    maxMemory - _executionMemoryUsed
+    maxMemory - onHeapExecutionMemoryPool.memoryUsed
   }
 
   /**
-   * Acquire N bytes of memory for execution, evicting cached blocks if 
necessary.
+   * Try to acquire up to `numBytes` of execution memory for the current task 
and return the
+   * number of bytes obtained, or 0 if none can be allocated.
    *
-   * This method evicts blocks only up to the amount of memory borrowed by 
storage.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
-   * @return number of bytes successfully granted (<= N).
+   * This call may block until there is enough free memory in some situations, 
to make sure each
+   * task has a chance to ramp up to at least 1 / 2N of the total memory pool 
(where N is the # of
+   * active tasks) before it is forced to spill. This can happen if the number 
of tasks increase
+   * but an older task had a lot of memory already.
    */
-  private[memory] override def doAcquireExecutionMemory(
+  override private[memory] def acquireExecutionMemory(
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = 
synchronized {
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Long = synchronized {
+    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
     assert(numBytes >= 0)
-    val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - 
storageRegionSize)
-    // If there is not enough free memory AND storage has borrowed some 
execution memory,
-    // then evict as much memory borrowed by storage as needed to grant this 
request
-    val shouldEvictStorage = totalFreeMemory < numBytes && 
memoryBorrowedByStorage > 0
-    if (shouldEvictStorage) {
-      val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
-      memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+    memoryMode match {
+      case MemoryMode.ON_HEAP =>
+        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
+          val extraMemoryNeeded = numBytes - 
onHeapExecutionMemoryPool.memoryFree
+          // There is not enough free memory in the execution pool, so try to 
reclaim memory from
+          // storage. We can reclaim any free memory from the storage pool. If 
the storage pool
+          // has grown to become larger than `storageRegionSize`, we can evict 
blocks and reclaim
+          // the memory that storage has borrowed from execution.
+          val memoryReclaimableFromStorage =
+            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize 
- storageRegionSize)
+          if (memoryReclaimableFromStorage > 0) {
+            // Only reclaim as much space as is necessary and available:
+            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+          }
+        }
+        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP =>
+        // For now, we only support on-heap caching of data, so we do not need 
to interact with
+        // the storage pool when allocating off-heap memory. This will change 
in the future, though.
+        super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
     }
-    val bytesToGrant = math.min(numBytes, totalFreeMemory)
-    _executionMemoryUsed += bytesToGrant
-    bytesToGrant
   }
 
-  /**
-   * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
-   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
-   * @return whether all N bytes were successfully granted.
-   */
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
     assert(numBytes >= 0)
-    memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
-    val enoughMemory = totalFreeMemory >= numBytes
-    if (enoughMemory) {
-      _storageMemoryUsed += numBytes
+    if (numBytes > storageMemoryPool.memoryFree) {
+      // There is not enough free memory in the storage pool, so try to borrow 
free memory from
+      // the execution pool.
+      val memoryBorrowedFromExecution = 
Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
+      onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
+      storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
     }
-    enoughMemory
+    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
   }
 
+  override def acquireUnrollMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    acquireStorageMemory(blockId, numBytes, evictedBlocks)
+  }
 }
 
-private object UnifiedMemoryManager {
+object UnifiedMemoryManager {
+
+  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
+    val maxMemory = getMaxMemory(conf)
+    new UnifiedMemoryManager(
+      conf,
+      maxMemory = maxMemory,
+      storageRegionSize =
+        (maxMemory * conf.getDouble("spark.memory.storageFraction", 
0.5)).toLong,
+      numCores = numCores)
+  }
 
   /**
    * Return the total amount of memory shared between execution and storage, 
in bytes.

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/memory/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala 
b/core/src/main/scala/org/apache/spark/memory/package.scala
new file mode 100644
index 0000000..564e30d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/package.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * This package implements Spark's memory management system. This system 
consists of two main
+ * components, a JVM-wide memory manager and a per-task manager:
+ *
+ * - [[org.apache.spark.memory.MemoryManager]] manages Spark's overall memory 
usage within a JVM.
+ *   This component implements the policies for dividing the available memory 
across tasks and for
+ *   allocating memory between storage (memory used caching and data transfer) 
and execution (memory
+ *   used by computations, such as shuffles, joins, sorts, and aggregations).
+ * - [[org.apache.spark.memory.TaskMemoryManager]] manages the memory 
allocated by individual tasks.
+ *   Tasks interact with TaskMemoryManager and never directly interact with 
the JVM-wide
+ *   MemoryManager.
+ *
+ * Internally, each of these components have additional abstractions for 
memory bookkeeping:
+ *
+ *  - [[org.apache.spark.memory.MemoryConsumer]]s are clients of the 
TaskMemoryManager and
+ *    correspond to individual operators and data structures within a task. 
The TaskMemoryManager
+ *    receives memory allocation requests from MemoryConsumers and issues 
callbacks to consumers
+ *    in order to trigger spilling when running low on memory.
+ *  - [[org.apache.spark.memory.MemoryPool]]s are a bookkeeping abstraction 
used by the
+ *    MemoryManager to track the division of memory between storage and 
execution.
+ *
+ * Diagrammatically:
+ *
+ * {{{
+ *       +-------------+
+ *       | MemConsumer |----+                                   
+------------------------+
+ *       +-------------+    |    +-------------------+          |     
MemoryManager      |
+ *                          +--->| TaskMemoryManager |----+     |              
          |
+ *       +-------------+    |    +-------------------+    |     |  
+------------------+  |
+ *       | MemConsumer |----+                             |     |  |  
StorageMemPool  |  |
+ *       +-------------+         +-------------------+    |     |  
+------------------+  |
+ *                               | TaskMemoryManager |----+     |              
          |
+ *                               +-------------------+    |     |  
+------------------+  |
+ *                                                        +---->|  
|OnHeapExecMemPool |  |
+ *                                        *               |     |  
+------------------+  |
+ *                                        *               |     |              
          |
+ *       +-------------+                  *               |     |  
+------------------+  |
+ *       | MemConsumer |----+                             |     |  
|OffHeapExecMemPool|  |
+ *       +-------------+    |    +-------------------+    |     |  
+------------------+  |
+ *                          +--->| TaskMemoryManager |----+     |              
          |
+ *                               +-------------------+          
+------------------------+
+ * }}}
+ *
+ *
+ * There are two implementations of [[org.apache.spark.memory.MemoryManager]] 
which vary in how
+ * they handle the sizing of their memory pools:
+ *
+ *  - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 
1.6+, enforces soft
+ *    boundaries between storage and execution memory, allowing requests for 
memory in one region
+ *    to be fulfilled by borrowing memory from the other.
+ *  - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries 
between storage
+ *    and execution memory by statically partitioning Spark's memory and 
preventing storage and
+ *    execution from borrowing memory from each other. This mode is retained 
only for legacy
+ *    compatibility purposes.
+ */
+package object memory

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9e00262..3a48af8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.{Logging, SparkEnv}
 
 /**
@@ -78,7 +78,8 @@ private[spark] trait Spillable[C] extends Logging {
     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
-      val granted = taskMemoryManager.acquireExecutionMemory(amountToRequest, 
null)
+      val granted =
+        taskMemoryManager.acquireExecutionMemory(amountToRequest, 
MemoryMode.ON_HEAP, null)
       myMemoryThreshold += granted
       // If we were granted too little memory to grow further (either 
tryToAcquire returned 0,
       // or we already had more memory than myMemoryThreshold), spill the 
current collection
@@ -107,7 +108,8 @@ private[spark] trait Spillable[C] extends Logging {
    */
   def releaseMemory(): Unit = {
     // The amount we requested does not include the initial memory tracking 
threshold
-    taskMemoryManager.releaseExecutionMemory(myMemoryThreshold - 
initialMemoryThreshold, null)
+    taskMemoryManager.releaseExecutionMemory(
+      myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
     myMemoryThreshold = initialMemoryThreshold
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java 
b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index c731317..711eed0 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -28,8 +28,14 @@ public class TaskMemoryManagerSuite {
   @Test
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", 
"false")), 0);
+      new StaticMemoryManager(
+        new SparkConf().set("spark.unsafe.offHeap", "false"),
+        Long.MAX_VALUE,
+        Long.MAX_VALUE,
+        1),
+      0);
     manager.allocatePage(4096, null);  // leak memory
+    Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
     Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java 
b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 8ae3642..e6e16ff 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -32,13 +32,19 @@ public class TestMemoryConsumer extends MemoryConsumer {
   }
 
   void use(long size) {
-    long got = taskMemoryManager.acquireExecutionMemory(size, this);
+    long got = taskMemoryManager.acquireExecutionMemory(
+      size,
+      taskMemoryManager.tungstenMemoryMode,
+      this);
     used += got;
   }
 
   void free(long size) {
     used -= size;
-    taskMemoryManager.releaseExecutionMemory(size, this);
+    taskMemoryManager.releaseExecutionMemory(
+      size,
+      taskMemoryManager.tungstenMemoryMode,
+      this);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 4763395..0e0eca5 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -423,7 +423,7 @@ public class UnsafeShuffleWriterSuite {
     memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
-    for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
+    for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) 
{
       dataToWrite.add(new Tuple2<Object, Object>(i, i));
     }
     writer.write(dataToWrite.iterator());

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 92bd45e..3bca790 100644
--- 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -83,7 +83,9 @@ public abstract class AbstractBytesToBytesMapSuite {
   public void setup() {
     memoryManager =
       new TestMemoryManager(
-        new SparkConf().set("spark.unsafe.offHeap", "" + 
useOffHeapMemoryAllocator()));
+        new SparkConf()
+          .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
+          .set("spark.memory.offHeapSize", "256mb"));
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
 
     tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"unsafe-test");

http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 4a9479c..f55d435 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
 
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.mutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
@@ -29,7 +30,7 @@ import org.mockito.stubbing.Answer
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.MemoryStore
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
StorageLevel}
 
 
 /**
@@ -78,7 +79,12 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite {
         require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected 
ensureFreeSpace " +
           s"argument at index $numBytesPos to be a Long: ${args.mkString(", 
")}")
         val numBytes = args(numBytesPos).asInstanceOf[Long]
-        mockEnsureFreeSpace(mm, numBytes)
+        val success = mockEnsureFreeSpace(mm, numBytes)
+        if (success) {
+          args.last.asInstanceOf[mutable.Buffer[(BlockId, 
BlockStatus)]].append(
+            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+        }
+        success
       }
     }
   }
@@ -132,93 +138,95 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite {
   }
 
   /**
-   * Create a MemoryManager with the specified execution memory limit and no 
storage memory.
+   * Create a MemoryManager with the specified execution memory limits and no 
storage memory.
    */
-  protected def createMemoryManager(maxExecutionMemory: Long): MemoryManager
+  protected def createMemoryManager(
+     maxOnHeapExecutionMemory: Long,
+     maxOffHeapExecutionMemory: Long = 0L): MemoryManager
 
   // -- Tests of sharing of execution memory between tasks 
----------------------------------------
   // Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
 
   implicit val ec = ExecutionContext.global
 
-  test("single task requesting execution memory") {
+  test("single task requesting on-heap execution memory") {
     val manager = createMemoryManager(1000L)
     val taskMemoryManager = new TaskMemoryManager(manager, 0)
 
-    assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 100L)
-    assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
-    assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
-    assert(taskMemoryManager.acquireExecutionMemory(200L, null) === 100L)
-    assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
-    assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+    assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, 
null) === 100L)
+    assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, 
null) === 400L)
+    assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, 
null) === 400L)
+    assert(taskMemoryManager.acquireExecutionMemory(200L, MemoryMode.ON_HEAP, 
null) === 100L)
+    assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, 
null) === 0L)
+    assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, 
null) === 0L)
 
-    taskMemoryManager.releaseExecutionMemory(500L, null)
-    assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 300L)
-    assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 200L)
+    taskMemoryManager.releaseExecutionMemory(500L, MemoryMode.ON_HEAP, null)
+    assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, 
null) === 300L)
+    assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, 
null) === 200L)
 
     taskMemoryManager.cleanUpAllAllocatedMemory()
-    assert(taskMemoryManager.acquireExecutionMemory(1000L, null) === 1000L)
-    assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+    assert(taskMemoryManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, 
null) === 1000L)
+    assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, 
null) === 0L)
   }
 
-  test("two tasks requesting full execution memory") {
+  test("two tasks requesting full on-heap execution memory") {
     val memoryManager = createMemoryManager(1000L)
     val t1MemManager = new TaskMemoryManager(memoryManager, 1)
     val t2MemManager = new TaskMemoryManager(memoryManager, 2)
     val futureTimeout: Duration = 20.seconds
 
     // Have both tasks request 500 bytes, then wait until both requests have 
been granted:
-    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
-    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
+    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result1, futureTimeout) === 500L)
     assert(Await.result(t2Result1, futureTimeout) === 500L)
 
     // Have both tasks each request 500 bytes more; both should immediately 
return 0 as they are
     // both now at 1 / N
-    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
-    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
+    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result2, 200.millis) === 0L)
     assert(Await.result(t2Result2, 200.millis) === 0L)
   }
 
-  test("two tasks cannot grow past 1 / N of execution memory") {
+  test("two tasks cannot grow past 1 / N of on-heap execution memory") {
     val memoryManager = createMemoryManager(1000L)
     val t1MemManager = new TaskMemoryManager(memoryManager, 1)
     val t2MemManager = new TaskMemoryManager(memoryManager, 2)
     val futureTimeout: Duration = 20.seconds
 
     // Have both tasks request 250 bytes, then wait until both requests have 
been granted:
-    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, null) }
-    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, 
MemoryMode.ON_HEAP, null) }
+    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result1, futureTimeout) === 250L)
     assert(Await.result(t2Result1, futureTimeout) === 250L)
 
     // Have both tasks each request 500 bytes more.
     // We should only grant 250 bytes to each of them on this second request
-    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
-    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
+    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result2, futureTimeout) === 250L)
     assert(Await.result(t2Result2, futureTimeout) === 250L)
   }
 
-  test("tasks can block to get at least 1 / 2N of execution memory") {
+  test("tasks can block to get at least 1 / 2N of on-heap execution memory") {
     val memoryManager = createMemoryManager(1000L)
     val t1MemManager = new TaskMemoryManager(memoryManager, 1)
     val t2MemManager = new TaskMemoryManager(memoryManager, 2)
     val futureTimeout: Duration = 20.seconds
 
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
-    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result1, futureTimeout) === 1000L)
-    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, 
MemoryMode.ON_HEAP, null) }
     // Make sure that t2 didn't grab the memory right away. This is hacky but 
it would be difficult
     // to make sure the other thread blocks for some time otherwise.
     Thread.sleep(300)
-    t1MemManager.releaseExecutionMemory(250L, null)
+    t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null)
     // The memory freed from t1 should now be granted to t2.
     assert(Await.result(t2Result1, futureTimeout) === 250L)
     // Further requests by t2 should be denied immediately because it now has 
1 / 2N of the memory.
-    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, null) }
+    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t2Result2, 200.millis) === 0L)
   }
 
@@ -229,18 +237,18 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite {
     val futureTimeout: Duration = 20.seconds
 
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
-    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result1, futureTimeout) === 1000L)
-    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     // Make sure that t2 didn't grab the memory right away. This is hacky but 
it would be difficult
     // to make sure the other thread blocks for some time otherwise.
     Thread.sleep(300)
     // t1 releases all of its memory, so t2 should be able to grab all of the 
memory
     t1MemManager.cleanUpAllAllocatedMemory()
     assert(Await.result(t2Result1, futureTimeout) === 500L)
-    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t2Result2, futureTimeout) === 500L)
-    val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+    val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t2Result3, 200.millis) === 0L)
   }
 
@@ -251,15 +259,35 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite {
     val t2MemManager = new TaskMemoryManager(memoryManager, 2)
     val futureTimeout: Duration = 20.seconds
 
-    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, null) }
+    val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result1, futureTimeout) === 700L)
 
-    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, null) }
+    val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t2Result1, futureTimeout) === 300L)
 
-    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, null) }
+    val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, 
MemoryMode.ON_HEAP, null) }
     assert(Await.result(t1Result2, 200.millis) === 0L)
   }
+
+  test("off-heap execution allocations cannot exceed limit") {
+    val memoryManager = createMemoryManager(
+      maxOnHeapExecutionMemory = 0L,
+      maxOffHeapExecutionMemory = 1000L)
+
+    val tMemManager = new TaskMemoryManager(memoryManager, 1)
+    val result1 = Future { tMemManager.acquireExecutionMemory(1000L, 
MemoryMode.OFF_HEAP, null) }
+    assert(Await.result(result1, 200.millis) === 1000L)
+    assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+
+    val result2 = Future { tMemManager.acquireExecutionMemory(300L, 
MemoryMode.OFF_HEAP, null) }
+    assert(Await.result(result2, 200.millis) === 0L)
+
+    assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+    tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+    assert(tMemManager.getMemoryConsumptionForThisTask === 500L)
+    tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+    assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
+  }
 }
 
 private object MemoryManagerSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to