Repository: spark
Updated Branches:
  refs/heads/master bd94ea4c8 -> 20c0bcd97


[SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager

This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support 
for off-heap storage memory, a requirement for enabling off-heap caching 
(which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has 
been split into separate on- and off-heap pools and the storage and unroll 
memory allocation methods have been updated to accept a `memoryMode` parameter 
to specify whether allocations should be performed on- or off-heap.

In order to reduce the testing surface, the `StaticMemoryManager` does not 
support off-heap caching (we plan to eventually remove the 
`StaticMemoryManager`, so this isn't a significant limitation).

Author: Josh Rosen <joshro...@databricks.com>

Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20c0bcd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20c0bcd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20c0bcd9

Branch: refs/heads/master
Commit: 20c0bcd972cfbb2f2aa92948f9ee337724a70361
Parents: bd94ea4
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sat Mar 26 11:03:25 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Mar 26 11:03:25 2016 -0700

----------------------------------------------------------------------
 .../spark/memory/ExecutionMemoryPool.scala      |   9 +-
 .../org/apache/spark/memory/MemoryManager.scala |  46 +++--
 .../spark/memory/StaticMemoryManager.scala      |  36 ++--
 .../apache/spark/memory/StorageMemoryPool.scala |  18 +-
 .../spark/memory/UnifiedMemoryManager.scala     | 168 +++++++++++--------
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../org/apache/spark/storage/StorageLevel.scala |   6 +
 .../spark/storage/memory/MemoryStore.scala      |  19 ++-
 .../spark/memory/MemoryManagerSuite.scala       |   2 +-
 .../spark/memory/StaticMemoryManagerSuite.scala |  60 +++----
 .../apache/spark/memory/TestMemoryManager.scala |  14 +-
 .../memory/UnifiedMemoryManagerSuite.scala      |  80 +++++----
 .../spark/storage/BlockManagerSuite.scala       |   4 +-
 13 files changed, 280 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 319718e..f816707 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging
  * tasks was performed by the ShuffleMemoryManager.
  *
  * @param lock a [[MemoryManager]] instance to synchronize on
- * @param poolName a human-readable name for this pool, for use in log messages
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
  */
 private[memory] class ExecutionMemoryPool(
     lock: Object,
-    poolName: String
+    memoryMode: MemoryMode
   ) extends MemoryPool(lock) with Logging {
 
+  private[this] val poolName: String = memoryMode match {
+    case MemoryMode.ON_HEAP => "on-heap execution"
+    case MemoryMode.OFF_HEAP => "off-heap execution"
+  }
+
   /**
    * Map from taskAttemptId -> memory consumption in bytes
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 5e8abee..10656bc 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
 private[spark] abstract class MemoryManager(
     conf: SparkConf,
     numCores: Int,
-    storageMemory: Long,
+    onHeapStorageMemory: Long,
     onHeapExecutionMemory: Long) extends Logging {
 
   // -- Methods related to memory allocation policies and bookkeeping 
------------------------------
 
   @GuardedBy("this")
-  protected val storageMemoryPool = new StorageMemoryPool(this)
+  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, 
MemoryMode.ON_HEAP)
   @GuardedBy("this")
-  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
"on-heap execution")
+  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, 
MemoryMode.OFF_HEAP)
   @GuardedBy("this")
-  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
"off-heap execution")
+  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
MemoryMode.ON_HEAP)
+  @GuardedBy("this")
+  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, 
MemoryMode.OFF_HEAP)
 
-  storageMemoryPool.incrementPoolSize(storageMemory)
+  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
-  
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size",
 0))
+
+  protected[this] val maxOffHeapMemory = 
conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+  protected[this] val offHeapStorageMemory =
+    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 
0.5)).toLong
+
+  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - 
offHeapStorageMemory)
+  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
 
   /**
    * Total available memory for storage, in bytes. This amount can vary over 
time, depending on
    * the MemoryManager implementation.
    * In this model, this is equivalent to the amount of memory not occupied by 
execution.
    */
-  def maxStorageMemory: Long
+  def maxOnHeapStorageMemory: Long
 
   /**
    * Set the [[MemoryStore]] used by this manager to evict cached blocks.
    * This must be set after construction due to initialization ordering 
constraints.
    */
   final def setMemoryStore(store: MemoryStore): Unit = synchronized {
-    storageMemoryPool.setMemoryStore(store)
+    onHeapStorageMemoryPool.setMemoryStore(store)
+    offHeapStorageMemoryPool.setMemoryStore(store)
   }
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
+   *
    * @return whether all N bytes were successfully granted.
    */
-  def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
+  def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: 
MemoryMode): Boolean
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing 
ones if necessary.
@@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager(
    *
    * @return whether all N bytes were successfully granted.
    */
-  def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
+  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: 
MemoryMode): Boolean
 
   /**
    * Try to acquire up to `numBytes` of execution memory for the current task 
and return the
@@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager(
   /**
    * Release N bytes of storage memory.
    */
-  def releaseStorageMemory(numBytes: Long): Unit = synchronized {
-    storageMemoryPool.releaseMemory(numBytes)
+  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = 
synchronized {
+    memoryMode match {
+      case MemoryMode.ON_HEAP => 
onHeapStorageMemoryPool.releaseMemory(numBytes)
+      case MemoryMode.OFF_HEAP => 
offHeapStorageMemoryPool.releaseMemory(numBytes)
+    }
   }
 
   /**
    * Release all storage memory acquired.
    */
   final def releaseAllStorageMemory(): Unit = synchronized {
-    storageMemoryPool.releaseAllMemory()
+    onHeapStorageMemoryPool.releaseAllMemory()
+    offHeapStorageMemoryPool.releaseAllMemory()
   }
 
   /**
    * Release N bytes of unroll memory.
    */
-  final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
-    releaseStorageMemory(numBytes)
+  final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit 
= synchronized {
+    releaseStorageMemory(numBytes, memoryMode)
   }
 
   /**
@@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager(
    * Storage memory currently in use, in bytes.
    */
   final def storageMemoryUsed: Long = synchronized {
-    storageMemoryPool.memoryUsed
+    onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index f9f8f82..cbd0fa9 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId
 private[spark] class StaticMemoryManager(
     conf: SparkConf,
     maxOnHeapExecutionMemory: Long,
-    override val maxStorageMemory: Long,
+    override val maxOnHeapStorageMemory: Long,
     numCores: Int)
   extends MemoryManager(
     conf,
     numCores,
-    maxStorageMemory,
+    maxOnHeapStorageMemory,
     maxOnHeapExecutionMemory) {
 
   def this(conf: SparkConf, numCores: Int) {
@@ -46,25 +46,39 @@ private[spark] class StaticMemoryManager(
       numCores)
   }
 
+  // The StaticMemoryManager does not support off-heap storage memory:
+  
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
+  offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
+
   // Max number of bytes worth of blocks to evict when unrolling
   private val maxUnrollMemory: Long = {
-    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
+    (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
   }
 
-  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean 
= synchronized {
-    if (numBytes > maxStorageMemory) {
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      memoryMode: MemoryMode): Boolean = synchronized {
+    require(memoryMode != MemoryMode.OFF_HEAP,
+      "StaticMemoryManager does not support off-heap storage memory")
+    if (numBytes > maxOnHeapStorageMemory) {
       // Fail fast if the block simply won't fit
       logInfo(s"Will not store $blockId as the required space ($numBytes 
bytes) exceeds our " +
-        s"memory limit ($maxStorageMemory bytes)")
+        s"memory limit ($maxOnHeapStorageMemory bytes)")
       false
     } else {
-      storageMemoryPool.acquireMemory(blockId, numBytes)
+      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
     }
   }
 
-  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean 
= synchronized {
-    val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
-    val freeMemory = storageMemoryPool.memoryFree
+  override def acquireUnrollMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      memoryMode: MemoryMode): Boolean = synchronized {
+    require(memoryMode != MemoryMode.OFF_HEAP,
+      "StaticMemoryManager does not support off-heap unroll memory")
+    val currentUnrollMemory = 
onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+    val freeMemory = onHeapStorageMemoryPool.memoryFree
     // When unrolling, we will use all of the existing free memory, and, if 
necessary,
     // some extra space freed from evicting cached blocks. We must place a cap 
on the
     // amount of memory to be evicted by unrolling, however, otherwise 
unrolling one
@@ -72,7 +86,7 @@ private[spark] class StaticMemoryManager(
     val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory 
- freeMemory)
     // Keep it within the range 0 <= X <= maxNumBytesToFree
     val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - 
freeMemory))
-    storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
   }
 
   private[memory]

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6fcf26e..a67e8da 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore
  * (caching).
  *
  * @param lock a [[MemoryManager]] instance to synchronize on
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
  */
-private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) 
with Logging {
+private[memory] class StorageMemoryPool(
+    lock: Object,
+    memoryMode: MemoryMode
+  ) extends MemoryPool(lock) with Logging {
 
   @GuardedBy("lock")
   private[this] var _memoryUsed: Long = 0L
@@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends 
MemoryPool(lock) w
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
-    if (numBytesToFree > 0) {
+    // Once we support off-heap caching, this will need to change:
+    if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
       memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
     }
     // NOTE: If the memory store evicts blocks, then those evictions will 
synchronously call
@@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) 
extends MemoryPool(lock) w
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
-      val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, 
remainingSpaceToFree)
+      val spaceFreedByEviction = {
+        // Once we support off-heap caching, this will need to change:
+        if (memoryMode == MemoryMode.ON_HEAP) {
+          memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
+        } else {
+          0
+        }
+      }
       // When a block is released, BlockManager.dropFromMemory() calls 
releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to 
decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 6c57c98..fa9c021 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -39,27 +39,32 @@ import org.apache.spark.storage.BlockId
  * up most of the storage space, in which case the new blocks will be evicted 
immediately
  * according to their respective storage levels.
  *
- * @param storageRegionSize Size of the storage region, in bytes.
+ * @param onHeapStorageRegionSize Size of the storage region, in bytes.
  *                          This region is not statically reserved; execution 
can borrow from
  *                          it if necessary. Cached blocks can be evicted only 
if actual
  *                          storage memory usage exceeds this region.
  */
 private[spark] class UnifiedMemoryManager private[memory] (
     conf: SparkConf,
-    val maxMemory: Long,
-    storageRegionSize: Long,
+    val maxHeapMemory: Long,
+    onHeapStorageRegionSize: Long,
     numCores: Int)
   extends MemoryManager(
     conf,
     numCores,
-    storageRegionSize,
-    maxMemory - storageRegionSize) {
+    onHeapStorageRegionSize,
+    maxHeapMemory - onHeapStorageRegionSize) {
 
-  // We always maintain this invariant:
-  assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+  private def assertInvariants(): Unit = {
+    assert(onHeapExecutionMemoryPool.poolSize + 
onHeapStorageMemoryPool.poolSize == maxHeapMemory)
+    assert(
+      offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize 
== maxOffHeapMemory)
+  }
+
+  assertInvariants()
 
-  override def maxStorageMemory: Long = synchronized {
-    maxMemory - onHeapExecutionMemoryPool.memoryUsed
+  override def maxOnHeapStorageMemory: Long = synchronized {
+    maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
   }
 
   /**
@@ -75,83 +80,104 @@ private[spark] class UnifiedMemoryManager private[memory] (
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long = synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+    assertInvariants()
     assert(numBytes >= 0)
-    memoryMode match {
-      case MemoryMode.ON_HEAP =>
-
-        /**
-         * Grow the execution pool by evicting cached blocks, thereby 
shrinking the storage pool.
-         *
-         * When acquiring memory for a task, the execution pool may need to 
make multiple
-         * attempts. Each attempt must be able to evict storage in case 
another task jumps in
-         * and caches a large block between the attempts. This is called once 
per attempt.
-         */
-        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
-          if (extraMemoryNeeded > 0) {
-            // There is not enough free memory in the execution pool, so try 
to reclaim memory from
-            // storage. We can reclaim any free memory from the storage pool. 
If the storage pool
-            // has grown to become larger than `storageRegionSize`, we can 
evict blocks and reclaim
-            // the memory that storage has borrowed from execution.
-            val memoryReclaimableFromStorage =
-              math.max(storageMemoryPool.memoryFree, 
storageMemoryPool.poolSize - storageRegionSize)
-            if (memoryReclaimableFromStorage > 0) {
-              // Only reclaim as much space as is necessary and available:
-              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
-            }
-          }
-        }
+    val (executionPool, storagePool, storageRegionSize, maxMemory) = 
memoryMode match {
+      case MemoryMode.ON_HEAP => (
+        onHeapExecutionMemoryPool,
+        onHeapStorageMemoryPool,
+        onHeapStorageRegionSize,
+        maxHeapMemory)
+      case MemoryMode.OFF_HEAP => (
+        offHeapExecutionMemoryPool,
+        offHeapStorageMemoryPool,
+        offHeapStorageMemory,
+        maxOffHeapMemory)
+    }
 
-        /**
-         * The size the execution pool would have after evicting storage 
memory.
-         *
-         * The execution memory pool divides this quantity among the active 
tasks evenly to cap
-         * the execution memory allocation for each task. It is important to 
keep this greater
-         * than the execution pool size, which doesn't take into account 
potential memory that
-         * could be freed by evicting storage. Otherwise we may hit 
SPARK-12155.
-         *
-         * Additionally, this quantity should be kept below `maxMemory` to 
arbitrate fairness
-         * in execution memory allocation across tasks, Otherwise, a task may 
occupy more than
-         * its fair share of execution memory, mistakenly thinking that other 
tasks can acquire
-         * the portion of storage memory that cannot be evicted.
-         */
-        def computeMaxExecutionPoolSize(): Long = {
-          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+    /**
+     * Grow the execution pool by evicting cached blocks, thereby shrinking 
the storage pool.
+     *
+     * When acquiring memory for a task, the execution pool may need to make 
multiple
+     * attempts. Each attempt must be able to evict storage in case another 
task jumps in
+     * and caches a large block between the attempts. This is called once per 
attempt.
+     */
+    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+      if (extraMemoryNeeded > 0) {
+        // There is not enough free memory in the execution pool, so try to 
reclaim memory from
+        // storage. We can reclaim any free memory from the storage pool. If 
the storage pool
+        // has grown to become larger than `storageRegionSize`, we can evict 
blocks and reclaim
+        // the memory that storage has borrowed from execution.
+        val memoryReclaimableFromStorage = math.max(
+          storagePool.memoryFree,
+          storagePool.poolSize - storageRegionSize)
+        if (memoryReclaimableFromStorage > 0) {
+          // Only reclaim as much space as is necessary and available:
+          val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+          executionPool.incrementPoolSize(spaceReclaimed)
         }
+      }
+    }
 
-        onHeapExecutionMemoryPool.acquireMemory(
-          numBytes, taskAttemptId, maybeGrowExecutionPool, 
computeMaxExecutionPoolSize)
-
-      case MemoryMode.OFF_HEAP =>
-        // For now, we only support on-heap caching of data, so we do not need 
to interact with
-        // the storage pool when allocating off-heap memory. This will change 
in the future, though.
-        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+    /**
+     * The size the execution pool would have after evicting storage memory.
+     *
+     * The execution memory pool divides this quantity among the active tasks 
evenly to cap
+     * the execution memory allocation for each task. It is important to keep 
this greater
+     * than the execution pool size, which doesn't take into account potential 
memory that
+     * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+     *
+     * Additionally, this quantity should be kept below `maxMemory` to 
arbitrate fairness
+     * in execution memory allocation across tasks, Otherwise, a task may 
occupy more than
+     * its fair share of execution memory, mistakenly thinking that other 
tasks can acquire
+     * the portion of storage memory that cannot be evicted.
+     */
+    def computeMaxExecutionPoolSize(): Long = {
+      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
     }
+
+    executionPool.acquireMemory(
+      numBytes, taskAttemptId, maybeGrowExecutionPool, 
computeMaxExecutionPoolSize)
   }
 
-  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean 
= synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      memoryMode: MemoryMode): Boolean = synchronized {
+    assertInvariants()
     assert(numBytes >= 0)
-    if (numBytes > maxStorageMemory) {
+    val (executionPool, storagePool, maxMemory) = memoryMode match {
+      case MemoryMode.ON_HEAP => (
+        onHeapExecutionMemoryPool,
+        onHeapStorageMemoryPool,
+        maxOnHeapStorageMemory)
+      case MemoryMode.OFF_HEAP => (
+        offHeapExecutionMemoryPool,
+        offHeapStorageMemoryPool,
+        maxOffHeapMemory)
+    }
+    if (numBytes > maxMemory) {
       // Fail fast if the block simply won't fit
       logInfo(s"Will not store $blockId as the required space ($numBytes 
bytes) exceeds our " +
-        s"memory limit ($maxStorageMemory bytes)")
+        s"memory limit ($maxMemory bytes)")
       return false
     }
-    if (numBytes > storageMemoryPool.memoryFree) {
+    if (numBytes > storagePool.memoryFree) {
       // There is not enough free memory in the storage pool, so try to borrow 
free memory from
       // the execution pool.
-      val memoryBorrowedFromExecution = 
Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
-      onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
-      storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
+      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, 
numBytes)
+      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
+      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
     }
-    storageMemoryPool.acquireMemory(blockId, numBytes)
+    storagePool.acquireMemory(blockId, numBytes)
   }
 
-  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean 
= synchronized {
-    acquireStorageMemory(blockId, numBytes)
+  override def acquireUnrollMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      memoryMode: MemoryMode): Boolean = synchronized {
+    acquireStorageMemory(blockId, numBytes, memoryMode)
   }
 }
 
