[SPARK-10983] Unified memory manager

This patch unifies the memory management of the storage and execution regions
such that either side can borrow memory from the other. When memory pressure
arises, storage will be evicted in favor of execution. To avoid regressions in
cases where storage is crucial, we dynamically allocate a fraction of space for
storage that execution cannot evict. Several configurations are introduced:

- **spark.memory.fraction (default 0.75)**: fraction of the heap space used
for execution and storage. The lower this is, the more frequently spills and
cached data eviction occur. The purpose of this config is to set aside memory
for internal metadata, user data structures, and imprecise size estimation in
the case of sparse, unusually large records.

- **spark.memory.storageFraction (default 0.5)**: size of the storage region
within the space set aside by `spark.memory.fraction`. Cached data may
only be evicted if total storage exceeds this region.

- **spark.memory.useLegacyMode (default false)**: whether to use the memory 
management that existed in Spark 1.5 and before. This is mainly for backward 
compatibility.

For a detailed description of the design, see 
[SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch 
builds on top of the `MemoryManager` interface introduced in #9000.

Author: Andrew Or <and...@databricks.com>

Closes #9084 from andrewor14/unified-memory-manager.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3ffac51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3ffac51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3ffac51

Branch: refs/heads/master
Commit: b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f
Parents: 2b574f5
Author: Andrew Or <and...@databricks.com>
Authored: Tue Oct 13 13:49:59 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Oct 13 13:49:59 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  23 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |  11 +-
 .../org/apache/spark/memory/MemoryManager.scala |  83 ++++++--
 .../spark/memory/StaticMemoryManager.scala      | 105 +++-------
 .../spark/memory/UnifiedMemoryManager.scala     | 141 +++++++++++++
 .../spark/shuffle/ShuffleMemoryManager.scala    |  38 ++--
 .../org/apache/spark/storage/BlockManager.scala |   4 +
 .../org/apache/spark/storage/MemoryStore.scala  | 121 +++++++----
 .../util/collection/ExternalAppendOnlyMap.scala |  10 -
 .../org/apache/spark/DistributedSuite.scala     |   7 +-
 .../scala/org/apache/spark/ShuffleSuite.scala   |   6 +-
 .../spark/memory/MemoryManagerSuite.scala       | 133 ++++++++++++
 .../spark/memory/StaticMemoryManagerSuite.scala | 105 ++++------
 .../memory/UnifiedMemoryManagerSuite.scala      | 208 +++++++++++++++++++
 .../shuffle/ShuffleMemoryManagerSuite.scala     |   5 +-
 .../shuffle/unsafe/UnsafeShuffleSuite.scala     |   3 -
 .../collection/ExternalAppendOnlyMapSuite.scala |   9 +-
 .../util/collection/ExternalSorterSuite.scala   |  23 +-
 docs/configuration.md                           |  99 ++++++---
 .../execution/TestShuffleMemoryManager.scala    |  10 +-
 .../execution/UnsafeRowSerializerSuite.scala    |   2 +-
 21 files changed, 840 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b344b5e..1a0ac3d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -418,16 +418,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
     }
 
     // Validate memory fractions
-    val memoryKeys = Seq(
+    val deprecatedMemoryKeys = Seq(
       "spark.storage.memoryFraction",
       "spark.shuffle.memoryFraction",
       "spark.shuffle.safetyFraction",
       "spark.storage.unrollFraction",
       "spark.storage.safetyFraction")
+    val memoryKeys = Seq(
+      "spark.memory.fraction",
+      "spark.memory.storageFraction") ++
+      deprecatedMemoryKeys
     for (key <- memoryKeys) {
       val value = getDouble(key, 0.5)
       if (value > 1 || value < 0) {
-        throw new IllegalArgumentException("$key should be between 0 and 1 
(was '$value').")
+        throw new IllegalArgumentException(s"$key should be between 0 and 1 
(was '$value').")
+      }
+    }
+
+    // Warn against deprecated memory fractions (unless legacy memory 
management mode is enabled)
+    val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
+    val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
+    if (!legacyMemoryManagement) {
+      val keyset = deprecatedMemoryKeys.toSet
+      val detected = settings.keys().asScala.filter(keyset.contains)
+      if (detected.nonEmpty) {
+        logWarning("Detected deprecated memory fraction settings: " +
+          detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution 
and storage " +
+          "memory management are unified. All memory fractions used in the old 
model are " +
+          "now deprecated and no longer read. If you wish to use the old 
memory management, " +
+          s"you may explicitly enable `$legacyMemoryManagementKey` (not 
recommended).")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index df3d84a..c329983 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, 
UnifiedMemoryManager}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
@@ -335,7 +335,14 @@ object SparkEnv extends Logging {
     val shuffleMgrClass = 
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val memoryManager = new StaticMemoryManager(conf)
+    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", 
false)
+    val memoryManager: MemoryManager =
+      if (useLegacyMemoryManager) {
+        new StaticMemoryManager(conf)
+      } else {
+        new UnifiedMemoryManager(conf)
+      }
+
     val shuffleMemoryManager = ShuffleMemoryManager.create(conf, 
memoryManager, numUsableCores)
 
     val blockTransferService = new NettyBlockTransferService(conf, 
securityManager, numUsableCores)

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 4bf73b6..7168ac5 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
+import org.apache.spark.Logging
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
 
 
@@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, 
MemoryStore}
  * sorts and aggregations, while storage memory refers to that used for 
caching and propagating
  * internal data across the cluster. There exists one of these per JVM.
  */
-private[spark] abstract class MemoryManager {
+private[spark] abstract class MemoryManager extends Logging {
 
   // The memory store used to evict cached blocks
   private var _memoryStore: MemoryStore = _
@@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager {
     _memoryStore
   }
 
+  // Amount of execution/storage memory in use, accesses must be synchronized 
on `this`
+  protected var _executionMemoryUsed: Long = 0
+  protected var _storageMemoryUsed: Long = 0
+
   /**
    * Set the [[MemoryStore]] used by this manager to evict cached blocks.
    * This must be set after construction due to initialization ordering 
constraints.
    */
-  def setMemoryStore(store: MemoryStore): Unit = {
+  final def setMemoryStore(store: MemoryStore): Unit = {
     _memoryStore = store
   }
 
   /**
-   * Acquire N bytes of memory for execution.
+   * Total available memory for execution, in bytes.
+   */
+  def maxExecutionMemory: Long
+
+  /**
+   * Total available memory for storage, in bytes.
+   */
+  def maxStorageMemory: Long
+
+  // TODO: avoid passing evicted blocks around to simplify method signatures 
(SPARK-10985)
+
+  /**
+   * Acquire N bytes of memory for execution, evicting cached blocks if 
necessary.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return number of bytes successfully granted (<= N).
    */
-  def acquireExecutionMemory(numBytes: Long): Long
+  def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
@@ -66,52 +86,73 @@ private[spark] abstract class MemoryManager {
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing 
ones if necessary.
+   *
+   * This extra method allows subclasses to differentiate behavior between 
acquiring storage
+   * memory and acquiring unroll memory. For instance, the memory management 
model in Spark
+   * 1.5 and before places a limit on the amount of space that can be freed 
from unrolling.
    * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+   *
    * @return whether all N bytes were successfully granted.
    */
   def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    acquireStorageMemory(blockId, numBytes, evictedBlocks)
+  }
 
   /**
    * Release N bytes of execution memory.
    */
-  def releaseExecutionMemory(numBytes: Long): Unit
+  def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _executionMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of execution " +
+        s"memory when we only have ${_executionMemoryUsed} bytes")
+      _executionMemoryUsed = 0
+    } else {
+      _executionMemoryUsed -= numBytes
+    }
+  }
 
   /**
    * Release N bytes of storage memory.
    */
-  def releaseStorageMemory(numBytes: Long): Unit
+  def releaseStorageMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _storageMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of storage " +
+        s"memory when we only have ${_storageMemoryUsed} bytes")
+      _storageMemoryUsed = 0
+    } else {
+      _storageMemoryUsed -= numBytes
+    }
+  }
 
   /**
    * Release all storage memory acquired.
    */
