Repository: spark Updated Branches: refs/heads/master bd94ea4c8 -> 20c0bcd97
[SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap. In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation). Author: Josh Rosen <joshro...@databricks.com> Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20c0bcd9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20c0bcd9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20c0bcd9 Branch: refs/heads/master Commit: 20c0bcd972cfbb2f2aa92948f9ee337724a70361 Parents: bd94ea4 Author: Josh Rosen <joshro...@databricks.com> Authored: Sat Mar 26 11:03:25 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sat Mar 26 11:03:25 2016 -0700 ---------------------------------------------------------------------- .../spark/memory/ExecutionMemoryPool.scala | 9 +- .../org/apache/spark/memory/MemoryManager.scala | 46 +++-- .../spark/memory/StaticMemoryManager.scala | 36 ++-- .../apache/spark/memory/StorageMemoryPool.scala | 18 +- .../spark/memory/UnifiedMemoryManager.scala | 168 +++++++++++-------- .../org/apache/spark/storage/BlockManager.scala | 4 +- .../org/apache/spark/storage/StorageLevel.scala | 6 + .../spark/storage/memory/MemoryStore.scala | 19 ++- .../spark/memory/MemoryManagerSuite.scala | 2 +- .../spark/memory/StaticMemoryManagerSuite.scala | 60 +++---- 
.../apache/spark/memory/TestMemoryManager.scala | 14 +- .../memory/UnifiedMemoryManagerSuite.scala | 80 +++++---- .../spark/storage/BlockManagerSuite.scala | 4 +- 13 files changed, 280 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala index 319718e..f816707 100644 --- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging * tasks was performed by the ShuffleMemoryManager. * * @param lock a [[MemoryManager]] instance to synchronize on - * @param poolName a human-readable name for this pool, for use in log messages + * @param memoryMode the type of memory tracked by this pool (on- or off-heap) */ private[memory] class ExecutionMemoryPool( lock: Object, - poolName: String + memoryMode: MemoryMode ) extends MemoryPool(lock) with Logging { + private[this] val poolName: String = memoryMode match { + case MemoryMode.ON_HEAP => "on-heap execution" + case MemoryMode.OFF_HEAP => "off-heap execution" + } + /** * Map from taskAttemptId -> memory consumption in bytes */ http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 5e8abee..10656bc 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator private[spark] abstract class MemoryManager( conf: SparkConf, numCores: Int, - storageMemory: Long, + onHeapStorageMemory: Long, onHeapExecutionMemory: Long) extends Logging { // -- Methods related to memory allocation policies and bookkeeping ------------------------------ @GuardedBy("this") - protected val storageMemoryPool = new StorageMemoryPool(this) + protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) @GuardedBy("this") - protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution") + protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) @GuardedBy("this") - protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution") + protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) + @GuardedBy("this") + protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP) - storageMemoryPool.incrementPoolSize(storageMemory) + onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) - offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0)) + + protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) + protected[this] val offHeapStorageMemory = + (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong + + offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) + offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) /** * Total available memory for storage, in bytes. This amount can vary over time, depending on * the MemoryManager implementation. 
* In this model, this is equivalent to the amount of memory not occupied by execution. */ - def maxStorageMemory: Long + def maxOnHeapStorageMemory: Long /** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints. */ final def setMemoryStore(store: MemoryStore): Unit = synchronized { - storageMemoryPool.setMemoryStore(store) + onHeapStorageMemoryPool.setMemoryStore(store) + offHeapStorageMemoryPool.setMemoryStore(store) } /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * * @return whether all N bytes were successfully granted. */ - def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean + def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager( * * @return whether all N bytes were successfully granted. */ - def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean + def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean /** * Try to acquire up to `numBytes` of execution memory for the current task and return the @@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager( /** * Release N bytes of storage memory. */ - def releaseStorageMemory(numBytes: Long): Unit = synchronized { - storageMemoryPool.releaseMemory(numBytes) + def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { + memoryMode match { + case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes) + case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes) + } } /** * Release all storage memory acquired. 
*/ final def releaseAllStorageMemory(): Unit = synchronized { - storageMemoryPool.releaseAllMemory() + onHeapStorageMemoryPool.releaseAllMemory() + offHeapStorageMemoryPool.releaseAllMemory() } /** * Release N bytes of unroll memory. */ - final def releaseUnrollMemory(numBytes: Long): Unit = synchronized { - releaseStorageMemory(numBytes) + final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { + releaseStorageMemory(numBytes, memoryMode) } /** @@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager( * Storage memory currently in use, in bytes. */ final def storageMemoryUsed: Long = synchronized { - storageMemoryPool.memoryUsed + onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } /** http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index f9f8f82..cbd0fa9 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId private[spark] class StaticMemoryManager( conf: SparkConf, maxOnHeapExecutionMemory: Long, - override val maxStorageMemory: Long, + override val maxOnHeapStorageMemory: Long, numCores: Int) extends MemoryManager( conf, numCores, - maxStorageMemory, + maxOnHeapStorageMemory, maxOnHeapExecutionMemory) { def this(conf: SparkConf, numCores: Int) { @@ -46,25 +46,39 @@ private[spark] class StaticMemoryManager( numCores) } + // The StaticMemoryManager does not support off-heap storage memory: + offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize) + 
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize) + // Max number of bytes worth of blocks to evict when unrolling private val maxUnrollMemory: Long = { - (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong + (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - if (numBytes > maxStorageMemory) { + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + require(memoryMode != MemoryMode.OFF_HEAP, + "StaticMemoryManager does not support off-heap storage memory") + if (numBytes > maxOnHeapStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxStorageMemory bytes)") + s"memory limit ($maxOnHeapStorageMemory bytes)") false } else { - storageMemoryPool.acquireMemory(blockId, numBytes) + onHeapStorageMemoryPool.acquireMemory(blockId, numBytes) } } - override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory - val freeMemory = storageMemoryPool.memoryFree + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + require(memoryMode != MemoryMode.OFF_HEAP, + "StaticMemoryManager does not support off-heap unroll memory") + val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory + val freeMemory = onHeapStorageMemoryPool.memoryFree // When unrolling, we will use all of the existing free memory, and, if necessary, // some extra space freed from evicting cached blocks. 
We must place a cap on the // amount of memory to be evicted by unrolling, however, otherwise unrolling one @@ -72,7 +86,7 @@ private[spark] class StaticMemoryManager( val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // Keep it within the range 0 <= X <= maxNumBytesToFree val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) + onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) } private[memory] http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 6fcf26e..a67e8da 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore * (caching). 
* * @param lock a [[MemoryManager]] instance to synchronize on + * @param memoryMode the type of memory tracked by this pool (on- or off-heap) */ -private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { +private[memory] class StorageMemoryPool( + lock: Object, + memoryMode: MemoryMode + ) extends MemoryPool(lock) with Logging { @GuardedBy("lock") private[this] var _memoryUsed: Long = 0L @@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) - if (numBytesToFree > 0) { + // Once we support off-heap caching, this will need to change: + if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call @@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: - val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) + val spaceFreedByEviction = { + // Once we support off-heap caching, this will need to change: + if (memoryMode == MemoryMode.ON_HEAP) { + memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) + } else { + 0 + } + } // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
decrementPoolSize(spaceFreedByEviction) http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 6c57c98..fa9c021 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -39,27 +39,32 @@ import org.apache.spark.storage.BlockId * up most of the storage space, in which case the new blocks will be evicted immediately * according to their respective storage levels. * - * @param storageRegionSize Size of the storage region, in bytes. + * @param onHeapStorageRegionSize Size of the storage region, in bytes. * This region is not statically reserved; execution can borrow from * it if necessary. Cached blocks can be evicted only if actual * storage memory usage exceeds this region. 
*/ private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, - val maxMemory: Long, - storageRegionSize: Long, + val maxHeapMemory: Long, + onHeapStorageRegionSize: Long, numCores: Int) extends MemoryManager( conf, numCores, - storageRegionSize, - maxMemory - storageRegionSize) { + onHeapStorageRegionSize, + maxHeapMemory - onHeapStorageRegionSize) { - // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariants(): Unit = { + assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory) + assert( + offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory) + } + + assertInvariants() - override def maxStorageMemory: Long = synchronized { - maxMemory - onHeapExecutionMemoryPool.memoryUsed + override def maxOnHeapStorageMemory: Long = synchronized { + maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed } /** @@ -75,83 +80,104 @@ private[spark] class UnifiedMemoryManager private[memory] ( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariants() assert(numBytes >= 0) - memoryMode match { - case MemoryMode.ON_HEAP => - - /** - * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. - * - * When acquiring memory for a task, the execution pool may need to make multiple - * attempts. Each attempt must be able to evict storage in case another task jumps in - * and caches a large block between the attempts. This is called once per attempt. - */ - def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { - if (extraMemoryNeeded > 0) { - // There is not enough free memory in the execution pool, so try to reclaim memory from - // storage. We can reclaim any free memory from the storage pool. 
If the storage pool - // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim - // the memory that storage has borrowed from execution. - val memoryReclaimableFromStorage = - math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) - if (memoryReclaimableFromStorage > 0) { - // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( - math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) - } - } - } + val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { + case MemoryMode.ON_HEAP => ( + onHeapExecutionMemoryPool, + onHeapStorageMemoryPool, + onHeapStorageRegionSize, + maxHeapMemory) + case MemoryMode.OFF_HEAP => ( + offHeapExecutionMemoryPool, + offHeapStorageMemoryPool, + offHeapStorageMemory, + maxOffHeapMemory) + } - /** - * The size the execution pool would have after evicting storage memory. - * - * The execution memory pool divides this quantity among the active tasks evenly to cap - * the execution memory allocation for each task. It is important to keep this greater - * than the execution pool size, which doesn't take into account potential memory that - * could be freed by evicting storage. Otherwise we may hit SPARK-12155. - * - * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness - * in execution memory allocation across tasks, Otherwise, a task may occupy more than - * its fair share of execution memory, mistakenly thinking that other tasks can acquire - * the portion of storage memory that cannot be evicted. - */ - def computeMaxExecutionPoolSize(): Long = { - maxMemory - math.min(storageMemoryUsed, storageRegionSize) + /** + * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. 
+ * + * When acquiring memory for a task, the execution pool may need to make multiple + * attempts. Each attempt must be able to evict storage in case another task jumps in + * and caches a large block between the attempts. This is called once per attempt. + */ + def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { + if (extraMemoryNeeded > 0) { + // There is not enough free memory in the execution pool, so try to reclaim memory from + // storage. We can reclaim any free memory from the storage pool. If the storage pool + // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim + // the memory that storage has borrowed from execution. + val memoryReclaimableFromStorage = math.max( + storagePool.memoryFree, + storagePool.poolSize - storageRegionSize) + if (memoryReclaimableFromStorage > 0) { + // Only reclaim as much space as is necessary and available: + val spaceReclaimed = storagePool.shrinkPoolToFreeSpace( + math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) + executionPool.incrementPoolSize(spaceReclaimed) } + } + } - onHeapExecutionMemoryPool.acquireMemory( - numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) - - case MemoryMode.OFF_HEAP => - // For now, we only support on-heap caching of data, so we do not need to interact with - // the storage pool when allocating off-heap memory. This will change in the future, though. - offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + /** + * The size the execution pool would have after evicting storage memory. + * + * The execution memory pool divides this quantity among the active tasks evenly to cap + * the execution memory allocation for each task. It is important to keep this greater + * than the execution pool size, which doesn't take into account potential memory that + * could be freed by evicting storage. Otherwise we may hit SPARK-12155. 
+ * + * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness + * in execution memory allocation across tasks, Otherwise, a task may occupy more than + * its fair share of execution memory, mistakenly thinking that other tasks can acquire + * the portion of storage memory that cannot be evicted. + */ + def computeMaxExecutionPoolSize(): Long = { + maxMemory - math.min(storagePool.memoryUsed, storageRegionSize) } + + executionPool.acquireMemory( + numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) } - override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + assertInvariants() assert(numBytes >= 0) - if (numBytes > maxStorageMemory) { + val (executionPool, storagePool, maxMemory) = memoryMode match { + case MemoryMode.ON_HEAP => ( + onHeapExecutionMemoryPool, + onHeapStorageMemoryPool, + maxOnHeapStorageMemory) + case MemoryMode.OFF_HEAP => ( + offHeapExecutionMemoryPool, + offHeapStorageMemoryPool, + maxOffHeapMemory) + } + if (numBytes > maxMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxStorageMemory bytes)") + s"memory limit ($maxMemory bytes)") return false } - if (numBytes > storageMemoryPool.memoryFree) { + if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. 
- val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes) - onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution) - storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution) + val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) + executionPool.decrementPoolSize(memoryBorrowedFromExecution) + storagePool.incrementPoolSize(memoryBorrowedFromExecution) } - storageMemoryPool.acquireMemory(blockId, numBytes) + storagePool.acquireMemory(blockId, numBytes) } - override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes) + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes, memoryMode) } } @@ -167,8 +193,8 @@ object UnifiedMemoryManager { val maxMemory = getMaxMemory(conf) new UnifiedMemoryManager( conf, - maxMemory = maxMemory, - storageRegionSize = + maxHeapMemory = maxMemory, + onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, numCores = numCores) } http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 30d2e23..0c7763f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging -import org.apache.spark.memory.MemoryManager +import org.apache.spark.memory.{MemoryManager, MemoryMode} import 
org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf @@ -94,7 +94,7 @@ private[spark] class BlockManager( // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need // to revisit whether reporting this value as the "max" is intuitive to the user. - private val maxMemory = memoryManager.maxStorageMemory + private val maxMemory = memoryManager.maxOnHeapStorageMemory // Port used by the external shuffle service. In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 38e9534..7d23295 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -21,6 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import java.util.concurrent.ConcurrentHashMap import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.memory.MemoryMode import org.apache.spark.util.Utils /** @@ -65,6 +66,11 @@ class StorageLevel private( require(replication == 1, "Off-heap storage level does not support multiple replication") } + private[spark] def memoryMode: MemoryMode = { + if (useOffHeap) MemoryMode.OFF_HEAP + else MemoryMode.ON_HEAP + } + override def clone(): StorageLevel = { new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 1a78c9c..3ca41f3 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.memory.MemoryManager +import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.serializer.{SerializationStream, SerializerManager} import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel} import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} @@ -93,7 +93,7 @@ private[spark] class MemoryStore( conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) /** Total amount of memory available for storage, in bytes. 
*/ - private def maxMemory: Long = memoryManager.maxStorageMemory + private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory if (maxMemory < unrollMemoryThreshold) { logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + @@ -133,7 +133,7 @@ private[spark] class MemoryStore( size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - if (memoryManager.acquireStorageMemory(blockId, size)) { + if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) @@ -229,7 +229,7 @@ private[spark] class MemoryStore( // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(amount) - val success = memoryManager.acquireStorageMemory(blockId, amount) + val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } } @@ -237,7 +237,8 @@ private[spark] class MemoryStore( val enoughStorageMemory = { if (unrollMemoryUsedByThisBlock <= size) { val acquiredExtra = - memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) + memoryManager.acquireStorageMemory( + blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) if (acquiredExtra) { transferUnrollToStorage(unrollMemoryUsedByThisBlock) } @@ -353,7 +354,7 @@ private[spark] class MemoryStore( // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size) + val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } entries.synchronized { @@ -406,7 +407,7 @@ 
private[spark] class MemoryStore( entries.remove(blockId) } if (entry != null) { - memoryManager.releaseStorageMemory(entry.size) + memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP) logInfo(s"Block $blockId of size ${entry.size} dropped " + s"from memory (free ${maxMemory - blocksMemoryUsed})") true @@ -531,7 +532,7 @@ private[spark] class MemoryStore( */ def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = { memoryManager.synchronized { - val success = memoryManager.acquireUnrollMemory(blockId, memory) + val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP) if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory @@ -554,7 +555,7 @@ private[spark] class MemoryStore( if (unrollMemoryMap(taskAttemptId) == 0) { unrollMemoryMap.remove(taskAttemptId) } - memoryManager.releaseUnrollMemory(memoryToRelease) + memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 686e948..aaca653 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -100,7 +100,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft evictBlocksToFreeSpaceCalled.set(numBytesToFree) if (numBytesToFree <= mm.storageMemoryUsed) { // We can evict enough blocks to fulfill the request for space - mm.releaseStorageMemory(numBytesToFree) + mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP) evictedBlocks.append( (null, 
BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))) numBytesToFree http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 741d4fd..4e31fb5 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -35,7 +35,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val mm = new StaticMemoryManager( conf, maxOnHeapExecutionMemory = maxExecutionMem, - maxStorageMemory = maxStorageMem, + maxOnHeapStorageMemory = maxStorageMem, numCores = 1) val ms = makeMemoryStore(mm) (mm, ms) @@ -50,7 +50,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { .set("spark.testing.memory", maxOnHeapExecutionMemory.toString) .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString), maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, - maxStorageMemory = 0, + maxOnHeapStorageMemory = 0, numCores = 1) } @@ -58,22 +58,23 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val maxExecutionMem = 1000L val taskAttemptId = 0L val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) + val memoryMode = MemoryMode.ON_HEAP assert(mm.executionMemoryUsed === 0L) - assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) + assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L) assert(mm.executionMemoryUsed === 10L) - assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) // Acquire up to the max - assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 
890L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L) assert(mm.executionMemoryUsed === maxExecutionMem) - assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L) assert(mm.executionMemoryUsed === maxExecutionMem) - mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) + mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode) assert(mm.executionMemoryUsed === 200L) // Acquire after release - assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired - mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP) + mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode) assert(mm.executionMemoryUsed === 0L) } @@ -81,23 +82,24 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val maxStorageMem = 1000L val dummyBlock = TestBlockId("you can see the world you brought to live") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) + val memoryMode = MemoryMode.ON_HEAP assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L)) + assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L)) + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L)) + assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to 
LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem)) + assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -105,19 +107,19 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. assert(mm.storageMemoryUsed === 1000L) - mm.releaseStorageMemory(800L) + mm.releaseStorageMemory(800L, memoryMode) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired - mm.releaseStorageMemory(100L) + mm.releaseStorageMemory(100L, memoryMode) assert(mm.storageMemoryUsed === 0L) } @@ -127,20 +129,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val taskAttemptId = 0L val dummyBlock = TestBlockId("ain't nobody love like you do") val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) + val memoryMode = MemoryMode.ON_HEAP // Only execution memory should increase - assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) assert(mm.storageMemoryUsed === 0L) 
assert(mm.executionMemoryUsed === 100L) - assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L)) + assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released - mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP) + mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 67L) // Only storage memory should be released @@ -153,21 +156,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val maxStorageMem = 1000L val dummyBlock = TestBlockId("lonely water") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - assert(mm.acquireUnrollMemory(dummyBlock, 100L)) + val memoryMode = MemoryMode.ON_HEAP + assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode)) when(ms.currentUnrollMemory).thenReturn(100L) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 100L) - mm.releaseUnrollMemory(40L) + mm.releaseUnrollMemory(40L, memoryMode) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireStorageMemory(dummyBlock, 800L)) + assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 860L) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes. // Requesting 240 more bytes of unroll memory will leave our total unroll memory at // 300 bytes, still under the 400-byte limit. 
Therefore, all 240 bytes are granted. - assert(mm.acquireUnrollMemory(dummyBlock, 240L)) + assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000 when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240 assert(mm.storageMemoryUsed === 1000L) @@ -175,11 +179,11 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { // We already have 300 bytes of unroll memory, so requesting 150 more will leave us // above the 400-byte limit. Since there is not enough free memory, this request will // fail even after evicting as much as we can (400 - 300 = 100 bytes). - assert(!mm.acquireUnrollMemory(dummyBlock, 150L)) + assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 900L) // Release beyond what was acquired - mm.releaseUnrollMemory(maxStorageMem) + mm.releaseUnrollMemory(maxStorageMem, memoryMode) assert(mm.storageMemoryUsed === 0L) } http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 6dad3f4..6a4f409 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -39,16 +39,22 @@ class TestMemoryManager(conf: SparkConf) grant } } - override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true - override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true - override def releaseStorageMemory(numBytes: Long): Unit = {} + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = true + override def acquireUnrollMemory( + blockId: 
BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = true + override def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = {} override private[memory] def releaseExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit = { available += numBytes } - override def maxStorageMemory: Long = Long.MaxValue + override def maxOnHeapStorageMemory: Long = Long.MaxValue private var oomOnce = false private var available = Long.MaxValue http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 9001a26..1425581 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -52,47 +52,49 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val taskAttemptId = 0L val (mm, _) = makeThings(maxMemory) + val memoryMode = MemoryMode.ON_HEAP assert(mm.executionMemoryUsed === 0L) - assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) + assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L) assert(mm.executionMemoryUsed === 10L) - assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) // Acquire up to the max - assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L) assert(mm.executionMemoryUsed === maxMemory) - assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) + 
assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L) assert(mm.executionMemoryUsed === maxMemory) - mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) + mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode) assert(mm.executionMemoryUsed === 200L) // Acquire after release - assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired - mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) + mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode) assert(mm.executionMemoryUsed === 0L) } test("basic storage memory") { val maxMemory = 1000L val (mm, ms) = makeThings(maxMemory) + val memoryMode = MemoryMode.ON_HEAP assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L)) + assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L)) + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L)) + assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxMemory)) + assert(mm.acquireStorageMemory(dummyBlock, maxMemory, memoryMode)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) 
assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -100,19 +102,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. assert(mm.storageMemoryUsed === 1000L) - mm.releaseStorageMemory(800L) + mm.releaseStorageMemory(800L, memoryMode) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired - mm.releaseStorageMemory(100L) + mm.releaseStorageMemory(100L, memoryMode) assert(mm.storageMemoryUsed === 0L) } @@ -120,18 +122,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) + val memoryMode = MemoryMode.ON_HEAP // Acquire enough storage memory to exceed the storage region - assert(mm.acquireStorageMemory(dummyBlock, 750L)) + assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) // Execution needs to request 250 bytes to evict storage memory - assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L) assert(mm.executionMemoryUsed === 100L) assert(mm.storageMemoryUsed === 750L) 
assertEvictBlocksToFreeSpaceNotCalled(ms) // Execution wants 200 bytes but only 150 are free, so storage is evicted - assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) + assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L) assert(mm.executionMemoryUsed === 300L) assert(mm.storageMemoryUsed === 700L) assertEvictBlocksToFreeSpaceCalled(ms, 50L) @@ -141,13 +144,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region - assert(mm.acquireStorageMemory(dummyBlock, 400L)) + assert(mm.acquireStorageMemory(dummyBlock, 400L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 400L) assert(mm.executionMemoryUsed === 300L) // Execution cannot evict storage because the latter is within the storage fraction, // so grant only what's remaining without evicting anything, i.e. 
1000 - 300 - 400 = 300 - assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L) + assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 300L) assert(mm.executionMemoryUsed === 600L) assert(mm.storageMemoryUsed === 400L) assertEvictBlocksToFreeSpaceNotCalled(ms) @@ -157,8 +160,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) + val memoryMode = MemoryMode.ON_HEAP // Acquire enough storage memory to exceed the storage region size - assert(mm.acquireStorageMemory(dummyBlock, 700L)) + assert(mm.acquireStorageMemory(dummyBlock, 700L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 700L) @@ -166,7 +170,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes // mistakenly think that the 300 bytes of free space was still available even after // using it to expand the execution pool. Consequently, no storage memory was released // and the following call granted only 300 bytes to execution. 
- assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) + assert(mm.acquireExecutionMemory(500L, taskAttemptId, memoryMode) === 500L) assertEvictBlocksToFreeSpaceCalled(ms, 200L) assert(mm.storageMemoryUsed === 500L) assert(mm.executionMemoryUsed === 500L) @@ -177,34 +181,35 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) + val memoryMode = MemoryMode.ON_HEAP // Acquire enough execution memory to exceed the execution region - assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L) + assert(mm.acquireExecutionMemory(800L, taskAttemptId, memoryMode) === 800L) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 100L)) + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(!mm.acquireStorageMemory(dummyBlock, 250L)) + assert(!mm.acquireStorageMemory(dummyBlock, 250L, memoryMode)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) // Do not attempt to evict blocks, since evicting will not free enough memory: assertEvictBlocksToFreeSpaceNotCalled(ms) - mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) - mm.releaseStorageMemory(maxMemory) + mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode) + mm.releaseStorageMemory(maxMemory, memoryMode) // Acquire some execution memory again, but this time keep it within the execution region - assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) + assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L) assert(mm.executionMemoryUsed === 200L) 
assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should still not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 750L)) + assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free - assert(!mm.acquireStorageMemory(dummyBlock, 850L)) + assert(!mm.acquireStorageMemory(dummyBlock, 850L, memoryMode)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) // Do not attempt to evict blocks, since evicting will not free enough memory: @@ -221,7 +226,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes .set("spark.testing.reservedMemory", reservedMemory.toString) val mm = UnifiedMemoryManager(conf, numCores = 1) val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong - assert(mm.maxMemory === expectedMaxMemory) + assert(mm.maxHeapMemory === expectedMaxMemory) // Try using a system memory that's too small val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString) @@ -256,18 +261,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes .set("spark.testing.memory", "1000") val mm = UnifiedMemoryManager(conf, numCores = 2) val ms = makeMemoryStore(mm) - assert(mm.maxMemory === 1000) + val memoryMode = MemoryMode.ON_HEAP + assert(mm.maxHeapMemory === 1000) // Have two tasks each acquire some execution memory so that the memory pool registers that // there are two active tasks: - assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) - assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L) + assert(mm.acquireExecutionMemory(100L, 1, memoryMode) === 100L) // Fill up all of the remaining memory with storage. 
- assert(mm.acquireStorageMemory(dummyBlock, 800L)) + assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 800) assert(mm.executionMemoryUsed === 200) // A task should still be able to allocate 100 bytes execution memory by evicting blocks - assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) + assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L) assertEvictBlocksToFreeSpaceCalled(ms, 100L) assert(mm.executionMemoryUsed === 300) assert(mm.storageMemoryUsed === 700) http://git-wip-us.apache.org/repos/asf/spark/blob/20c0bcd9/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7a4cb39..6fc32cb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._ import org.apache.spark._ import org.apache.spark.executor.DataReadMethod -import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.NettyBlockTransferService @@ -821,7 +821,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val memoryManager = new StaticMemoryManager( conf, maxOnHeapExecutionMemory = Long.MaxValue, - maxStorageMemory = 1200, + maxOnHeapStorageMemory = 1200, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org