@@ -167,8 +193,8 @@ object UnifiedMemoryManager {
     val maxMemory = getMaxMemory(conf)
     new UnifiedMemoryManager(
       conf,
-      maxMemory = maxMemory,
-      storageRegionSize =
+      maxHeapMemory = maxMemory,
+      onHeapStorageRegionSize =
         (maxMemory * conf.getDouble("spark.memory.storageFraction", 
0.5)).toLong,
       numCores = numCores)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 30d2e23..0c7763f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
@@ -94,7 +94,7 @@ private[spark] class BlockManager(
   // However, since we use this only for reporting and logging, what we 
actually want here is
   // the absolute maximum value that `maxStorageMemory` can ever possibly 
reach. We may need
   // to revisit whether reporting this value as the "max" is intuitive to the 
user.
-  private val maxMemory = memoryManager.maxStorageMemory
+  private val maxMemory = memoryManager.maxOnHeapStorageMemory
 
   // Port used by the external shuffle service. In Yarn mode, this may be 
already be
   // set through the Hadoop configuration as the server is launched in the 
Yarn NM.

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala 
b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 38e9534..7d23295 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -21,6 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, 
ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.util.Utils
 
 /**
@@ -65,6 +66,11 @@ class StorageLevel private(
     require(replication == 1, "Off-heap storage level does not support 
multiple replication")
   }
 
+  private[spark] def memoryMode: MemoryMode = {
+    if (useOffHeap) MemoryMode.OFF_HEAP
+    else MemoryMode.ON_HEAP
+  }
+
   override def clone(): StorageLevel = {
     new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1a78c9c..3ca41f3 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
@@ -93,7 +93,7 @@ private[spark] class MemoryStore(
     conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
 
   /** Total amount of memory available for storage, in bytes. */
-  private def maxMemory: Long = memoryManager.maxStorageMemory
+  private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
 
   if (maxMemory < unrollMemoryThreshold) {
     logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the 
initial memory " +
@@ -133,7 +133,7 @@ private[spark] class MemoryStore(
       size: Long,
       _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    if (memoryManager.acquireStorageMemory(blockId, size)) {
+    if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) 
{
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
@@ -229,7 +229,7 @@ private[spark] class MemoryStore(
         // Synchronize so that transfer is atomic
         memoryManager.synchronized {
           releaseUnrollMemoryForThisTask(amount)
-          val success = memoryManager.acquireStorageMemory(blockId, amount)
+          val success = memoryManager.acquireStorageMemory(blockId, amount, 
MemoryMode.ON_HEAP)
           assert(success, "transferring unroll memory to storage memory 
failed")
         }
       }
@@ -237,7 +237,8 @@ private[spark] class MemoryStore(
       val enoughStorageMemory = {
         if (unrollMemoryUsedByThisBlock <= size) {
           val acquiredExtra =
-            memoryManager.acquireStorageMemory(blockId, size - 
unrollMemoryUsedByThisBlock)
+            memoryManager.acquireStorageMemory(
+              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
           if (acquiredExtra) {
             transferUnrollToStorage(unrollMemoryUsedByThisBlock)
           }
@@ -353,7 +354,7 @@ private[spark] class MemoryStore(
       // Synchronize so that transfer is atomic
       memoryManager.synchronized {
         releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
-        val success = memoryManager.acquireStorageMemory(blockId, entry.size)
+        val success = memoryManager.acquireStorageMemory(blockId, entry.size, 
MemoryMode.ON_HEAP)
         assert(success, "transferring unroll memory to storage memory failed")
       }
       entries.synchronized {
@@ -406,7 +407,7 @@ private[spark] class MemoryStore(
       entries.remove(blockId)
     }
     if (entry != null) {
-      memoryManager.releaseStorageMemory(entry.size)
+      memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
       logInfo(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
       true
@@ -531,7 +532,7 @@ private[spark] class MemoryStore(
    */
   def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean 
= {
     memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory)
+      val success = memoryManager.acquireUnrollMemory(blockId, memory, 
MemoryMode.ON_HEAP)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
         unrollMemoryMap(taskAttemptId) = 
unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
@@ -554,7 +555,7 @@ private[spark] class MemoryStore(
           if (unrollMemoryMap(taskAttemptId) == 0) {
             unrollMemoryMap.remove(taskAttemptId)
           }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
+          memoryManager.releaseUnrollMemory(memoryToRelease, 
MemoryMode.ON_HEAP)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 686e948..aaca653 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -100,7 +100,7 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite with BeforeAndAft
         evictBlocksToFreeSpaceCalled.set(numBytesToFree)
         if (numBytesToFree <= mm.storageMemoryUsed) {
           // We can evict enough blocks to fulfill the request for space
-          mm.releaseStorageMemory(numBytesToFree)
+          mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
           evictedBlocks.append(
             (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
           numBytesToFree

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 741d4fd..4e31fb5 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -35,7 +35,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val mm = new StaticMemoryManager(
       conf,
       maxOnHeapExecutionMemory = maxExecutionMem,
-      maxStorageMemory = maxStorageMem,
+      maxOnHeapStorageMemory = maxStorageMem,
       numCores = 1)
     val ms = makeMemoryStore(mm)
     (mm, ms)
@@ -50,7 +50,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
         .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
         .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
-      maxStorageMemory = 0,
+      maxOnHeapStorageMemory = 0,
       numCores = 1)
   }
 
@@ -58,22 +58,23 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val maxExecutionMem = 1000L
     val taskAttemptId = 0L
     val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
+    val memoryMode = MemoryMode.ON_HEAP
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
     // Acquire up to the max
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 
890L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
+    mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, 
MemoryMode.ON_HEAP)
+    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode)
     assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -81,23 +82,24 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("you can see the world you brought to live")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
+    val memoryMode = MemoryMode.ON_HEAP
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L))
+    assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
 
-    assert(mm.acquireStorageMemory(dummyBlock, 100L))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L))
+    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, 
memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem))
+    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 1L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
@@ -105,19 +107,19 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
     // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
     // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
     assert(mm.storageMemoryUsed === 1000L)
-    mm.releaseStorageMemory(800L)
+    mm.releaseStorageMemory(800L, memoryMode)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
-    mm.releaseStorageMemory(100L)
+    mm.releaseStorageMemory(100L, memoryMode)
     assert(mm.storageMemoryUsed === 0L)
   }
 
@@ -127,20 +129,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
     val taskAttemptId = 0L
     val dummyBlock = TestBlockId("ain't nobody love like you do")
     val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
+    val memoryMode = MemoryMode.ON_HEAP
     // Only execution memory should increase
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 100L)
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 
100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
-    assert(mm.acquireStorageMemory(dummyBlock, 50L))
+    assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
     // Only execution memory should be released
-    mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
+    mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 67L)
     // Only storage memory should be released
@@ -153,21 +156,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("lonely water")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    assert(mm.acquireUnrollMemory(dummyBlock, 100L))
+    val memoryMode = MemoryMode.ON_HEAP
+    assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode))
     when(ms.currentUnrollMemory).thenReturn(100L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 100L)
-    mm.releaseUnrollMemory(40L)
+    mm.releaseUnrollMemory(40L, memoryMode)
     assert(mm.storageMemoryUsed === 60L)
     when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireStorageMemory(dummyBlock, 800L))
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 860L)
     // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
     // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
     // Requesting 240 more bytes of unroll memory will leave our total unroll memory at
     // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
-    assert(mm.acquireUnrollMemory(dummyBlock, 240L))
+    assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
     when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
     assert(mm.storageMemoryUsed === 1000L)
@@ -175,11 +179,11 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
     // We already have 300 bytes of unroll memory, so requesting 150 more will leave us
     // above the 400-byte limit. Since there is not enough free memory, this request will
     // fail even after evicting as much as we can (400 - 300 = 100 bytes).
-    assert(!mm.acquireUnrollMemory(dummyBlock, 150L))
+    assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 100L)
     assert(mm.storageMemoryUsed === 900L)
     // Release beyond what was acquired
-    mm.releaseUnrollMemory(maxStorageMem)
+    mm.releaseUnrollMemory(maxStorageMem, memoryMode)
     assert(mm.storageMemoryUsed === 0L)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 6dad3f4..6a4f409 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -39,16 +39,22 @@ class TestMemoryManager(conf: SparkConf)
       grant
     }
   }
