Repository: spark
Updated Branches:
  refs/heads/master 4fdb49177 -> cdf2b0457


[SPARK-1930] The Container is running beyond physical memory limits, so as to 
be killed

Author: witgo <[email protected]>

Closes #894 from witgo/SPARK-1930 and squashes the following commits:

564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into 
SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into 
SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into 
SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding  configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdf2b045
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdf2b045
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdf2b045

Branch: refs/heads/master
Commit: cdf2b04570871848442ca9f9e2316a37e4aaaae0
Parents: 4fdb491
Author: witgo <[email protected]>
Authored: Mon Jun 16 14:27:31 2014 -0500
Committer: Thomas Graves <[email protected]>
Committed: Mon Jun 16 14:27:31 2014 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                               | 14 ++++++++++++++
 .../scala/org/apache/spark/deploy/yarn/Client.scala   |  4 ++--
 .../apache/spark/deploy/yarn/ExecutorLauncher.scala   |  4 +++-
 .../spark/deploy/yarn/YarnAllocationHandler.scala     | 12 ++++++++----
 .../org/apache/spark/deploy/yarn/ClientBase.scala     | 14 +++++++++-----
 .../scala/org/apache/spark/deploy/yarn/Client.scala   |  4 ++--
 .../spark/deploy/yarn/YarnAllocationHandler.scala     | 12 ++++++++----
 7 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index af1788f..4243ef4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for 
other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address 
should not contain a scheme (http://). Defaults to not being set since the 
history server is an optional service. This address is given to the YARN 
ResourceManager when the Spark application finishes to link the application 
from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</code></td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per executor. 
This is memory that accounts for things like VM overheads, interned strings, 
other native overheads, etc.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.driver.memoryOverhead</code></td>
+  <td>384</code></td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per driver. 
This is memory that accounts for things like VM overheads, interned strings, 
other native overheads, etc.
+  </td>
+</tr>
 </table>
 
 By default, Spark on YARN will use a Spark jar installed locally, but the 
Spark JAR can also be in a world-readable location on HDFS. This allows YARN to 
cache it on nodes so that it doesn't need to be distributed each time an 
application runs. To point to a JAR on HDFS, `export 
SPARK_JAR=hdfs:///some/path`.

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4ccddc2..82f79d8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
 
     val capability = 
Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)
 
     appContext.setQueue(args.amQueue)
@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
           ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-          YarnAllocationHandler.MEMORY_OVERHEAD)
+          memoryOverhead)
     amMemory
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b6ecae1..bfdb623 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, 
conf: Configuration, sp
 
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
+
     val appMasterResponse: RegisterApplicationMasterResponse = 
registerApplicationMaster()
 
     // Compute number of threads for akka
     val minimumMemory = 
appMasterResponse.getMinimumResourceCapability().getMemory()
 
     if (minimumMemory > 0) {
-      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = args.executorMemory + 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+        YarnAllocationHandler.MEMORY_OVERHEAD)
       val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 
1 else 0)
 
       if (numCore > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e..80e0162 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, 
Boolean]
 
+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
   private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
         val containerId = container.getId
 
         assert( container.getResource.getMemory >=
-          (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+          (executorMemory + memoryOverhead))
 
         if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have 
the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(
 
     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory 
each.".format(numExecutors,
-        executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(executorMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    memCapability.setMemory(executorMemory + memoryOverhead)
     rsrcRequest.setCapability(memCapability)
 
     val pri = Records.newRecord(classOf[Priority])

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6861b50..858bcaa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -65,6 +65,10 @@ trait ClientBase extends Logging {
   val APP_FILE_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
+  // Additional memory overhead - in mb.
+  protected def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
@@ -72,10 +76,10 @@ trait ClientBase extends Logging {
           "Error: You must specify a user jar when running in standalone 
mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numExecutors <= 0) -> "Error: You must specify at least 1 
executor!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM 
memory size must be" +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> 
("Error: Executor memory size" +
-        "must be greater than: " + 
YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
+        "greater than: " + memoryOverhead),
+      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory 
size" +
+        "must be greater than: " + memoryOverhead.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -101,7 +105,7 @@ trait ClientBase extends Logging {
       logError(errorMessage)
       throw new IllegalArgumentException(errorMessage)
     }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val amMem = args.amMemory + memoryOverhead
     if (amMem > maxMem) {
 
       val errorMessage = "Required AM memory (%d) is above the max threshold 
(%d) of this cluster."

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 80a8bce..15f3c4f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
 
     // Memory for the ApplicationMaster.
     val memoryResource = 
Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    memoryResource.setMemory(args.amMemory + memoryOverhead)
     appContext.setResource(memoryResource)
 
     // Finally, submit and monitor the application.
@@ -117,7 +117,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
     // val minResMemory: Int = 
newApp.getMinimumResourceCapability().getMemory()
     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
     //  ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-    //    YarnAllocationHandler.MEMORY_OVERHEAD)
+    //    memoryOverhead )
     args.amMemory
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cdf2b045/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a979fe4..29ccec2 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, 
Boolean]
 
+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // Number of container requests that have been sent to, but not yet 
allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
         val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        val executorMemoryOverhead = (executorMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+        val executorMemoryOverhead = (executorMemory + memoryOverhead)
         assert(container.getResource.getMemory >= executorMemoryOverhead)
 
         if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
       numPendingAllocate.addAndGet(numExecutors)
       logInfo("Will Allocate %d executor containers, each with %d 
memory".format(
         numExecutors,
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+        (executorMemory + memoryOverhead)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val memoryRequest = executorMemory + memoryOverhead
     val resource = Resource.newInstance(memoryRequest, executorCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])

Reply via email to