Repository: spark Updated Branches: refs/heads/master cda4603de -> f5ea7fe53
[SPARK-16166][CORE] Also take off-heap memory usage into consideration in log and webui display ## What changes were proposed in this pull request? Currently in the log and UI display, only on-heap storage memory is calculated and displayed, ``` 16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB) ``` <img width="1232" alt="untitled" src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png"> With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992) off-heap memory is supported for data persistence, so here change to also take off-heap storage memory into consideration. ## How was this patch tested? Unit test and local verification. Author: jerryshao <ss...@hortonworks.com> Closes #13920 from jerryshao/SPARK-16166. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5ea7fe5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5ea7fe5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5ea7fe5 Branch: refs/heads/master Commit: f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd Parents: cda4603 Author: jerryshao <ss...@hortonworks.com> Authored: Mon Jul 25 15:17:06 2016 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Mon Jul 25 15:17:06 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/memory/MemoryManager.scala | 10 ++++++++-- .../org/apache/spark/memory/StaticMemoryManager.scala | 2 ++ .../org/apache/spark/memory/UnifiedMemoryManager.scala | 4 ++++ .../scala/org/apache/spark/storage/BlockManager.scala | 5 +++-- .../org/apache/spark/storage/memory/MemoryStore.scala | 4 +++- .../scala/org/apache/spark/memory/TestMemoryManager.scala | 2 ++ .../org/apache/spark/storage/BlockManagerSuite.scala | 8 ++++---- 7 files changed, 26 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 0210217..82442cf 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -62,13 +62,19 @@ private[spark] abstract class MemoryManager( offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) /** - * Total available memory for storage, in bytes. This amount can vary over time, depending on - * the MemoryManager implementation. + * Total available on heap memory for storage, in bytes. This amount can vary over time, + * depending on the MemoryManager implementation. * In this model, this is equivalent to the amount of memory not occupied by execution. */ def maxOnHeapStorageMemory: Long /** + * Total available off heap memory for storage, in bytes. This amount can vary over time, + * depending on the MemoryManager implementation. + */ + def maxOffHeapStorageMemory: Long + + /** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints. */ http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 08155aa..a6f7db0 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager( (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } + override def maxOffHeapStorageMemory: Long = 0L + override def acquireStorageMemory( blockId: BlockId, numBytes: Long, http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index c7b36be..fea2808 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed } + override def maxOffHeapStorageMemory: Long = synchronized { + maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed + } + /** * Try to acquire up to `numBytes` of execution memory for the current task and return the * number of bytes obtained, or 0 if none can be allocated. http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 83a9cbd..015e71d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -96,7 +96,8 @@ private[spark] class BlockManager( // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxMemory` can ever possibly reach. We may need // to revisit whether reporting this value as the "max" is intuitive to the user. - private val maxMemory = memoryManager.maxOnHeapStorageMemory + private val maxMemory = + memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory // Port used by the external shuffle service. In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. @@ -802,7 +803,7 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory, externalBlockStore, or disk store, + // Now that the block is in either the memory or disk store, // tell the master about it. info.size = size if (tellMaster) { http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 0349da0..586339a 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -101,7 +101,9 @@ private[spark] class MemoryStore( conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) /** Total amount of memory available for storage, in bytes. */ - private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory + private def maxMemory: Long = { + memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory + } if (maxMemory < unrollMemoryThreshold) { logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 6a4f409..5f699df 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -56,6 +56,8 @@ class TestMemoryManager(conf: SparkConf) } override def maxOnHeapStorageMemory: Long = Long.MaxValue + override def maxOffHeapStorageMemory: Long = 0L + private var oomOnce = false private var available = Long.MaxValue http://git-wip-us.apache.org/repos/asf/spark/blob/f5ea7fe5/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6821582..8077a1b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -239,8 +239,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Checking whether blocks are in memory and memory size val memStatus = master.getMemoryStatus.head._2 - assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000") - assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000") + assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000") + assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 12000") assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store") assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store") assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store") @@ -269,8 +269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { val memStatus = master.getMemoryStatus.head._2 - memStatus._1 should equal (20000L) - memStatus._2 should equal (20000L) + memStatus._1 should equal (40000L) + memStatus._2 should equal (40000L) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org