-  override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean 
= true
-  override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean 
= true
-  override def releaseStorageMemory(numBytes: Long): Unit = {}
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      memoryMode: MemoryMode): Boolean = true
+  override def acquireUnrollMemory(
+      blockId: BlockId,
+      numBytes: Long,
+     memoryMode: MemoryMode): Boolean = true
+  override def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): 
Unit = {}
   override private[memory] def releaseExecutionMemory(
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Unit = {
     available += numBytes
   }
-  override def maxStorageMemory: Long = Long.MaxValue
+  override def maxOnHeapStorageMemory: Long = Long.MaxValue
 
   private var oomOnce = false
   private var available = Long.MaxValue

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9001a26..1425581 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -52,47 +52,49 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     val maxMemory = 1000L
     val taskAttemptId = 0L
     val (mm, _) = makeThings(maxMemory)
+    val memoryMode = MemoryMode.ON_HEAP
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
     // Acquire up to the max
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 
890L)
     assert(mm.executionMemoryUsed === maxMemory)
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
     assert(mm.executionMemoryUsed === maxMemory)
-    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
+    mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode)
     assert(mm.executionMemoryUsed === 0L)
   }
 
   test("basic storage memory") {
     val maxMemory = 1000L
     val (mm, ms) = makeThings(maxMemory)
+    val memoryMode = MemoryMode.ON_HEAP
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L))
+    assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
 
