Repository: spark
Updated Branches:
  refs/heads/master cda4603de -> f5ea7fe53


[SPARK-16166][CORE] Also take off-heap memory usage into consideration in log and webui display

## What changes were proposed in this pull request?

Currently, only on-heap storage memory is calculated and displayed in the log and web UI:

```
16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB)
```
<img width="1232" alt="untitled" 
src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png";>

Since [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992) added support for persisting data in off-heap memory, this change takes off-heap storage memory into account as well.
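For illustration, here is a minimal, self-contained sketch of the reporting change; the names and values are hypothetical, not Spark's internal API. The "free" figure in the log line is now derived from on-heap plus off-heap storage memory rather than on-heap alone:

```
// Hypothetical sketch: combine on-heap and off-heap storage memory when
// reporting free memory, mirroring what the patch does inside MemoryStore.
object StorageMemoryReport {
  private def bytesToString(bytes: Long): String =
    f"${bytes / (1024.0 * 1024.0)}%.1f MB"

  def logLine(blockId: String, blockSize: Long,
              maxOnHeap: Long, maxOffHeap: Long, used: Long): String = {
    val free = (maxOnHeap + maxOffHeap) - used  // total storage memory still free
    s"Block $blockId stored as values in memory " +
      s"(estimated size ${bytesToString(blockSize)}, free ${bytesToString(free)})"
  }
}
```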

## How was this patch tested?

Unit test and local verification.

Author: jerryshao <ss...@hortonworks.com>

Closes #13920 from jerryshao/SPARK-16166.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5ea7fe5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5ea7fe5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5ea7fe5

Branch: refs/heads/master
Commit: f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd
Parents: cda4603
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Jul 25 15:17:06 2016 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Jul 25 15:17:06 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/memory/MemoryManager.scala     | 10 ++++++++--
 .../org/apache/spark/memory/StaticMemoryManager.scala     |  2 ++
 .../org/apache/spark/memory/UnifiedMemoryManager.scala    |  4 ++++
 .../scala/org/apache/spark/storage/BlockManager.scala     |  5 +++--
 .../org/apache/spark/storage/memory/MemoryStore.scala     |  4 +++-
 .../scala/org/apache/spark/memory/TestMemoryManager.scala |  2 ++
 .../org/apache/spark/storage/BlockManagerSuite.scala      |  8 ++++----
 7 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 0210217..82442cf 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -62,13 +62,19 @@ private[spark] abstract class MemoryManager(
   offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
 
   /**
-   * Total available memory for storage, in bytes. This amount can vary over time, depending on
-   * the MemoryManager implementation.
+   * Total available on heap memory for storage, in bytes. This amount can vary over time,
+   * depending on the MemoryManager implementation.
    * In this model, this is equivalent to the amount of memory not occupied by execution.
    */
   def maxOnHeapStorageMemory: Long
 
   /**
+   * Total available off heap memory for storage, in bytes. This amount can vary over time,
+   * depending on the MemoryManager implementation.
+   */
+  def maxOffHeapStorageMemory: Long
+
+  /**
    * Set the [[MemoryStore]] used by this manager to evict cached blocks.
    * This must be set after construction due to initialization ordering constraints.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 08155aa..a6f7db0 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager(
     (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
   }
 
+  override def maxOffHeapStorageMemory: Long = 0L
+
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index c7b36be..fea2808 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
     maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
   }
 
+  override def maxOffHeapStorageMemory: Long = synchronized {
+    maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
+  }
+
   /**
   * Try to acquire up to `numBytes` of execution memory for the current task and return the
    * number of bytes obtained, or 0 if none can be allocated.
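As a worked example of the arithmetic above (values hypothetical, not from the patch): under the unified model, storage may use whatever off-heap memory execution has not claimed.

```
// Illustrative only: unified-model off-heap accounting with made-up numbers.
val maxOffHeapMemory = 1024L * 1024 * 1024      // e.g. spark.memory.offHeap.size = 1g
val offHeapExecutionUsed = 300L * 1024 * 1024   // off-heap bytes currently held by execution
val maxOffHeapStorageMemory = maxOffHeapMemory - offHeapExecutionUsed
println(s"off-heap storage can grow to ${maxOffHeapStorageMemory / (1024 * 1024)} MB")  // 724 MB
```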

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 83a9cbd..015e71d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -96,7 +96,8 @@ private[spark] class BlockManager(
   // However, since we use this only for reporting and logging, what we actually want here is
   // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
   // to revisit whether reporting this value as the "max" is intuitive to the user.
-  private val maxMemory = memoryManager.maxOnHeapStorageMemory
+  private val maxMemory =
+    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
 
   // Port used by the external shuffle service. In Yarn mode, this may already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -802,7 +803,7 @@ private[spark] class BlockManager(
       val putBlockStatus = getCurrentBlockStatus(blockId, info)
       val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
       if (blockWasSuccessfullyStored) {
-        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // Now that the block is in either the memory or disk store,
         // tell the master about it.
         info.size = size
         if (tellMaster) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 0349da0..586339a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -101,7 +101,9 @@ private[spark] class MemoryStore(
     conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
 
   /** Total amount of memory available for storage, in bytes. */
-  private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
+  private def maxMemory: Long = {
+    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+  }
 
   if (maxMemory < unrollMemoryThreshold) {
     logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the 
initial memory " +
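A small self-contained sketch of this sanity check with hypothetical pool sizes; the warning fires only when on-heap and off-heap storage memory together fall below the initial unroll threshold:

```
// Hypothetical values; spark.storage.unrollMemoryThreshold defaults to 1 MB.
val unrollMemoryThreshold = 1024L * 1024
val maxOnHeapStorage = 512L * 1024   // made-up tiny on-heap storage pool
val maxOffHeapStorage = 256L * 1024  // made-up tiny off-heap storage pool
val maxMemory = maxOnHeapStorage + maxOffHeapStorage
if (maxMemory < unrollMemoryThreshold) {
  println(s"Max memory $maxMemory is less than the initial memory threshold $unrollMemoryThreshold")
}
```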

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 6a4f409..5f699df 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -56,6 +56,8 @@ class TestMemoryManager(conf: SparkConf)
   }
   override def maxOnHeapStorageMemory: Long = Long.MaxValue
 
+  override def maxOffHeapStorageMemory: Long = 0L
+
   private var oomOnce = false
   private var available = Long.MaxValue
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6821582..8077a1b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -239,8 +239,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
-    assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
-    assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
+    assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
+    assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 32000")
     assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was 
not in store")
     assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was 
not in store")
     assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was 
not in store")
@@ -269,8 +269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
-      memStatus._1 should equal (20000L)
-      memStatus._2 should equal (20000L)
+      memStatus._1 should equal (40000L)
+      memStatus._2 should equal (40000L)
     }
   }
 
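For context on the doubled expectations above (an editorial illustration, not part of the patch): if the suite's on-heap and off-heap storage pools are each 20000 bytes, the combined figure reported by the block manager master doubles accordingly.

```
// Hypothetical arithmetic behind the updated assertions; the per-pool sizes
// are assumptions about the suite's configuration, not taken from the diff.
val maxOnHeap = 20000L   // assumed on-heap storage pool
val maxOffHeap = 20000L  // assumed off-heap storage pool
assert(maxOnHeap + maxOffHeap == 40000L)  // total now reflects both pools
```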