-  def releaseStorageMemory(): Unit
+  def releaseAllStorageMemory(): Unit = synchronized {
+    _storageMemoryUsed = 0
+  }
 
   /**
    * Release N bytes of unroll memory.
    */
-  def releaseUnrollMemory(numBytes: Long): Unit
-
-  /**
-   * Total available memory for execution, in bytes.
-   */
-  def maxExecutionMemory: Long
-
-  /**
-   * Total available memory for storage, in bytes.
-   */
-  def maxStorageMemory: Long
+  def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+    releaseStorageMemory(numBytes)
+  }
 
   /**
    * Execution memory currently in use, in bytes.
    */
-  def executionMemoryUsed: Long
+  final def executionMemoryUsed: Long = synchronized {
+    _executionMemoryUsed
+  }
 
   /**
    * Storage memory currently in use, in bytes.
    */
-  def storageMemoryUsed: Long
+  final def storageMemoryUsed: Long = synchronized {
+    _storageMemoryUsed
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 150445e..fa44f37 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
 
@@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager(
     conf: SparkConf,
     override val maxExecutionMemory: Long,
     override val maxStorageMemory: Long)
-  extends MemoryManager with Logging {
-
-  // Max number of bytes worth of blocks to evict when unrolling
-  private val maxMemoryToEvictForUnroll: Long = {
-    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
-  }
-
-  // Amount of execution / storage memory in use
-  // Accesses must be synchronized on `this`
-  private var _executionMemoryUsed: Long = 0
-  private var _storageMemoryUsed: Long = 0
+  extends MemoryManager {
 
   def this(conf: SparkConf) {
     this(
@@ -53,11 +43,19 @@ private[spark] class StaticMemoryManager(
       StaticMemoryManager.getMaxStorageMemory(conf))
   }
 
+  // Max number of bytes worth of blocks to evict when unrolling
+  private val maxMemoryToEvictForUnroll: Long = {
+    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
+  }
+
   /**
    * Acquire N bytes of memory for execution.
    * @return number of bytes successfully granted (<= N).
    */
-  override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = 
synchronized {
+    assert(numBytes >= 0)
     assert(_executionMemoryUsed <= maxExecutionMemory)
     val bytesToGrant = math.min(numBytes, maxExecutionMemory - 
_executionMemoryUsed)
     _executionMemoryUsed += bytesToGrant
@@ -72,7 +70,7 @@ private[spark] class StaticMemoryManager(
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
     acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
   }
 
@@ -88,7 +86,7 @@ private[spark] class StaticMemoryManager(
   override def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
     val currentUnrollMemory = memoryStore.currentUnrollMemory
     val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - 
currentUnrollMemory)
     val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
@@ -108,71 +106,16 @@ private[spark] class StaticMemoryManager(
       blockId: BlockId,
       numBytesToAcquire: Long,
       numBytesToFree: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    // Note: Keep this outside synchronized block to avoid potential deadlocks!
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    assert(numBytesToAcquire >= 0)
+    assert(numBytesToFree >= 0)
     memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    synchronized {
-      assert(_storageMemoryUsed <= maxStorageMemory)
-      val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= 
maxStorageMemory
-      if (enoughMemory) {
-        _storageMemoryUsed += numBytesToAcquire
-      }
-      enoughMemory
-    }
-  }
-
-  /**
-   * Release N bytes of execution memory.
-   */
-  override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
-    if (numBytes > _executionMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of execution " +
-        s"memory when we only have ${_executionMemoryUsed} bytes")
-      _executionMemoryUsed = 0
-    } else {
-      _executionMemoryUsed -= numBytes
-    }
-  }
-
-  /**
-   * Release N bytes of storage memory.
-   */
-  override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
-    if (numBytes > _storageMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of storage " +
-        s"memory when we only have ${_storageMemoryUsed} bytes")
-      _storageMemoryUsed = 0
-    } else {
-      _storageMemoryUsed -= numBytes
+    assert(_storageMemoryUsed <= maxStorageMemory)
+    val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= 
maxStorageMemory
+    if (enoughMemory) {
+      _storageMemoryUsed += numBytesToAcquire
     }
-  }
-
-  /**
-   * Release all storage memory acquired.
-   */
-  override def releaseStorageMemory(): Unit = synchronized {
-    _storageMemoryUsed = 0
-  }
-
-  /**
-   * Release N bytes of unroll memory.
-   */
-  override def releaseUnrollMemory(numBytes: Long): Unit = {
-    releaseStorageMemory(numBytes)
-  }
-
-  /**
-   * Amount of execution memory currently in use, in bytes.
-   */
-  override def executionMemoryUsed: Long = synchronized {
-    _executionMemoryUsed
-  }
-
-  /**
-   * Amount of storage memory currently in use, in bytes.
-   */
-  override def storageMemoryUsed: Long = synchronized {
-    _storageMemoryUsed
+    enoughMemory
   }
 
 }
@@ -184,9 +127,10 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the storage region, in 
bytes.
    */
   private def getMaxStorageMemory(conf: SparkConf): Long = {
+    val systemMaxMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    (systemMaxMemory * memoryFraction * safetyFraction).toLong
   }
 
 
@@ -194,9 +138,10 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the execution region, in 
bytes.
    */
   private def getMaxExecutionMemory(conf: SparkConf): Long = {
+    val systemMaxMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
     val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    (systemMaxMemory * memoryFraction * safetyFraction).toLong
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
new file mode 100644
index 0000000..5bf78d5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockStatus, BlockId}
+
+
+/**
+ * A [[MemoryManager]] that enforces a soft boundary between execution and 
storage such that
+ * either side can borrow memory from the other.
+ *
+ * The region shared between execution and storage is a fraction of the total 
heap space
+ * configurable through `spark.memory.fraction` (default 0.75). The position 
of the boundary
+ * within this space is further determined by `spark.memory.storageFraction` 
(default 0.5).
+ * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap 
space by default.
+ *
+ * Storage can borrow as much execution memory as is free until execution 
reclaims its space.
+ * When this happens, cached blocks will be evicted from memory until 
sufficient borrowed
+ * memory is released to satisfy the execution memory request.
+ *
+ * Similarly, execution can borrow as much storage memory as is free. However, 
execution
+ * memory is *never* evicted by storage due to the complexities involved in 
implementing this.
+ * The implication is that attempts to cache blocks may fail if execution has 
already eaten
+ * up most of the storage space, in which case the new blocks will be evicted 
immediately
+ * according to their respective storage levels.
+ */
+private[spark] class UnifiedMemoryManager(conf: SparkConf, maxMemory: Long) 
extends MemoryManager {
+
+  def this(conf: SparkConf) {
+    this(conf, UnifiedMemoryManager.getMaxMemory(conf))
+  }
+
+  /**
+   * Size of the storage region, in bytes.
+   *
+   * This region is not statically reserved; execution can borrow from it if 
necessary.
+   * Cached blocks can be evicted only if actual storage memory usage exceeds 
this region.
+   */
+  private val storageRegionSize: Long = {
+    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
+  }
+
+  /**
+   * Total amount of memory, in bytes, not currently occupied by either 
execution or storage.
+   */
+  private def totalFreeMemory: Long = synchronized {
+    assert(_executionMemoryUsed <= maxMemory)
+    assert(_storageMemoryUsed <= maxMemory)
+    assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
+    maxMemory - _executionMemoryUsed - _storageMemoryUsed
+  }
+
+  /**
+   * Total available memory for execution, in bytes.
+   * In this model, this is equivalent to the amount of memory not occupied by 
storage.
+   */
+  override def maxExecutionMemory: Long = synchronized {
+    maxMemory - _storageMemoryUsed
+  }
+
+  /**
+   * Total available memory for storage, in bytes.
+   * In this model, this is equivalent to the amount of memory not occupied by 
execution.
+   */
+  override def maxStorageMemory: Long = synchronized {
+    maxMemory - _executionMemoryUsed
+  }
+
+  /**
+   * Acquire N bytes of memory for execution, evicting cached blocks if 
necessary.
+   *
+   * This method evicts blocks only up to the amount of memory borrowed by 
storage.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+   * @return number of bytes successfully granted (<= N).
+   */
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = 
synchronized {
+    assert(numBytes >= 0)
+    val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - 
storageRegionSize)
+    // If there is not enough free memory AND storage has borrowed some 
execution memory,
+    // then evict as much memory borrowed by storage as needed to grant this 
request
+    val shouldEvictStorage = totalFreeMemory < numBytes && 
memoryBorrowedByStorage > 0
+    if (shouldEvictStorage) {
+      val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
+      memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+    }
+    val bytesToGrant = math.min(numBytes, totalFreeMemory)
+    _executionMemoryUsed += bytesToGrant
+    bytesToGrant
+  }
+
+  /**
+   * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+   * @return whether all N bytes were successfully granted.
+   */
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    assert(numBytes >= 0)
+    memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
+    val enoughMemory = totalFreeMemory >= numBytes
+    if (enoughMemory) {
+      _storageMemoryUsed += numBytes
+    }
+    enoughMemory
+  }
+
+}
+
+private object UnifiedMemoryManager {
+
+  /**
+   * Return the total amount of memory shared between execution and storage, 
in bytes.
+   */
+  private def getMaxMemory(conf: SparkConf): Long = {
+    val systemMaxMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
+    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
+    (systemMaxMemory * memoryFraction).toLong
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index bb64bb3..aaf543c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.shuffle
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.spark._
 import org.apache.spark.memory.{StaticMemoryManager, MemoryManager}
+import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 
 /**
@@ -36,8 +38,8 @@ import org.apache.spark.unsafe.array.ByteArrayMethods
  * If there are N tasks, it ensures that each tasks can acquire at least 1 / 
2N of the memory
  * before it has to spill, and at most 1 / N. Because N varies dynamically, we 
keep track of the
  * set of active tasks and redo the calculations of 1 / 2N and 1 / N in 
waiting tasks whenever
- * this set changes. This is all done by synchronizing access on "this" to 
mutate state and using
- * wait() and notifyAll() to signal changes.
+ * this set changes. This is all done by synchronizing access to 
`memoryManager` to mutate state
+ * and using wait() and notifyAll() to signal changes.
  *
  * Use `ShuffleMemoryManager.create()` factory method to create a new instance.
  *
@@ -51,7 +53,6 @@ class ShuffleMemoryManager protected (
   extends Logging {
 
   private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId 
-> memory bytes
-  private val maxMemory = memoryManager.maxExecutionMemory
 
   private def currentTaskAttemptId(): Long = {
     // In case this is called on the driver, return an invalid task attempt id.
@@ -65,7 +66,7 @@ class ShuffleMemoryManager protected (
    * total memory pool (where N is the # of active tasks) before it is forced 
to spill. This can
    * happen if the number of tasks increases but an older task had a lot of 
memory already.
    */
-  def tryToAcquire(numBytes: Long): Long = synchronized {
+  def tryToAcquire(numBytes: Long): Long = memoryManager.synchronized {
     val taskAttemptId = currentTaskAttemptId()
     assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
 
@@ -73,15 +74,18 @@ class ShuffleMemoryManager protected (
     // of active tasks, to let other tasks ramp down their memory in calls to 
tryToAcquire
     if (!taskMemory.contains(taskAttemptId)) {
       taskMemory(taskAttemptId) = 0L
-      notifyAll()  // Will later cause waiting tasks to wake up and check 
numTasks again
+      // This will later cause waiting tasks to wake up and check numTasks 
again
+      memoryManager.notifyAll()
     }
 
     // Keep looping until we're either sure that we don't want to grant this 
request (because this
     // task would have more than 1 / numActiveTasks of the memory) or we have 
enough free
     // memory to give it (we always let each task get at least 1 / (2 * 
numActiveTasks)).
+    // TODO: simplify this to limit each task to its own slot
     while (true) {
       val numActiveTasks = taskMemory.keys.size
       val curMem = taskMemory(taskAttemptId)
+      val maxMemory = memoryManager.maxExecutionMemory
       val freeMemory = maxMemory - taskMemory.values.sum
 
       // How much we can grant this task; don't let it grow to more than 1 / 
numActiveTasks;
@@ -99,7 +103,7 @@ class ShuffleMemoryManager protected (
         } else {
           logInfo(
             s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory 
pool to be free")
-          wait()
+          memoryManager.wait()
         }
       } else {
         return acquire(toGrant)
@@ -112,15 +116,23 @@ class ShuffleMemoryManager protected (
    * Acquire N bytes of execution memory from the memory manager for the 
current task.
    * @return number of bytes actually acquired (<= N).
    */
-  private def acquire(numBytes: Long): Long = synchronized {
+  private def acquire(numBytes: Long): Long = memoryManager.synchronized {
     val taskAttemptId = currentTaskAttemptId()
-    val acquired = memoryManager.acquireExecutionMemory(numBytes)
+    val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    val acquired = memoryManager.acquireExecutionMemory(numBytes, 
evictedBlocks)
+    // Register evicted blocks, if any, with the active task metrics
+    // TODO: just do this in `acquireExecutionMemory` (SPARK-10985)
+    Option(TaskContext.get()).foreach { tc =>
+      val metrics = tc.taskMetrics()
+      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, 
BlockStatus)]())
+      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+    }
     taskMemory(taskAttemptId) += acquired
     acquired
   }
 
   /** Release numBytes bytes for the current task. */
-  def release(numBytes: Long): Unit = synchronized {
+  def release(numBytes: Long): Unit = memoryManager.synchronized {
     val taskAttemptId = currentTaskAttemptId()
     val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
     if (curMem < numBytes) {
@@ -129,20 +141,20 @@ class ShuffleMemoryManager protected (
     }
     taskMemory(taskAttemptId) -= numBytes
     memoryManager.releaseExecutionMemory(numBytes)
-    notifyAll()  // Notify waiters who locked "this" in tryToAcquire that 
memory has been freed
+    memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory 
has been freed
   }
 
   /** Release all memory for the current task and mark it as inactive (e.g. 
when a task ends). */
-  def releaseMemoryForThisTask(): Unit = synchronized {
+  def releaseMemoryForThisTask(): Unit = memoryManager.synchronized {
     val taskAttemptId = currentTaskAttemptId()
     taskMemory.remove(taskAttemptId).foreach { numBytes =>
       memoryManager.releaseExecutionMemory(numBytes)
     }
-    notifyAll()  // Notify waiters who locked "this" in tryToAcquire that 
memory has been freed
+    memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory 
has been freed
   }
 
   /** Returns the memory consumption, in bytes, for the current task */
-  def getMemoryConsumptionForThisTask(): Long = synchronized {
+  def getMemoryConsumptionForThisTask(): Long = memoryManager.synchronized {
     val taskAttemptId = currentTaskAttemptId()
     taskMemory.getOrElse(taskAttemptId, 0L)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9f5bd2a..c374b93 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -91,6 +91,10 @@ private[spark] class BlockManager(
   }
   memoryManager.setMemoryStore(memoryStore)
 
+  // Note: depending on the memory manager, `maxStorageMemory` may actually 
vary over time.
+  // However, since we use this only for reporting and logging, what we 
actually want here is
+  // the absolute maximum value that `maxStorageMemory` can ever possibly 
reach. We may need
+  // to revisit whether reporting this value as the "max" is intuitive to the 
user.
   private val maxMemory = memoryManager.maxStorageMemory
 
   private[spark]

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 35c57b9..4dbac38 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -37,15 +37,14 @@ private case class MemoryEntry(value: Any, size: Long, 
deserialized: Boolean)
 private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: 
MemoryManager)
   extends BlockStore(blockManager) {
 
+  // Note: all changes to memory allocations, notably putting blocks, evicting 
blocks, and
+  // acquiring or releasing unroll memory, must be synchronized on 
`memoryManager`!
+
   private val conf = blockManager.conf
   private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, 
true)
-  private val maxMemory = memoryManager.maxStorageMemory
-
-  // Ensure only one thread is putting, and if necessary, dropping blocks at 
any given time
-  private val accountingLock = new Object
 
   // A mapping from taskAttemptId to amount of memory used for unrolling a 
block (in bytes)
-  // All accesses of this map are assumed to have manually synchronized on 
`accountingLock`
+  // All accesses of this map are assumed to have manually synchronized on 
`memoryManager`
   private val unrollMemoryMap = mutable.HashMap[Long, Long]()
   // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
   // Pending unroll memory refers to the intermediate memory occupied by a task
@@ -60,6 +59,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, 
memoryManager: Memo
   private val unrollMemoryThreshold: Long =
     conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
 
+  /** Total amount of memory available for storage, in bytes. */
+  private def maxMemory: Long = memoryManager.maxStorageMemory
+
   if (maxMemory < unrollMemoryThreshold) {
     logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the 
initial memory " +
       s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to 
store a block in " +
@@ -75,7 +77,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, 
memoryManager: Memo
    * Amount of storage memory, in bytes, used for caching blocks.
    * This does not include memory used for unrolling.
    */
-  private def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
+  private def blocksMemoryUsed: Long = memoryManager.synchronized {
+    memoryUsed - currentUnrollMemory
+  }
 
   override def getSize(blockId: BlockId): Long = {
     entries.synchronized {
@@ -208,7 +212,7 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
     }
   }
 
-  override def remove(blockId: BlockId): Boolean = {
+  override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized { entries.remove(blockId) }
     if (entry != null) {
       memoryManager.releaseStorageMemory(entry.size)
@@ -220,11 +224,13 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
     }
   }
 
-  override def clear() {
+  override def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
       entries.clear()
     }
-    memoryManager.releaseStorageMemory()
+    unrollMemoryMap.clear()
+    pendingUnrollMemoryMap.clear()
+    memoryManager.releaseAllStorageMemory()
     logInfo("MemoryStore cleared")
   }
 
@@ -299,22 +305,23 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
       }
 
     } finally {
-      // If we return an array, the values returned will later be cached in 
`tryToPut`.
-      // In this case, we should release the memory after we cache the block 
there.
-      // Otherwise, if we return an iterator, we release the memory reserved 
here
-      // later when the task finishes.
+      // If we return an array, the values returned here will be cached in 
`tryToPut` later.
+      // In this case, we should release the memory only after we cache the 
block there.
       if (keepUnrolling) {
         val taskAttemptId = currentTaskAttemptId()
-        accountingLock.synchronized {
-          // Here, we transfer memory from unroll to pending unroll because we 
expect to cache this
-          // block in `tryToPut`. We do not release and re-acquire memory from 
the MemoryManager in
-          // order to avoid race conditions where another component steals the 
memory that we're
-          // trying to transfer.
+        memoryManager.synchronized {
+          // Since we continue to hold onto the array until we actually cache 
it, we cannot
+          // release the unroll memory yet. Instead, we transfer it to pending 
unroll memory
+          // so `tryToPut` can further transfer it to normal storage memory 
later.
+          // TODO: we can probably express this without pending unroll memory 
(SPARK-10907)
           val amountToTransferToPending = currentUnrollMemoryForThisTask - 
previousMemoryReserved
           unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
           pendingUnrollMemoryMap(taskAttemptId) =
             pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + 
amountToTransferToPending
         }
+      } else {
+        // Otherwise, if we return an iterator, we can only release the unroll 
memory when
+        // the task finishes since we don't know when the iterator will be 
consumed.
       }
     }
   }
@@ -343,7 +350,7 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
    * `value` will be lazily created. If it cannot be put into MemoryStore or 
disk, `value` won't be
    * created to avoid OOM since it may be a big ByteBuffer.
    *
-   * Synchronize on `accountingLock` to ensure that all the put requests and 
its associated block
+   * Synchronize on `memoryManager` to ensure that all the put requests and 
its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread 
is dropping
    * blocks to free memory for one block, another thread may use up the freed 
space for
    * another block.
@@ -365,16 +372,13 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
      * for freeing up more space for another block that needs to be put. Only 
then the actually
      * dropping of blocks (and writing to disk if necessary) can proceed in 
parallel. */
 
-    accountingLock.synchronized {
+    memoryManager.synchronized {
       // Note: if we have previously unrolled this block successfully, then 
pending unroll
       // memory should be non-zero. This is the amount that we already 
reserved during the
       // unrolling process. In this case, we can just reuse this space to 
cache our block.
-      //
-      // Note: the StaticMemoryManager counts unroll memory as storage memory. 
Here, the
-      // synchronization on `accountingLock` guarantees that the release of 
unroll memory and
-      // acquisition of storage memory happens atomically. However, if storage 
memory is acquired
-      // outside of MemoryStore or if unroll memory is counted as execution 
memory, then we will
-      // have to revisit this assumption. See SPARK-10983 for more context.
+      // The synchronization on `memoryManager` here guarantees that the 
release and acquire
+      // happen atomically. This relies on the assumption that all memory 
acquisitions are
+      // synchronized on the same lock.
       releasePendingUnrollMemoryForThisTask()
       val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, 
droppedBlocks)
       if (enoughMemory) {
@@ -402,33 +406,61 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
   }
 
   /**
+   * Try to free up a given amount of space by evicting existing blocks.
+   *
+   * @param space the amount of memory to free, in bytes
+   * @param droppedBlocks a holder for blocks evicted in the process
+   * @return whether the requested free space is freed.
+   */
+  private[spark] def ensureFreeSpace(
+      space: Long,
+      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+    ensureFreeSpace(None, space, droppedBlocks)
+  }
+
+  /**
+   * Try to free up a given amount of space to store a block by evicting 
existing ones.
+   *
+   * @param space the amount of memory to free, in bytes
+   * @param droppedBlocks a holder for blocks evicted in the process
+   * @return whether the requested free space is freed.
+   */
+  private[spark] def ensureFreeSpace(
+      blockId: BlockId,
+      space: Long,
+      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+    ensureFreeSpace(Some(blockId), space, droppedBlocks)
+  }
+
+  /**
    * Try to free up a given amount of space to store a particular block, but 
can fail if
    * either the block is bigger than our memory or it would require replacing 
another block
    * from the same RDD (which leads to a wasteful cyclic replacement pattern 
for RDDs that
    * don't fit into memory that we want to avoid).
    *
-   * @param blockId the ID of the block we are freeing space for
+   * @param blockId the ID of the block we are freeing space for, if any
    * @param space the size of this block
    * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether there is enough free space.
+   * @return whether the requested free space is freed.
    */
-  private[spark] def ensureFreeSpace(
-      blockId: BlockId,
+  private def ensureFreeSpace(
+      blockId: Option[BlockId],
       space: Long,
       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    accountingLock.synchronized {
+    memoryManager.synchronized {
       val freeMemory = maxMemory - memoryUsed
-      val rddToAdd = getRddId(blockId)
+      val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
       var selectedMemory = 0L
 
-      logInfo(s"Ensuring $space bytes of free space for block $blockId " +
+      logInfo(s"Ensuring $space bytes of free space " +
+        blockId.map { id => s"for block $id" }.getOrElse("") +
         s"(free: $freeMemory, max: $maxMemory)")
 
       // Fail fast if the block simply won't fit
       if (space > maxMemory) {
-        logInfo(s"Will not store $blockId as the required space " +
-          s"($space bytes) than our memory limit ($maxMemory bytes)")
+        logInfo("Will not " + blockId.map { id => s"store $id" 
}.getOrElse("free memory") +
+          s" as the required space ($space bytes) exceeds our memory limit 
($maxMemory bytes)")
         return false
       }
 
@@ -471,8 +503,10 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
         }
         true
       } else {
-        logInfo(s"Will not store $blockId as it would require dropping another 
block " +
-          "from the same RDD")
+        blockId.foreach { id =>
+          logInfo(s"Will not store $id as it would require dropping another 
block " +
+            "from the same RDD")
+        }
         false
       }
     }
@@ -495,8 +529,7 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
       blockId: BlockId,
       memory: Long,
       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    accountingLock.synchronized {
-      // Note: all acquisitions of unroll memory must be synchronized on 
`accountingLock`
+    memoryManager.synchronized {
       val success = memoryManager.acquireUnrollMemory(blockId, memory, 
droppedBlocks)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
@@ -512,7 +545,7 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
    */
   def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
     val taskAttemptId = currentTaskAttemptId()
-    accountingLock.synchronized {
+    memoryManager.synchronized {
       if (unrollMemoryMap.contains(taskAttemptId)) {
         val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
@@ -531,7 +564,7 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
    */
   def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): 
Unit = {
     val taskAttemptId = currentTaskAttemptId()
-    accountingLock.synchronized {
+    memoryManager.synchronized {
       if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
         val memoryToRelease = math.min(memory, 
pendingUnrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
@@ -548,21 +581,21 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
   /**
    * Return the amount of memory currently occupied for unrolling blocks 
across all tasks.
    */
-  def currentUnrollMemory: Long = accountingLock.synchronized {
+  def currentUnrollMemory: Long = memoryManager.synchronized {
     unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
   }
 
   /**
    * Return the amount of memory currently occupied for unrolling blocks by 
this task.
    */
-  def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized {
+  def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
     unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
   }
 
   /**
    * Return the number of tasks currently unrolling blocks.
    */
-  private def numTasksUnrolling: Int = accountingLock.synchronized { 
unrollMemoryMap.keys.size }
+  private def numTasksUnrolling: Int = memoryManager.synchronized { 
unrollMemoryMap.keys.size }
 
   /**
    * Log information about current memory usage.

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 29c5732..6a96b5d 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -48,16 +48,6 @@ import org.apache.spark.executor.ShuffleWriteMetrics
  * However, if the spill threshold is too low, we spill frequently and incur 
unnecessary disk
  * writes. This may lead to a performance regression compared to the normal 
case of using the
  * non-spilling AppendOnlyMap.
- *
- * Two parameters control the memory threshold:
- *
- *   `spark.shuffle.memoryFraction` specifies the collective amount of memory 
used for storing
- *   these maps as a fraction of the executor's total memory. Since each 
concurrently running
- *   task maintains one map, the actual threshold for each map is this 
quantity divided by the
- *   number of running tasks.
- *
- *   `spark.shuffle.safetyFraction` specifies an additional margin of safety 
as a fraction of
- *   this threshold, in case map size estimation is not sufficiently accurate.
  */
 @DeveloperApi
 class ExternalAppendOnlyMap[K, V, C](

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 600c140..34a4bb9 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -213,11 +213,8 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
   }
 
   test("compute when only some partitions fit in memory") {
-    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test", conf)
-    // data will be 4 million * 4 bytes = 16 MB in size, but our 
memoryFraction set the cache
-    // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we 
use 20 partitions
-    // to make sure that *some* of them do fit though
+    sc = new SparkContext(clusterUrl, "test", new SparkConf)
+    // TODO: verify that only a subset of partitions fit in memory 
(SPARK-11078)
     val data = sc.parallelize(1 to 4000000, 
20).persist(StorageLevel.MEMORY_ONLY_SER)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index d91b799..4a0877d 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -247,11 +247,13 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
         .setMaster("local")
         .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
         .set("spark.shuffle.compress", shuffleCompress.toString)
-        .set("spark.shuffle.memoryFraction", "0.001")
       resetSparkContext()
       sc = new SparkContext(myConf)
+      val diskBlockManager = sc.env.blockManager.diskBlockManager
       try {
-        sc.parallelize(0 until 100000).map(i => (i / 4, 
i)).groupByKey().collect()
+        assert(diskBlockManager.getAllFiles().isEmpty)
+        sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
+        assert(diskBlockManager.getAllFiles().nonEmpty)
       } catch {
         case e: Exception =>
           val errMsg = s"Failed with 
spark.shuffle.spill.compress=$shuffleSpillCompress," +

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
new file mode 100644
index 0000000..36e4566
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.MemoryStore
+
+
+/**
+ * Helper trait for sharing code among [[MemoryManager]] tests.
+ */
+private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+
+  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+
+  // Note: Mockito's verify mechanism does not provide a way to reset method 
call counts
+  // without also resetting stubbed methods. Since our test code relies on the 
latter,
+  // we need to use our own variable to track invocations of `ensureFreeSpace`.
+
+  /**
+   * The amount of free space requested in the last call to 
[[MemoryStore.ensureFreeSpace]]
+   *
+   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared 
when the test
+   * code makes explicit assertions on this variable through 
[[assertEnsureFreeSpaceCalled]].
+   */
+  private val ensureFreeSpaceCalled = new 
AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+
+  /**
+   * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] 
method is stubbed.
+   *
+   * This allows our test code to release storage memory when 
[[MemoryStore.ensureFreeSpace]]
+   * is called without relying on [[org.apache.spark.storage.BlockManager]] 
and all of its
+   * dependencies.
+   */
+  protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
+    val ms = mock(classOf[MemoryStore])
+    when(ms.ensureFreeSpace(anyLong(), 
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
+    when(ms.ensureFreeSpace(any(), anyLong(), 
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+    mm.setMemoryStore(ms)
+    ms
+  }
+
+  /**
+   * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the 
right arguments.
+   */
+  private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): 
Answer[Boolean] = {
+    new Answer[Boolean] {
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        val args = invocation.getArguments
+        require(args.size > numBytesPos, s"bad test: expected >$numBytesPos 
arguments " +
+          s"in ensureFreeSpace, found ${args.size}")
+        require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected 
ensureFreeSpace " +
+          s"argument at index $numBytesPos to be a Long: ${args.mkString(", 
")}")
+        val numBytes = args(numBytesPos).asInstanceOf[Long]
+        mockEnsureFreeSpace(mm, numBytes)
+      }
+    }
+  }
+
+  /**
+   * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases 
storage memory.
+   *
+   * This is a significant simplification of the real method, which actually 
drops existing
+   * blocks based on the size of each block. Instead, here we simply release 
as many bytes
+   * as needed to ensure the requested amount of free space. This allows us to 
set up the
+   * test without relying on the [[org.apache.spark.storage.BlockManager]], 
which brings in
+   * many other dependencies.
+   *
+   * Every call to this method will set a global variable, 
[[ensureFreeSpaceCalled]], that
+   * records the number of bytes this is called with. This variable is 
expected to be cleared
+   * by the test code later through [[assertEnsureFreeSpaceCalled]].
+   */
+  private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean 
= mm.synchronized {
+    require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+      "bad test: ensure free space variable was not reset")
+    // Record the number of bytes we freed this call
+    ensureFreeSpaceCalled.set(numBytes)
+    if (numBytes <= mm.maxStorageMemory) {
+      def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
+      val spaceToRelease = numBytes - freeMemory
+      if (spaceToRelease > 0) {
+        mm.releaseStorageMemory(spaceToRelease)
+      }
+      freeMemory >= numBytes
+    } else {
+      // We attempted to free more bytes than our max allowable memory
+      false
+    }
+  }
+
+  /**
+   * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given 
parameters.
+   */
+  protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): 
Unit = {
+    assert(ensureFreeSpaceCalled.get() === numBytes,
+      s"expected ensure free space to be called with $numBytes")
+    ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+  }
+
+  /**
+   * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+   */
+  protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
+    assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+      "ensure free space should not have been called!")
+  }
+}
+
+private object MemoryManagerSuite {
+  private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index c436a8b..6cae1f8 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -19,32 +19,44 @@ package org.apache.spark.memory
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Mockito.{mock, reset, verify, when}
-import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
 
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
-import org.apache.spark.{SparkConf, SparkFunSuite}
 
 
-class StaticMemoryManagerSuite extends SparkFunSuite {
+class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
+  private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+  /**
+   * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class 
dependencies.
+   */
+  private def makeThings(
+      maxExecutionMem: Long,
+      maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
+    val mm = new StaticMemoryManager(
+      conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = 
maxStorageMem)
+    val ms = makeMemoryStore(mm)
+    (mm, ms)
+  }
 
   test("basic execution memory") {
     val maxExecutionMem = 1000L
     val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.acquireExecutionMemory(10L) === 10L)
+    assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.acquireExecutionMemory(100L) === 100L)
+    assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
     // Acquire up to the max
-    assert(mm.acquireExecutionMemory(1000L) === 890L)
+    assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.acquireExecutionMemory(1L) === 0L)
+    assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
     mm.releaseExecutionMemory(800L)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.acquireExecutionMemory(1L) === 1L)
+    assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
     mm.releaseExecutionMemory(maxExecutionMem)
@@ -54,37 +66,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
   test("basic storage memory") {
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("you can see the world you brought to live")
-    val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
     // `ensureFreeSpace` should be called with the number of bytes requested
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
+    assertEnsureFreeSpaceCalled(ms, 10L)
     assert(mm.storageMemoryUsed === 10L)
-    assert(evictedBlocks.isEmpty)
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+    assertEnsureFreeSpaceCalled(ms, 100L)
     assert(mm.storageMemoryUsed === 110L)
-    // Acquire up to the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
+    // Acquire more than the max, not granted
+    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, 
evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
     assert(mm.storageMemoryUsed === 110L)
-    assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
+    // Acquire up to the max, requests after this are still granted due to LRU 
eviction
+    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 1000L)
     assert(mm.storageMemoryUsed === 1000L)
-    assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 1L)
     assert(mm.storageMemoryUsed === 1000L)
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+    assertEnsureFreeSpaceCalled(ms, 1L)
     assert(mm.storageMemoryUsed === 201L)
-    mm.releaseStorageMemory()
+    mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+    assertEnsureFreeSpaceCalled(ms, 1L)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
     mm.releaseStorageMemory(100L)
@@ -95,18 +106,17 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
     val maxExecutionMem = 200L
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("ain't nobody love like you do")
-    val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
     // Only execution memory should increase
-    assert(mm.acquireExecutionMemory(100L) === 100L)
+    assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 100L)
-    assert(mm.acquireExecutionMemory(1000L) === 100L)
+    assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
-    assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L)
+    assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 50L)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
     // Only execution memory should be released
@@ -114,7 +124,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 67L)
     // Only storage memory should be released
-    mm.releaseStorageMemory()
+    mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 67L)
   }
@@ -122,51 +132,26 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
   test("unroll memory") {
     val maxStorageMem = 1000L
     val dummyBlock = TestBlockId("lonely water")
-    val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks))
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+    assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 100L)
     assert(mm.storageMemoryUsed === 100L)
     mm.releaseUnrollMemory(40L)
     assert(mm.storageMemoryUsed === 60L)
     when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks))
+    assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
     // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 
bytes.
     // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 
340 bytes.
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L)
+    assertEnsureFreeSpaceCalled(ms, 340L)
     assert(mm.storageMemoryUsed === 560L)
     when(ms.currentUnrollMemory).thenReturn(560L)
-    assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks))
+    assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
     assert(mm.storageMemoryUsed === 560L)
     // We already have 560 bytes > the max unroll space of 400 bytes, so no 
bytes are freed
-    assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L)
+    assertEnsureFreeSpaceCalled(ms, 0L)
     // Release beyond what was acquired
     mm.releaseUnrollMemory(maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)
   }
 
