Repository: spark
Updated Branches:
  refs/heads/master 58a6077e5 -> acd4ac7c9


SPARK-3837. Warn when YARN kills containers for exceeding memory limits

I triggered the issue and verified the message gets printed on a 
pseudo-distributed cluster.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #2744 from sryza/sandy-spark-3837 and squashes the following commits:

858a268 [Sandy Ryza] Review feedback
c937f00 [Sandy Ryza] SPARK-3837. Warn when YARN kills containers for exceeding memory limits


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acd4ac7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acd4ac7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acd4ac7c

Branch: refs/heads/master
Commit: acd4ac7c9a503445e27739708cf36e19119b8ddc
Parents: 58a6077
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Fri Oct 31 08:43:06 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Fri Oct 31 08:43:06 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 30 +++++++++++++++--
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 34 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acd4ac7c/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7ae8ef2..e619619 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.util.{List => JList}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -375,12 +376,22 @@ private[yarn] abstract class YarnAllocator(
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
             containerId,
             completedContainer.getState,
-            completedContainer.getExitStatus()))
+            completedContainer.getExitStatus))
           // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
           // there are some exit status' we shouldn't necessarily count against us, but for
           // now I think its ok as none of the containers are expected to exit
-          if (completedContainer.getExitStatus() != 0) {
-            logInfo("Container marked as failed: " + containerId)
+          if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+            logWarning(memLimitExceededLogMessage(
+              completedContainer.getDiagnostics, VMEM_EXCEEDED_PATTERN))
+          } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+            logWarning(memLimitExceededLogMessage(
+              completedContainer.getDiagnostics, PMEM_EXCEEDED_PATTERN))
+          }
+          // Memory-limit kills are failures too: count them like any other non-zero exit.
+          if (completedContainer.getExitStatus != 0) {
+            logInfo("Container marked as failed: " + containerId +
+              ". Exit status: " + completedContainer.getExitStatus +
+              ". Diagnostics: " + completedContainer.getDiagnostics)
             numExecutorsFailed.incrementAndGet()
           }
         }
@@ -428,6 +439,19 @@ private[yarn] abstract class YarnAllocator(
     }
   }
 
+  private val MEM_REGEX = "[0-9.]+ [KMGTPE]B" // e.g. "2.1 MB", "1.2 TB"
+  private val PMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
+  private val VMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+  /** Log text for a YARN memory-limit kill, quoting the first `pattern` match in `diagnostics`. */
+  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
+    val matcher = pattern.matcher(diagnostics)
+    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
+    ("Container killed by YARN for exceeding memory limits." + diag
+      + " Consider boosting spark.yarn.executor.memoryOverhead.")
+  }
+
   protected def allocatedContainersOnHost(host: String): Int = {
     var retval = 0
     allocatedHostToContainersMap.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/acd4ac7c/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
new file mode 100644
index 0000000..9fff63f
--- /dev/null
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.deploy.yarn.MemLimitLogger._
+import org.scalatest.FunSuite
+
+class YarnAllocatorSuite extends FunSuite {
+  test("memory exceeded diagnostic regexes") {
+    val diagnostics =
+      "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
+      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+    val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
+    val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
+    assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
+    assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to