Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68c1d9fa6 -> e2546c227


http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450..54cb28c 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
 
-
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       maxExecutionMem: Long,
       maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
     val mm = new StaticMemoryManager(
-      conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = 
maxStorageMem, numCores = 1)
+      conf,
+      maxOnHeapExecutionMemory = maxExecutionMem,
+      maxStorageMemory = maxStorageMem,
+      numCores = 1)
     val ms = makeMemoryStore(mm)
     (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = 
{
+  override protected def createMemoryManager(
+      maxOnHeapExecutionMemory: Long,
+      maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
     new StaticMemoryManager(
-      conf,
-      maxExecutionMemory = maxMemory,
+      conf.clone
+        .set("spark.memory.fraction", "1")
+        .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+        .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+      maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxStorageMemory = 0,
       numCores = 1)
   }
 
   test("basic execution memory") {
     val maxExecutionMem = 1000L
+    val taskAttemptId = 0L
     val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     // Acquire up to the max
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
     assert(mm.executionMemoryUsed === maxExecutionMem)
-    mm.releaseExecutionMemory(800L)
+    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxExecutionMem)
+    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, 
MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
   test("execution and storage isolation") {
     val maxExecutionMem = 200L
     val maxStorageMem = 1000L
+    val taskAttemptId = 0L
     val dummyBlock = TestBlockId("ain't nobody love like you do")
     val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
     // Only execution memory should increase
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 100L)
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
     // Only execution memory should be released
-    mm.releaseExecutionMemory(133L)
+    mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 67L)
     // Only storage memory should be released

http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala 
b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 77e4355..0706a6e 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -22,19 +22,20 @@ import scala.collection.mutable
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockStatus, BlockId}
 
