Repository: spark
Updated Branches:
  refs/heads/branch-1.1 e08333463 -> 25cabd7ee


[SPARK-2718] [yarn] Handle quotes and other characters in user args.

Due to the way Yarn runs things through bash, normal quoting doesn't
work as expected. This change applies the necessary voodoo to the user
args to avoid issues with bash and special characters.
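
To make the "voodoo" concrete: Yarn launches containers via
`bash -c "command ..."`, and inside those outer double quotes, single
quotes do not stop $-expansion, so wrapping an argument in single quotes
alone is not enough. A minimal sketch of the resulting behavior, using a
hypothetical argument (escapeForShell is the helper added to
YarnSparkHadoopUtil in this change):

    // Hypothetical user argument, for illustration only.
    val arg = "$user.name"
    YarnSparkHadoopUtil.escapeForShell(arg)
    // => '\$user.name'
    // When Yarn later runs `bash -c "echo '\$user.name'"`, the backslash
    // keeps the outer shell from expanding $user, so the literal string
    // "$user.name" survives.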

The change also uncovered an issue with the event logger app name
sanitizing code; it wasn't cleaning up all "bad" characters, so
sometimes it would fail to create the log dirs. I just added some
more bad character replacements.
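
For example (hypothetical app name; the exact replacement chain is in the
EventLoggingListener hunk below), an application named "my $app" now maps
to a directory-safe log name:

    // Spaces, ':' and '/' become '-'; then '$', '{', '}' and quotes become '_'.
    "my $app".replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
    // => "my-_app" (a "-<timestamp>" suffix is appended to this)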

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #1724 from vanzin/SPARK-2718 and squashes the following commits:

cc84b89 [Marcelo Vanzin] Review feedback.
c1a257a [Marcelo Vanzin] Add test for backslashes.
55571d4 [Marcelo Vanzin] Unbreak yarn-client.
515613d [Marcelo Vanzin] [SPARK-2718] [yarn] Handle quotes and other characters in user args.

(cherry picked from commit 6201b27643023569e19b68aa9d5c4e4e59ce0d79)
Signed-off-by: Andrew Or <andrewo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25cabd7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25cabd7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25cabd7e

Branch: refs/heads/branch-1.1
Commit: 25cabd7eec6e499fce94bce0d45087e9d8726a50
Parents: e083334
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Aug 18 14:10:10 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Mon Aug 18 14:10:49 2014 -0700

----------------------------------------------------------------------
 .../spark/scheduler/EventLoggingListener.scala  |  3 +-
 .../yarn/ApplicationMasterArguments.scala       |  6 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   |  9 +--
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  4 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 25 ++++++++
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 64 ++++++++++++++++++++
 6 files changed, 101 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7378ce9..370fcd8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -54,7 +54,8 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
+    .toLowerCase + "-" + System.currentTimeMillis
   val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
 
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,

http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 4c383ab..424b0fb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -29,7 +29,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
-  
+
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer = new ArrayBuffer[String]()
 
@@ -47,7 +47,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userClass = value
           args = tail
 
-        case ("--args") :: value :: tail =>
+        case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
 
@@ -75,7 +75,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
 
     userArgs = userArgsBuffer.readOnly
   }
-  
+
   def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
     if (unknownParam != null) {
       System.err.println("Unknown/unsupported param " + unknownParam)

http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 1da0a1b..3897b3a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -300,11 +300,11 @@ trait ClientBase extends Logging {
   }
 
   def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --args "
+    val prefix = " --arg "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
     for (arg <- args) {
-      retval.append(prefix).append(" '").append(arg).append("' ")
+      retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg))
     }
     retval.toString
   }
@@ -386,7 +386,7 @@ trait ClientBase extends Logging {
     // TODO: it might be nicer to pass these as an internal environment variable rather than
     // as Java options, due to complications with string parsing of nested quotes.
     for ((k, v) <- sparkConf.getAll) {
-      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
+      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
     }
 
     if (args.amClass == classOf[ApplicationMaster].getName) {
@@ -400,7 +400,8 @@ trait ClientBase extends Logging {
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
       javaOpts ++
-      Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
+      Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
+        "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
         userArgsToString(args),
         "--executor-memory", args.executorMemory.toString,
         "--executor-cores", args.executorCores.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 71a9e42..312d82a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -68,10 +68,10 @@ trait ExecutorRunnableUtil extends Logging {
     // authentication settings.
     sparkConf.getAll.
       filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
 
     sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.

http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e98308c..10aef5e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -148,4 +148,29 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  /**
+   * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
+   * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
+   * argument is enclosed in single quotes and some key characters are escaped.
+   *
+   * @param arg A single argument.
+   * @return Argument quoted for execution via Yarn's generated shell script.
+   */
+  def escapeForShell(arg: String): String = {
+    if (arg != null) {
+      val escaped = new StringBuilder("'")
+      for (i <- 0 to arg.length() - 1) {
+        arg.charAt(i) match {
+          case '$' => escaped.append("\\$")
+          case '"' => escaped.append("\\\"")
+          case '\'' => escaped.append("'\\''")
+          case c => escaped.append(c)
+        }
+      }
+      escaped.append("'").toString()
+    } else {
+      arg
+    }
+  }
+
 }
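
For reference, a hedged sketch of what escapeForShell above produces for
the inputs exercised by the new test suite below (outputs shown as the
raw strings handed to Yarn):

    YarnSparkHadoopUtil.escapeForShell("arg1")      // 'arg1'
    YarnSparkHadoopUtil.escapeForShell("${arg.2}")  // '\${arg.2}'
    YarnSparkHadoopUtil.escapeForShell("\"arg3\"")  // '\"arg3\"'
    YarnSparkHadoopUtil.escapeForShell("'arg4'")    // ''\''arg4'\'''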

http://git-wip-us.apache.org/repos/asf/spark/blob/25cabd7e/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
new file mode 100644
index 0000000..7650bd4
--- /dev/null
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, IOException}
+
+import com.google.common.io.{ByteStreams, Files}
+import org.scalatest.{FunSuite, Matchers}
+
+import org.apache.spark.Logging
+
+class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
+
+  val hasBash =
+    try {
+      val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor()
+      exitCode == 0
+    } catch {
+      case e: IOException =>
+        false
+    }
+
+  if (!hasBash) {
+    logWarning("Cannot execute bash, skipping bash tests.")
+  }
+
+  def bashTest(name: String)(fn: => Unit) =
+    if (hasBash) test(name)(fn) else ignore(name)(fn)
+
+  bashTest("shell script escaping") {
+    val scriptFile = File.createTempFile("script.", ".sh")
+    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
+    try {
+      val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
+      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile)
+      scriptFile.setExecutable(true)
+
+      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
+      val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim()
+      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
+      val exitCode = proc.waitFor()
+      exitCode should be (0)
+      out should be (args.mkString(" "))
+    } finally {
+      scriptFile.delete()
+    }
+  }
+
+}

