Repository: spark
Updated Branches:
  refs/heads/master 81c68eceb -> bb88ad4e0


[SPARK-15260] Atomically resize memory pools

## What changes were proposed in this pull request?

When we acquire execution memory, we do a lot of work between shrinking the
storage memory pool and enlarging the execution memory pool. In particular, we
call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can
throw exceptions. If an exception is thrown, the storage pool has already been
shrunk but the execution pool has not yet been enlarged, so the pool sizes on
that executor are left in an inconsistent state.

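This patch makes the resizing closer to atomic: the storage pool now only frees
the space (including any block eviction) and reports how much it freed, and the
caller then shrinks the storage pool and enlarges the execution pool back to
back, with no other work in between.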

## How was this patch tested?

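Jenkins, plus a new regression test in `UnifiedMemoryManagerSuite`
("SPARK-15260: atomically resize memory pools").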

Author: Andrew Or <and...@databricks.com>

Closes #13039 from andrewor14/safer-pool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb88ad4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb88ad4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb88ad4e

Branch: refs/heads/master
Commit: bb88ad4e0e870c88d474c71939a19541522a3023
Parents: 81c68ec
Author: Andrew Or <and...@databricks.com>
Authored: Wed May 11 12:58:57 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed May 11 12:58:57 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/memory/StorageMemoryPool.scala | 11 +++++-----
 .../spark/memory/UnifiedMemoryManager.scala     |  5 +++--
 .../spark/memory/MemoryManagerSuite.scala       | 15 +++++++++++++
 .../memory/UnifiedMemoryManagerSuite.scala      | 23 ++++++++++++++++++++
 4 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 0b552ca..4c6b639 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool(
   }
 
   /**
-   * Try to shrink the size of this storage memory pool by `spaceToFree` 
bytes. Return the number
-   * of bytes removed from the pool's capacity.
+   * Free space to shrink the size of this storage memory pool by 
`spaceToFree` bytes.
+   * Note: this method doesn't actually reduce the pool size but relies on the 
caller to do so.
+   *
+   * @return number of bytes to be removed from the pool's capacity.
    */
-  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-    // First, shrink the pool by reclaiming free memory:
+  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
     val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
@@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool(
         memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, 
memoryMode)
       // When a block is released, BlockManager.dropFromMemory() calls 
releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to 
decrement the pool size.
-      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
     } else {
       spaceFreedByReleasingUnusedMemory

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 82023b5..ae747c1 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
           storagePool.poolSize - storageRegionSize)
         if (memoryReclaimableFromStorage > 0) {
           // Only reclaim as much space as is necessary and available:
-          val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
             math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-          executionPool.incrementPoolSize(spaceReclaimed)
+          storagePool.decrementPoolSize(spaceToReclaim)
+          executionPool.incrementPoolSize(spaceToReclaim)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index a128652..2c4928a 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -79,6 +79,21 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite with BeforeAndAft
   }
 
   /**
+   * Make a mocked [[MemoryStore]] whose 
[[MemoryStore.evictBlocksToFreeSpace]] method is
+   * stubbed to always throw [[RuntimeException]].
+   */
+  protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
+    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new 
Answer[Long] {
+      override def answer(invocation: InvocationOnMock): Long = {
+        throw new RuntimeException("bad memory store!")
+      }
+    })
+    mm.setMemoryStore(ms)
+    ms
+  }
+
+  /**
    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases 
storage memory.
    *
    * This is a significant simplification of the real method, which actually 
drops existing

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 1425581..c821054 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     assert(evictedBlocks.nonEmpty)
   }
 
+  test("SPARK-15260: atomically resize memory pools") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    makeBadMemoryStore(mm)
+    val memoryMode = MemoryMode.ON_HEAP
+    // Acquire 1000 then release 600 bytes of storage memory, leaving the
+    // storage memory pool at 1000 bytes but only 400 bytes of which are used.
+    assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
+    mm.releaseStorageMemory(600L, memoryMode)
+    // Before the fix for SPARK-15260, we would first shrink the storage pool 
by the amount of
+    // unused storage memory (600 bytes), try to evict blocks, then enlarge 
the execution pool
+    // by the same amount. If the eviction threw an exception, then we would 
shrink one pool
+    // without enlarging the other, resulting in an assertion failure.
+    intercept[RuntimeException] {
+      mm.acquireExecutionMemory(1000L, 0, memoryMode)
+    }
+    val assertInvariants = PrivateMethod[Unit]('assertInvariants)
+    mm.invokePrivate[Unit](assertInvariants())
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to