-  /**
-   * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class 
dependencies.
-   */
-  private def makeThings(
-      maxExecutionMem: Long,
-      maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
-    val mm = new StaticMemoryManager(
-      conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = 
maxStorageMem)
-    val ms = mock(classOf[MemoryStore])
-    mm.setMemoryStore(ms)
-    (mm, ms)
-  }
-
-  /**
-   * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given 
parameters.
-   */
-  private def assertEnsureFreeSpaceCalled(
-      ms: MemoryStore,
-      blockId: BlockId,
-      numBytes: Long): Unit = {
-    verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), 
any())
-    reset(ms)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
new file mode 100644
index 0000000..e7baa50
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
+
+
+/**
+ * Tests for [[UnifiedMemoryManager]], in which execution and storage draw from one shared pool
+ * of `maxMemory` bytes. A `spark.memory.storageFraction` share of that pool forms a storage
+ * region; storage memory held within that region cannot be evicted to make room for execution.
+ */
+class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
+  private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
+  private val dummyBlock = TestBlockId("--")
+  // Shared by every test and never cleared; no test asserts on its contents.
+  private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+  /**
+   * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+   * The store is built by the inherited `makeMemoryStore` helper.
+   */
+  private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
+    val mm = new UnifiedMemoryManager(conf, maxMemory)
+    val ms = makeMemoryStore(mm)
+    (mm, ms)
+  }
+
+  /** Read the manager's private `storageRegionSize` through [[PrivateMethodTester]]. */
+  private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
+    mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
+  }
+
+  test("storage region size") {
+    val maxMemory = 1000L
+    val (mm, _) = makeThings(maxMemory)
+    val storageFraction = conf.get("spark.memory.storageFraction").toDouble
+    // `storageRegionSize` is read as a Long, so compare it against a Long, not the raw Double
+    val expectedStorageRegionSize = (maxMemory * storageFraction).toLong
+    val actualStorageRegionSize = getStorageRegionSize(mm)
+    assert(actualStorageRegionSize === expectedStorageRegionSize)
+  }
+
+  test("basic execution memory") {
+    val maxMemory = 1000L
+    val (mm, _) = makeThings(maxMemory)
+    assert(mm.executionMemoryUsed === 0L)
+    assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+    assert(mm.executionMemoryUsed === 10L)
+    assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+    // Acquire up to the max: only the remaining 1000 - 110 = 890 bytes are granted
+    assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+    assert(mm.executionMemoryUsed === maxMemory)
+    assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+    assert(mm.executionMemoryUsed === maxMemory)
+    mm.releaseExecutionMemory(800L)
+    assert(mm.executionMemoryUsed === 200L)
+    // Acquire after release
+    assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+    assert(mm.executionMemoryUsed === 201L)
+    // Release beyond what was acquired
+    mm.releaseExecutionMemory(maxMemory)
+    assert(mm.executionMemoryUsed === 0L)
+  }
+
+  test("basic storage memory") {
+    val maxMemory = 1000L
+    val (mm, ms) = makeThings(maxMemory)
+    assert(mm.storageMemoryUsed === 0L)
+    assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+    // `ensureFreeSpace` should be called with the number of bytes requested
+    assertEnsureFreeSpaceCalled(ms, 10L)
+    assert(mm.storageMemoryUsed === 10L)
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 100L)
+    assert(mm.storageMemoryUsed === 110L)
+    // Acquire more than the max, not granted
+    assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+    assert(mm.storageMemoryUsed === 110L)
+    // Acquire up to the max, requests after this are still granted due to LRU eviction
+    assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, maxMemory)
+    assert(mm.storageMemoryUsed === 1000L)
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 1L)
+    assert(mm.storageMemoryUsed === 1000L)
+    mm.releaseStorageMemory(800L)
+    assert(mm.storageMemoryUsed === 200L)
+    // Acquire after release
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 1L)
+    assert(mm.storageMemoryUsed === 201L)
+    mm.releaseAllStorageMemory()
+    assert(mm.storageMemoryUsed === 0L)
+    assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 1L)
+    assert(mm.storageMemoryUsed === 1L)
+    // Release beyond what was acquired
+    mm.releaseStorageMemory(100L)
+    assert(mm.storageMemoryUsed === 0L)
+  }
+
+  test("execution evicts storage") {
+    val maxMemory = 1000L
+    val (mm, ms) = makeThings(maxMemory)
+    // First, ensure the test classes are set up as expected
+    val expectedStorageRegionSize = 500L
+    val expectedExecutionRegionSize = 500L
+    val storageRegionSize = getStorageRegionSize(mm)
+    // Use the measured storage region so the require below actually checks something
+    val executionRegionSize = maxMemory - storageRegionSize
+    require(storageRegionSize === expectedStorageRegionSize,
+      "bad test: storage region size is unexpected")
+    require(executionRegionSize === expectedExecutionRegionSize,
+      "bad test: execution region size is unexpected")
+    // Acquire enough storage memory to exceed the storage region
+    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 750L)
+    assert(mm.executionMemoryUsed === 0L)
+    assert(mm.storageMemoryUsed === 750L)
+    require(mm.storageMemoryUsed > storageRegionSize,
+      "bad test: storage memory used should exceed the storage region")
+    // Only 1000 - 750 = 250 bytes are free, so a 100-byte request needs no eviction
+    assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.executionMemoryUsed === 100L)
+    assert(mm.storageMemoryUsed === 750L)
+    assertEnsureFreeSpaceNotCalled(ms)
+    // Execution wants 200 bytes but only 150 are free, so storage is evicted
+    assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+    assertEnsureFreeSpaceCalled(ms, 200L)
+    assert(mm.executionMemoryUsed === 300L)
+    mm.releaseAllStorageMemory()
+    require(mm.executionMemoryUsed < executionRegionSize,
+      "bad test: execution memory used should be within the execution region")
+    require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
+    // Acquire some storage memory again, but this time keep it within the storage region
+    assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+    assertEnsureFreeSpaceCalled(ms, 400L)
+    require(mm.storageMemoryUsed < storageRegionSize,
+      "bad test: storage memory used should be within the storage region")
+    // Execution cannot evict storage because the latter is within the storage fraction,
+    // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
+    assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L)
+    assert(mm.executionMemoryUsed === 600L)
+    assert(mm.storageMemoryUsed === 400L)
+    assertEnsureFreeSpaceNotCalled(ms)
+  }
+
+  test("storage does not evict execution") {
+    val maxMemory = 1000L
+    val (mm, ms) = makeThings(maxMemory)
+    // First, ensure the test classes are set up as expected
+    val expectedStorageRegionSize = 500L
+    val expectedExecutionRegionSize = 500L
+    val storageRegionSize = getStorageRegionSize(mm)
+    // Use the measured storage region so the require below actually checks something
+    val executionRegionSize = maxMemory - storageRegionSize
+    require(storageRegionSize === expectedStorageRegionSize,
+      "bad test: storage region size is unexpected")
+    require(executionRegionSize === expectedExecutionRegionSize,
+      "bad test: execution region size is unexpected")
+    // Acquire enough execution memory to exceed the execution region
+    assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L)
+    assert(mm.executionMemoryUsed === 800L)
+    assert(mm.storageMemoryUsed === 0L)
+    assertEnsureFreeSpaceNotCalled(ms)
+    require(mm.executionMemoryUsed > executionRegionSize,
+      "bad test: execution memory used should exceed the execution region")
+    // Storage should not be able to evict execution
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+    assert(mm.executionMemoryUsed === 800L)
+    assert(mm.storageMemoryUsed === 100L)
+    assertEnsureFreeSpaceCalled(ms, 100L)
+    assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+    assert(mm.executionMemoryUsed === 800L)
+    assert(mm.storageMemoryUsed === 100L)
+    assertEnsureFreeSpaceCalled(ms, 250L)
+    mm.releaseExecutionMemory(maxMemory)
+    mm.releaseStorageMemory(maxMemory)
+    // Acquire some execution memory again, but this time keep it within the execution region
+    assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+    assert(mm.executionMemoryUsed === 200L)
+    assert(mm.storageMemoryUsed === 0L)
+    assertEnsureFreeSpaceNotCalled(ms)
+    require(mm.executionMemoryUsed < executionRegionSize,
+      "bad test: execution memory used should be within the execution region")
+    // Storage should still not be able to evict execution
+    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+    assert(mm.executionMemoryUsed === 200L)
+    assert(mm.storageMemoryUsed === 750L)
+    assertEnsureFreeSpaceCalled(ms, 750L)
+    assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+    assert(mm.executionMemoryUsed === 200L)
+    assert(mm.storageMemoryUsed === 750L)
+    assertEnsureFreeSpaceCalled(ms, 850L)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index 6d45b1a..5877aa0 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,8 @@ import org.mockito.Mockito._
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.executor.TaskMetrics
 
 class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
 
