Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68c1d9fa6 -> e2546c227
http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450..54cb28c 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
 
-
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       maxExecutionMem: Long,
       maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
     val mm = new StaticMemoryManager(
-      conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1)
+      conf,
+      maxOnHeapExecutionMemory = maxExecutionMem,
+      maxStorageMemory = maxStorageMem,
+      numCores = 1)
     val ms = makeMemoryStore(mm)
     (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
+  override protected def createMemoryManager(
+      maxOnHeapExecutionMemory: Long,
+      maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
     new StaticMemoryManager(
-      conf,
-      maxExecutionMemory = maxMemory,
+      conf.clone
+        .set("spark.memory.fraction", "1")
+        .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+        .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+      maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxStorageMemory = 0,
       numCores = 1)
   }
 
   test("basic execution memory") {
     val maxExecutionMem = 1000L
+    val taskAttemptId = 0L
     val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
     // Acquire up to the max
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    mm.releaseExecutionMemory(800L)
+    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxExecutionMem)
+    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
   test("execution and storage isolation") {
     val maxExecutionMem = 200L
     val maxStorageMem = 1000L
+    val taskAttemptId = 0L
     val dummyBlock = TestBlockId("ain't nobody love like you do")
     val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
     // Only execution memory should increase
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 100L)
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
     // Only execution memory should be released
-    mm.releaseExecutionMemory(133L)
+    mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 67L)
     // Only storage memory should be released
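The updated assertions exercise the new execution-memory API: requests and releases are now tagged with a task attempt id and a MemoryMode instead of going through doAcquireExecutionMemory with an evicted-blocks buffer. The following is a minimal sketch of that call shape, not the suite itself; it assumes it is compiled inside the org.apache.spark.memory test package, since the acquire/release methods are private[memory] and StaticMemoryManager is not public API. The constructor arguments and expected grants are taken from the test above.

  import org.apache.spark.SparkConf
  import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}

  // Sketch only: mirrors the constructor and grants asserted in the suite above.
  val mm = new StaticMemoryManager(
    new SparkConf(),
    maxOnHeapExecutionMemory = 1000L,
    maxStorageMemory = 1000L,
    numCores = 1)
  val taskAttemptId = 0L
  mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP)    // grants 10
  mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP)   // grants 100
  // Requests are capped at the remaining free execution memory: 1000 - 110 = 890.
  mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP)  // grants 890
  // Releases are tagged with the same task attempt id and memory mode.
  mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)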
storage isolation") { val maxExecutionMem = 200L val maxStorageMem = 1000L + val taskAttemptId = 0L val dummyBlock = TestBlockId("ain't nobody love like you do") val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) // Only execution memory should increase - assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 100L) - assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase @@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released - mm.releaseExecutionMemory(133L) + mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 67L) // Only storage memory should be released http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 77e4355..0706a6e 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -22,19 +22,20 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockStatus, BlockId} -class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = 1) { - private[memory] override def doAcquireExecutionMemory( +class TestMemoryManager(conf: SparkConf) + extends MemoryManager(conf, numCores = 1, Long.MaxValue, Long.MaxValue) { + + override private[memory] def acquireExecutionMemory( numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { + taskAttemptId: Long, + memoryMode: MemoryMode): Long = { if (oomOnce) { oomOnce = false 0 } else if (available >= numBytes) { - _executionMemoryUsed += numBytes // To suppress warnings when freeing unallocated memory available -= numBytes numBytes } else { - _executionMemoryUsed += available val grant = available available = 0 grant @@ -48,12 +49,13 @@ class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def releaseExecutionMemory(numBytes: Long): Unit = { + override def releaseStorageMemory(numBytes: Long): Unit = {} + override private[memory] def releaseExecutionMemory( + numBytes: Long, + taskAttemptId: Long, + memoryMode: MemoryMode): Unit = { available += numBytes - _executionMemoryUsed -= numBytes } - override def releaseStorageMemory(numBytes: Long): Unit = {} - override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue private var oomOnce = false http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c97f2b..8cebe81 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -24,57 +24,52 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
 
-
 class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
-  private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
   private val dummyBlock = TestBlockId("--")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
+  private val storageFraction: Double = 0.5
+
   /**
    * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
    */
   private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
-    val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
+    val mm = createMemoryManager(maxMemory)
     val ms = makeMemoryStore(mm)
     (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
-    new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
-  }
-
-  private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
-    mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
-  }
-
-  test("storage region size") {
-    val maxMemory = 1000L
-    val (mm, _) = makeThings(maxMemory)
-    val storageFraction = conf.get("spark.memory.storageFraction").toDouble
-    val expectedStorageRegionSize = maxMemory * storageFraction
-    val actualStorageRegionSize = getStorageRegionSize(mm)
-    assert(expectedStorageRegionSize === actualStorageRegionSize)
+  override protected def createMemoryManager(
+      maxOnHeapExecutionMemory: Long,
+      maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+      .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+      .set("spark.memory.storageFraction", storageFraction.toString)
+    UnifiedMemoryManager(conf, numCores = 1)
   }
 
   test("basic execution memory") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, _) = makeThings(maxMemory)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
     // Acquire up to the max
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
     assert(mm.executionMemoryUsed === maxMemory)
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
     assert(mm.executionMemoryUsed === maxMemory)
-    mm.releaseExecutionMemory(800L)
+    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxMemory)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 0L)
   }
@@ -118,44 +113,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
   test("execution evicts storage") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
-    // First, ensure the test classes are set up as expected
-    val expectedStorageRegionSize = 500L
-    val expectedExecutionRegionSize = 500L
-    val storageRegionSize = getStorageRegionSize(mm)
-    val executionRegionSize = maxMemory - expectedStorageRegionSize
-    require(storageRegionSize === expectedStorageRegionSize,
-      "bad test: storage region size is unexpected")
-    require(executionRegionSize === expectedExecutionRegionSize,
-      "bad test: storage region size is unexpected")
     // Acquire enough storage memory to exceed the storage region
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, 750L)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 750L)
-    require(mm.storageMemoryUsed > storageRegionSize,
-      s"bad test: storage memory used should exceed the storage region")
     // Execution needs to request 250 bytes to evict storage memory
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
     assert(mm.executionMemoryUsed === 100L)
     assert(mm.storageMemoryUsed === 750L)
     assertEnsureFreeSpaceNotCalled(ms)
     // Execution wants 200 bytes but only 150 are free, so storage is evicted
-    assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
-    assertEnsureFreeSpaceCalled(ms, 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
+    assert(mm.executionMemoryUsed === 300L)
+    assertEnsureFreeSpaceCalled(ms, 50L)
     assert(mm.executionMemoryUsed === 300L)
     mm.releaseAllStorageMemory()
-    require(mm.executionMemoryUsed < executionRegionSize,
-      s"bad test: execution memory used should be within the execution region")
+    require(mm.executionMemoryUsed === 300L)
     require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
     // Acquire some storage memory again, but this time keep it within the storage region
     assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, 400L)
-    require(mm.storageMemoryUsed < storageRegionSize,
-      s"bad test: storage memory used should be within the storage region")
+    assert(mm.storageMemoryUsed === 400L)
+    assert(mm.executionMemoryUsed === 300L)
     // Execution cannot evict storage because the latter is within the storage fraction,
     // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
-    assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L)
+    assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
     assert(mm.executionMemoryUsed === 600L)
     assert(mm.storageMemoryUsed === 400L)
     assertEnsureFreeSpaceNotCalled(ms)
@@ -163,23 +148,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
   test("storage does not evict execution") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
-    // First, ensure the test classes are set up as expected
-    val expectedStorageRegionSize = 500L
-    val expectedExecutionRegionSize = 500L
-    val storageRegionSize = getStorageRegionSize(mm)
-    val executionRegionSize = maxMemory - expectedStorageRegionSize
-    require(storageRegionSize === expectedStorageRegionSize,
-      "bad test: storage region size is unexpected")
-    require(executionRegionSize === expectedExecutionRegionSize,
-      "bad test: storage region size is unexpected")
     // Acquire enough execution memory to exceed the execution region
-    assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L)
+    assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 0L)
     assertEnsureFreeSpaceNotCalled(ms)
-    require(mm.executionMemoryUsed > executionRegionSize,
-      s"bad test: execution memory used should exceed the execution region")
     // Storage should not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
     assert(mm.executionMemoryUsed === 800L)
@@ -189,15 +164,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     assertEnsureFreeSpaceCalled(ms, 250L)
-    mm.releaseExecutionMemory(maxMemory)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
     mm.releaseStorageMemory(maxMemory)
     // Acquire some execution memory again, but this time keep it within the execution region
-    assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 0L)
     assertEnsureFreeSpaceNotCalled(ms)
-    require(mm.executionMemoryUsed < executionRegionSize,
-      s"bad test: execution memory used should be within the execution region")
     // Storage should still not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
     assert(mm.executionMemoryUsed === 200L)
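With the storageRegionSize plumbing removed, the rewritten "execution evicts storage" test relies on the pool arithmetic directly. The following is a sketch of the configuration it uses and the numbers it asserts; it is not the suite itself, uses only the conf keys and the UnifiedMemoryManager(conf, numCores) factory shown in the diff, omits the off-heap setting, and assumes it sits on Spark's test classpath inside an org.apache.spark package since these classes are not public API.

  import org.apache.spark.SparkConf
  import org.apache.spark.memory.UnifiedMemoryManager

  // Pin the unified pool to exactly 1000 bytes, with a 500-byte storage region
  // (storageFraction = 0.5), matching the suite's configuration.
  val conf = new SparkConf()
    .set("spark.memory.fraction", "1")
    .set("spark.testing.memory", "1000")
    .set("spark.memory.storageFraction", "0.5")
  val mm = UnifiedMemoryManager(conf, numCores = 1)

  // Worked numbers from the test above:
  //   storage takes 750 bytes, overflowing its 500-byte region;
  //   execution takes 100 (250 bytes were free, so nothing is evicted);
  //   execution asks for 200 with only 150 free, so 200 - 150 = 50 bytes of
  //   storage are evicted (hence assertEnsureFreeSpaceCalled(ms, 50L));
  //   later, with storage back at 400 (inside its region) and execution at 300,
  //   a 400-byte request is capped at 1000 - 300 - 400 = 300 and evicts nothing.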
http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d49015a..53991d8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,7 +825,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
     val memoryManager = new StaticMemoryManager(
       conf,
-      maxExecutionMemory = Long.MaxValue,
+      maxOnHeapExecutionMemory = Long.MaxValue,
       maxStorageMemory = 1200,
       numCores = 1)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org