-    assert(mm.acquireStorageMemory(dummyBlock, 100L))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L))
+    assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxMemory))
+    assert(mm.acquireStorageMemory(dummyBlock, maxMemory, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceCalled(ms, 1L)
     assert(evictedBlocks.nonEmpty)
     evictedBlocks.clear()
@@ -100,19 +102,19 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
     // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
     assert(mm.storageMemoryUsed === 1000L)
-    mm.releaseStorageMemory(800L)
+    mm.releaseStorageMemory(800L, memoryMode)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L))
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
-    mm.releaseStorageMemory(100L)
+    mm.releaseStorageMemory(100L, memoryMode)
     assert(mm.storageMemoryUsed === 0L)
   }
 
@@ -120,18 +122,19 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     val maxMemory = 1000L
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
+    val memoryMode = MemoryMode.ON_HEAP
     // Acquire enough storage memory to exceed the storage region
-    assert(mm.acquireStorageMemory(dummyBlock, 750L))
+    assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 750L)
     // Execution needs to request 250 bytes to evict storage memory
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
     assert(mm.executionMemoryUsed === 100L)
     assert(mm.storageMemoryUsed === 750L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Execution wants 200 bytes but only 150 are free, so storage is evicted
-    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L)
     assert(mm.executionMemoryUsed === 300L)
     assert(mm.storageMemoryUsed === 700L)
     assertEvictBlocksToFreeSpaceCalled(ms, 50L)
