This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 737f089  [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
737f089 is described below

commit 737f08949adecbae37bb92dfad71ae5f3a82cbee
Author: SongYadong <song.yado...@zte.com.cn>
AuthorDate: Sun Jan 6 08:46:20 2019 -0600

    [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
    
    ## What changes were proposed in this pull request?
    
    When acquiring unroll memory from `StaticMemoryManager`, fail fast if the required space exceeds the memory limit, just as acquiring storage memory already does.
    This should avoid some needless computation and block-eviction cost, especially when the required space (`numBytes`) is very large.
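
    As a side note (not part of this patch): a minimal standalone sketch of the fail-fast idea, with hypothetical names and sizes:

    ```scala
    // Hypothetical model of the check; not Spark's actual API surface.
    def acquireUnrollMemory(numBytes: Long, maxStorageMemory: Long): Boolean = {
      if (numBytes > maxStorageMemory) {
        false // the block can never fit, so skip all eviction bookkeeping
      } else {
        true  // fall through to the normal free/evict accounting
      }
    }

    acquireUnrollMemory(numBytes = 2000L, maxStorageMemory = 1000L) // false
    ```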
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #23426 from SongYadong/acquireUnrollMemory_fail_fast.
    
    Authored-by: SongYadong <song.yado...@zte.com.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../apache/spark/memory/StaticMemoryManager.scala  | 27 ++++++++++++++--------
 .../apache/spark/storage/MemoryStoreSuite.scala    |  4 ++--
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 8286087..0fd349d 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -80,16 +80,23 @@ private[spark] class StaticMemoryManager(
       memoryMode: MemoryMode): Boolean = synchronized {
     require(memoryMode != MemoryMode.OFF_HEAP,
       "StaticMemoryManager does not support off-heap unroll memory")
-    val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
-    val freeMemory = onHeapStorageMemoryPool.memoryFree
-    // When unrolling, we will use all of the existing free memory, and, if necessary,
-    // some extra space freed from evicting cached blocks. We must place a cap on the
-    // amount of memory to be evicted by unrolling, however, otherwise unrolling one
-    // big block can blow away the entire cache.
-    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
-    // Keep it within the range 0 <= X <= maxNumBytesToFree
-    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    if (numBytes > maxOnHeapStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+        s"memory limit ($maxOnHeapStorageMemory bytes)")
+      false
+    } else {
+      val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+      val freeMemory = onHeapStorageMemoryPool.memoryFree
+      // When unrolling, we will use all of the existing free memory, and, if necessary,
+      // some extra space freed from evicting cached blocks. We must place a cap on the
+      // amount of memory to be evicted by unrolling, however, otherwise unrolling one
+      // big block can blow away the entire cache.
+      val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
+      // Keep it within the range 0 <= X <= maxNumBytesToFree
+      val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
+      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    }
   }
 
   private[memory]
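
For readers following the arithmetic retained in the `else` branch: a self-contained sketch with made-up pool sizes (the real values come from `onHeapStorageMemoryPool` and its `MemoryStore`):

```scala
// Illustrative numbers only; in Spark these come from the storage pool.
val maxUnrollMemory = 400L      // cap on total unroll memory
val currentUnrollMemory = 100L  // unroll memory already held by tasks
val freeMemory = 50L            // free space left in the storage pool
val numBytes = 300L             // space the caller is asking for

// At most this much may be freed by evicting cached blocks.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // 250
// The shortfall beyond free memory, clamped to the range [0, maxNumBytesToFree].
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))    // 250
// If numBytes were 120 instead, the shortfall would be 70 and the clamp would not bind.
```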
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072..baff672 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -291,11 +291,11 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("b3")
     putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
 
-    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    // Unroll huge block with not enough space. This should fail.
     val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
-    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
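
The flipped assertion on b2 follows directly from the new behavior: the oversized b4 now fails before any eviction happens, so b2 is never kicked out. A toy simulation of that difference, assuming a simplified LRU-style store rather than Spark's real MemoryStore:

```scala
import scala.collection.mutable

// Toy LRU-ish store; sizes and behavior are simplified for illustration.
class ToyStore(capacity: Long) {
  private val blocks = mutable.LinkedHashMap.empty[String, Long]
  private def used: Long = blocks.values.sum

  def put(id: String, size: Long, failFast: Boolean): Boolean = {
    if (failFast && size > capacity) return false // new behavior: no eviction at all
    // Old behavior: evict oldest blocks trying to make room, even if hopeless.
    while (used + size > capacity && blocks.nonEmpty) blocks.remove(blocks.head._1)
    if (used + size <= capacity) { blocks(id) = size; true } else false
  }
  def contains(id: String): Boolean = blocks.contains(id)
}

val store = new ToyStore(capacity = 100L)
store.put("b2", 40L, failFast = true)
store.put("b4", 500L, failFast = true) // fails fast, evicts nothing
assert(store.contains("b2"))           // b2 survives, as the updated test expects
```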


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