@@ -37,7 +38,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
         try {
           val taskAttemptId = nextTaskAttemptId.getAndIncrement
           val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
+          val taskMetrics = new TaskMetrics
           when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
+          when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics)
           TaskContext.setTaskContext(mockTaskContext)
           body
         } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
index 6351539..259020a 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
@@ -36,9 +36,6 @@ class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     conf.set("spark.shuffle.manager", "tungsten-sort")
-    // UnsafeShuffleManager requires at least 128 MB of memory per task in 
order to be able to sort
-    // shuffle records.
-    conf.set("spark.shuffle.memoryFraction", "0.5")
   }
 
   test("UnsafeShuffleManager properly cleans up files for shuffles that use 
the new shuffle path") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b3ffac51/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 12e9baf..0a03c32 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
 class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private def createCombiner[T](i: T) = ArrayBuffer[T](i)
@@ -243,7 +245,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
    */
   private def testSimpleSpilling(codec: Option[String] = None): Unit = {
     val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults 
for Spark home
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     // reduceByKey - should spill ~8 times
@@ -291,7 +292,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
 
   test("spilling with hash collisions") {
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[String]
 
@@ -340,7 +340,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
 
   test("spilling with many hash collisions") {
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + 
_, _ + _)
 
@@ -365,7 +364,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
 
   test("spilling with hash collisions using the Int.MaxValue key") {
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[Int]
 
@@ -382,7 +380,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
 
   test("spilling with null keys and values") {
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[Int]
 
@@ -401,8 +398,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
 
   test("external aggregation updates peak execution memory") {
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.memoryFraction", "0.001")
       .set("spark.shuffle.manager", "hash") // make sure we're not also using 
ExternalSorter
+      .set("spark.testing.memory", (10 * 1024 * 1024).toString)
     sc = new SparkContext("local", "test", conf)
     // No spilling
     AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without 
spilling") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to