@@ -141,13 +144,13 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     require(mm.executionMemoryUsed === 300L)
     require(mm.storageMemoryUsed === 0, "bad test: all storage memory should 
have been released")
     // Acquire some storage memory again, but this time keep it within the storage region
-    assert(mm.acquireStorageMemory(dummyBlock, 400L))
+    assert(mm.acquireStorageMemory(dummyBlock, 400L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 400L)
     assert(mm.executionMemoryUsed === 300L)
     // Execution cannot evict storage because the latter is within the storage fraction,
     // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
-    assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 300L)
+    assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 300L)
     assert(mm.executionMemoryUsed === 600L)
     assert(mm.storageMemoryUsed === 400L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
@@ -157,8 +160,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     val maxMemory = 1000L
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
+    val memoryMode = MemoryMode.ON_HEAP
     // Acquire enough storage memory to exceed the storage region size
-    assert(mm.acquireStorageMemory(dummyBlock, 700L))
+    assert(mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 700L)
@@ -166,7 +170,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     // mistakenly think that the 300 bytes of free space was still available even after
     // using it to expand the execution pool. Consequently, no storage memory was released
     // and the following call granted only 300 bytes to execution.
-    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 500L)
+    assert(mm.acquireExecutionMemory(500L, taskAttemptId, memoryMode) === 500L)
     assertEvictBlocksToFreeSpaceCalled(ms, 200L)
     assert(mm.storageMemoryUsed === 500L)
     assert(mm.executionMemoryUsed === 500L)