-class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores 
= 1) {
-  private[memory] override def doAcquireExecutionMemory(
+class TestMemoryManager(conf: SparkConf)
+  extends MemoryManager(conf, numCores = 1, Long.MaxValue, Long.MaxValue) {
+
+  override private[memory] def acquireExecutionMemory(
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = 
synchronized {
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Long = {
     if (oomOnce) {
       oomOnce = false
       0
     } else if (available >= numBytes) {
-      _executionMemoryUsed += numBytes // To suppress warnings when freeing 
unallocated memory
       available -= numBytes
       numBytes
     } else {
-      _executionMemoryUsed += available
       val grant = available
       available = 0
       grant
@@ -48,12 +49,13 @@ class TestMemoryManager(conf: SparkConf) extends 
MemoryManager(conf, numCores =
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def releaseExecutionMemory(numBytes: Long): Unit = {
+  override def releaseStorageMemory(numBytes: Long): Unit = {}
+  override private[memory] def releaseExecutionMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Unit = {
     available += numBytes
-    _executionMemoryUsed -= numBytes
   }
-  override def releaseStorageMemory(numBytes: Long): Unit = {}
-  override def maxExecutionMemory: Long = Long.MaxValue
   override def maxStorageMemory: Long = Long.MaxValue
 
   private var oomOnce = false

http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c97f2b..8cebe81 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -24,57 +24,52 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
 
-
 class UnifiedMemoryManagerSuite extends MemoryManagerSuite with 
PrivateMethodTester {
-  private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
   private val dummyBlock = TestBlockId("--")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
+  private val storageFraction: Double = 0.5
+
   /**
    * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class 
dependencies.
    */
   private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) 
= {
-    val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
+    val mm = createMemoryManager(maxMemory)
     val ms = makeMemoryStore(mm)
     (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = 
{
-    new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
-  }
-
-  private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
-    mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
-  }
-
-  test("storage region size") {
-    val maxMemory = 1000L
-    val (mm, _) = makeThings(maxMemory)
-    val storageFraction = conf.get("spark.memory.storageFraction").toDouble
-    val expectedStorageRegionSize = maxMemory * storageFraction
-    val actualStorageRegionSize = getStorageRegionSize(mm)
-    assert(expectedStorageRegionSize === actualStorageRegionSize)
+  override protected def createMemoryManager(
+      maxOnHeapExecutionMemory: Long,
+      maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+      .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+      .set("spark.memory.storageFraction", storageFraction.toString)
+    UnifiedMemoryManager(conf, numCores = 1)
   }
 
   test("basic execution memory") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, _) = makeThings(maxMemory)
     assert(mm.executionMemoryUsed === 0L)
-    assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+    assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
     assert(mm.executionMemoryUsed === 10L)
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     // Acquire up to the max
-    assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
     assert(mm.executionMemoryUsed === maxMemory)
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
     assert(mm.executionMemoryUsed === maxMemory)
-    mm.releaseExecutionMemory(800L)
+    mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 200L)
     // Acquire after release
-    assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+    assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
     assert(mm.executionMemoryUsed === 201L)
     // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxMemory)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
     assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -118,44 +113,34 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
 
   test("execution evicts storage") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
-    // First, ensure the test classes are set up as expected
-    val expectedStorageRegionSize = 500L
-    val expectedExecutionRegionSize = 500L
-    val storageRegionSize = getStorageRegionSize(mm)
-    val executionRegionSize = maxMemory - expectedStorageRegionSize
-    require(storageRegionSize === expectedStorageRegionSize,
-      "bad test: storage region size is unexpected")
-    require(executionRegionSize === expectedExecutionRegionSize,
-      "bad test: storage region size is unexpected")
     // Acquire enough storage memory to exceed the storage region
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, 750L)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 750L)
-    require(mm.storageMemoryUsed > storageRegionSize,
-      s"bad test: storage memory used should exceed the storage region")
     // Execution needs to request 250 bytes to evict storage memory
-    assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+    assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     assert(mm.executionMemoryUsed === 100L)
     assert(mm.storageMemoryUsed === 750L)
     assertEnsureFreeSpaceNotCalled(ms)
     // Execution wants 200 bytes but only 150 are free, so storage is evicted
-    assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
-    assertEnsureFreeSpaceCalled(ms, 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
+    assert(mm.executionMemoryUsed === 300L)
+    assertEnsureFreeSpaceCalled(ms, 50L)
     assert(mm.executionMemoryUsed === 300L)
     mm.releaseAllStorageMemory()
-    require(mm.executionMemoryUsed < executionRegionSize,
-      s"bad test: execution memory used should be within the execution region")
+    require(mm.executionMemoryUsed === 300L)
     require(mm.storageMemoryUsed === 0, "bad test: all storage memory should 
have been released")
     // Acquire some storage memory again, but this time keep it within the 
storage region
     assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
     assertEnsureFreeSpaceCalled(ms, 400L)
-    require(mm.storageMemoryUsed < storageRegionSize,
-      s"bad test: storage memory used should be within the storage region")
+    assert(mm.storageMemoryUsed === 400L)
+    assert(mm.executionMemoryUsed === 300L)
     // Execution cannot evict storage because the latter is within the storage 
fraction,
     // so grant only what's remaining without evicting anything, i.e. 1000 - 
300 - 400 = 300
-    assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L)
+    assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 300L)
     assert(mm.executionMemoryUsed === 600L)
     assert(mm.storageMemoryUsed === 400L)
     assertEnsureFreeSpaceNotCalled(ms)
@@ -163,23 +148,13 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
 
   test("storage does not evict execution") {
     val maxMemory = 1000L
+    val taskAttemptId = 0L
     val (mm, ms) = makeThings(maxMemory)
-    // First, ensure the test classes are set up as expected
-    val expectedStorageRegionSize = 500L
-    val expectedExecutionRegionSize = 500L
-    val storageRegionSize = getStorageRegionSize(mm)
-    val executionRegionSize = maxMemory - expectedStorageRegionSize
-    require(storageRegionSize === expectedStorageRegionSize,
-      "bad test: storage region size is unexpected")
-    require(executionRegionSize === expectedExecutionRegionSize,
-      "bad test: storage region size is unexpected")
     // Acquire enough execution memory to exceed the execution region
-    assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L)
+    assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 800L)
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 0L)
     assertEnsureFreeSpaceNotCalled(ms)
-    require(mm.executionMemoryUsed > executionRegionSize,
-      s"bad test: execution memory used should exceed the execution region")
     // Storage should not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
     assert(mm.executionMemoryUsed === 800L)
@@ -189,15 +164,13 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
     assertEnsureFreeSpaceCalled(ms, 250L)
-    mm.releaseExecutionMemory(maxMemory)
+    mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
     mm.releaseStorageMemory(maxMemory)
     // Acquire some execution memory again, but this time keep it within the 
execution region
-    assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
+    assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 0L)
     assertEnsureFreeSpaceNotCalled(ms)
-    require(mm.executionMemoryUsed < executionRegionSize,
-      s"bad test: execution memory used should be within the execution region")
     // Storage should still not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
     assert(mm.executionMemoryUsed === 200L)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d49015a..53991d8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,7 +825,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
     val memoryManager = new StaticMemoryManager(
       conf,
-      maxExecutionMemory = Long.MaxValue,
+      maxOnHeapExecutionMemory = Long.MaxValue,
       maxStorageMemory = 1200,
       numCores = 1)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to