@@ -177,34 +181,35 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     val maxMemory = 1000L
     val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
+    val memoryMode = MemoryMode.ON_HEAP
     // Acquire enough execution memory to exceed the execution region
-    assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 800L)
+    assert(mm.acquireExecutionMemory(800L, taskAttemptId, memoryMode) === 800L)
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 0L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should not be able to evict execution
-    assert(mm.acquireStorageMemory(dummyBlock, 100L))
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(!mm.acquireStorageMemory(dummyBlock, 250L))
+    assert(!mm.acquireStorageMemory(dummyBlock, 250L, memoryMode))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     // Do not attempt to evict blocks, since evicting will not free enough memory:
     assertEvictBlocksToFreeSpaceNotCalled(ms)
-    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
-    mm.releaseStorageMemory(maxMemory)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode)
+    mm.releaseStorageMemory(maxMemory, memoryMode)
     // Acquire some execution memory again, but this time keep it within the execution region
-    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L)
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 0L)
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should still not be able to evict execution
-    assert(mm.acquireStorageMemory(dummyBlock, 750L))
+    assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
     assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes 
free
-    assert(!mm.acquireStorageMemory(dummyBlock, 850L))
+    assert(!mm.acquireStorageMemory(dummyBlock, 850L, memoryMode))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
     // Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -221,7 +226,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
       .set("spark.testing.reservedMemory", reservedMemory.toString)
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val expectedMaxMemory = ((systemMemory - reservedMemory) * 
memoryFraction).toLong
-    assert(mm.maxMemory === expectedMaxMemory)
+    assert(mm.maxHeapMemory === expectedMaxMemory)
 
     // Try using a system memory that's too small
     val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 
2).toString)
@@ -256,18 +261,19 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
       .set("spark.testing.memory", "1000")
     val mm = UnifiedMemoryManager(conf, numCores = 2)
     val ms = makeMemoryStore(mm)
-    assert(mm.maxMemory === 1000)
+    val memoryMode = MemoryMode.ON_HEAP
+    assert(mm.maxHeapMemory === 1000)
     // Have two tasks each acquire some execution memory so that the memory pool registers that
     // there are two active tasks:
-    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
-    assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 1, memoryMode) === 100L)
     // Fill up all of the remaining memory with storage.
-    assert(mm.acquireStorageMemory(dummyBlock, 800L))
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
     assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 800)
     assert(mm.executionMemoryUsed === 200)
     // A task should still be able to allocate 100 bytes execution memory by evicting blocks
-    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L)
     assertEvictBlocksToFreeSpaceCalled(ms, 100L)
     assert(mm.executionMemoryUsed === 300)
     assert(mm.storageMemoryUsed === 700)

http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 7a4cb39..6fc32cb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -821,7 +821,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val memoryManager = new StaticMemoryManager(
       conf,
       maxOnHeapExecutionMemory = Long.MaxValue,
-      maxStorageMemory = 1200,
+      maxOnHeapStorageMemory = 1200,
       numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), 
conf)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to