Repository: spark
Updated Branches:
  refs/heads/master e772b4e4e -> 1390e56fa


http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
new file mode 100644
index 0000000..16dfe04
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.lang.Boolean
+
+/**
+ * An abstract response sent from the server in the REST application submission protocol.
+ */
+private[spark] abstract class SubmitRestProtocolResponse extends 
SubmitRestProtocolMessage {
+  var serverSparkVersion: String = null
+  var success: Boolean = null
+  var unknownFields: Array[String] = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(serverSparkVersion, "serverSparkVersion")
+  }
+}
+
+/**
+ * A response to a [[CreateSubmissionRequest]] in the REST application submission protocol.
+ */
+private[spark] class CreateSubmissionResponse extends 
SubmitRestProtocolResponse {
+  var submissionId: String = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(success, "success")
+  }
+}
+
+/**
+ * A response to a kill request in the REST application submission protocol.
+ */
+private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse 
{
+  var submissionId: String = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(submissionId, "submissionId")
+    assertFieldIsSet(success, "success")
+  }
+}
+
+/**
+ * A response to a status request in the REST application submission protocol.
+ */
+private[spark] class SubmissionStatusResponse extends 
SubmitRestProtocolResponse {
+  var submissionId: String = null
+  var driverState: String = null
+  var workerId: String = null
+  var workerHostPort: String = null
+
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(submissionId, "submissionId")
+    assertFieldIsSet(success, "success")
+  }
+}
+
+/**
+ * An error response message used in the REST application submission protocol.
+ */
+private[spark] class ErrorResponse extends SubmitRestProtocolResponse {
+  // The highest protocol version that the server knows about
+  // This is set when the client specifies an unknown version
+  var highestProtocolVersion: String = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(message, "message")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index ed02ca8..e955636 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -68,7 +68,8 @@ class JsonProtocolSuite extends FunSuite {
     val completedApps = Array[ApplicationInfo]()
     val activeDrivers = Array(createDriverInfo())
     val completedDrivers = Array(createDriverInfo())
-    val stateResponse = new MasterStateResponse("host", 8080, workers, 
activeApps, completedApps,
+    val stateResponse = new MasterStateResponse(
+      "host", 8080, None, workers, activeApps, completedApps,
       activeDrivers, completedDrivers, RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 3f1355f..1ddccae 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -141,7 +141,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val (childArgs, classpath, sysProps, mainClass) = 
prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     childArgsStr should include ("--class org.SomeClass")
     childArgsStr should include ("--executor-memory 5g")
@@ -180,7 +180,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val (childArgs, classpath, sysProps, mainClass) = 
prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (4)
@@ -201,6 +201,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
   }
 
   test("handles standalone cluster mode") {
+    testStandaloneCluster(useRest = true)
+  }
+
+  test("handles legacy standalone cluster mode") {
+    testStandaloneCluster(useRest = false)
+  }
+
+  /**
+   * Test whether the launch environment is correctly set up in standalone cluster mode.
+   * @param useRest whether to use the REST submission gateway introduced in Spark 1.3
+   */
+  private def testStandaloneCluster(useRest: Boolean): Unit = {
     val clArgs = Seq(
       "--deploy-mode", "cluster",
       "--master", "spark://h:p",
@@ -212,17 +224,26 @@ class SparkSubmitSuite extends FunSuite with Matchers 
with ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    appArgs.useRest = useRest
+    val (childArgs, classpath, sysProps, mainClass) = 
prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
-    childArgsStr should include regex ("launch spark://h:p .*thejar.jar 
org.SomeClass arg1 arg2")
-    mainClass should be ("org.apache.spark.deploy.Client")
-    classpath should have size (0)
-    sysProps should have size (5)
+    if (useRest) {
+      childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
+      mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
+    } else {
+      childArgsStr should startWith ("--supervise --memory 4g --cores 5")
+      childArgsStr should include regex "launch spark://h:p .*thejar.jar 
org.SomeClass arg1 arg2"
+      mainClass should be ("org.apache.spark.deploy.Client")
+    }
+    classpath should have size 0
+    sysProps should have size 8
     sysProps.keys should contain ("SPARK_SUBMIT")
     sysProps.keys should contain ("spark.master")
     sysProps.keys should contain ("spark.app.name")
     sysProps.keys should contain ("spark.jars")
+    sysProps.keys should contain ("spark.driver.memory")
+    sysProps.keys should contain ("spark.driver.cores")
+    sysProps.keys should contain ("spark.driver.supervise")
     sysProps.keys should contain ("spark.shuffle.spill")
     sysProps("spark.shuffle.spill") should be ("false")
   }
@@ -239,7 +260,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val (childArgs, classpath, sysProps, mainClass) = 
prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
@@ -261,7 +282,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val (childArgs, classpath, sysProps, mainClass) = 
prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
@@ -281,7 +302,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.master") should be ("yarn-cluster")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
@@ -339,7 +360,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "--files", files,
       "thejar.jar")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
     appArgs.jars should be (Utils.resolveURIs(jars))
     appArgs.files should be (Utils.resolveURIs(files))
     sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
@@ -354,7 +375,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
     appArgs2.files should be (Utils.resolveURIs(files))
     appArgs2.archives should be (Utils.resolveURIs(archives))
     sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
@@ -367,7 +388,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
     appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
     sysProps3("spark.submit.pyFiles") should be (
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
@@ -392,7 +413,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar"
     )
     val appArgs = new SparkSubmitArguments(clArgs)
-    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
     sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
     sysProps("spark.files") should be(Utils.resolveURIs(files))
 
@@ -409,7 +430,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
     sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
     sysProps2("spark.yarn.dist.archives") should 
be(Utils.resolveURIs(archives))
 
@@ -424,7 +445,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
     sysProps3("spark.submit.pyFiles") should be(
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
   }
@@ -440,7 +461,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties
       val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> 
path))
       assert(appArgs.propertiesFile != null)
       assert(appArgs.propertiesFile.startsWith(path))
-      appArgs.executorMemory should  be ("2.3g")
+      appArgs.executorMemory should be ("2.3g")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
new file mode 100644
index 0000000..29aed89
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.io.{File, FileInputStream, FileOutputStream, PrintWriter}
+import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.zip.ZipEntry
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+import akka.actor.ActorSystem
+import com.google.common.io.ByteStreams
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark._
+import org.apache.spark.util.Utils
+import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}
+import org.apache.spark.deploy.master.{DriverState, Master}
+import org.apache.spark.deploy.worker.Worker
+
+/**
+ * End-to-end tests for the REST application submission protocol in standalone mode.
+ */
+class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterAll with 
BeforeAndAfterEach {
+  private val systemsToStop = new ArrayBuffer[ActorSystem]
+  private val masterRestUrl = startLocalCluster()
+  private val client = new StandaloneRestClient
+  private val mainJar = StandaloneRestSubmitSuite.createJar()
+  private val mainClass = StandaloneRestApp.getClass.getName.stripSuffix("$")
+
+  override def afterAll() {
+    systemsToStop.foreach(_.shutdown())
+  }
+
+  test("simple submit until completion") {
+    val resultsFile = File.createTempFile("test-submit", ".txt")
+    val numbers = Seq(1, 2, 3)
+    val size = 500
+    val submissionId = submitApplication(resultsFile, numbers, size)
+    waitUntilFinished(submissionId)
+    validateResult(resultsFile, numbers, size)
+  }
+
+  test("kill empty submission") {
+    val response = client.killSubmission(masterRestUrl, 
"submission-that-does-not-exist")
+    val killResponse = getKillResponse(response)
+    val killSuccess = killResponse.success
+    assert(!killSuccess)
+  }
+
+  test("kill running submission") {
+    val resultsFile = File.createTempFile("test-kill", ".txt")
+    val numbers = Seq(1, 2, 3)
+    val size = 500
+    val submissionId = submitApplication(resultsFile, numbers, size)
+    val response = client.killSubmission(masterRestUrl, submissionId)
+    val killResponse = getKillResponse(response)
+    val killSuccess = killResponse.success
+    waitUntilFinished(submissionId)
+    val response2 = client.requestSubmissionStatus(masterRestUrl, submissionId)
+    val statusResponse = getStatusResponse(response2)
+    val statusSuccess = statusResponse.success
+    val driverState = statusResponse.driverState
+    assert(killSuccess)
+    assert(statusSuccess)
+    assert(driverState === DriverState.KILLED.toString)
+    // we should not see the expected results because we killed the submission
+    intercept[TestFailedException] { validateResult(resultsFile, numbers, 
size) }
+  }
+
+  test("request status for empty submission") {
+    val response = client.requestSubmissionStatus(masterRestUrl, 
"submission-that-does-not-exist")
+    val statusResponse = getStatusResponse(response)
+    val statusSuccess = statusResponse.success
+    assert(!statusSuccess)
+  }
+
+  /**
+   * Start a local cluster containing one Master and a few Workers.
+   * Do not use [[org.apache.spark.deploy.LocalSparkCluster]] here because we want the REST URL.
+   * Return the Master's REST URL to which applications should be submitted.
+   */
+  private def startLocalCluster(): String = {
+    val conf = new SparkConf(false)
+      .set("spark.master.rest.enabled", "true")
+      .set("spark.master.rest.port", "0")
+    val (numWorkers, coresPerWorker, memPerWorker) = (2, 1, 512)
+    val localHostName = Utils.localHostName()
+    val (masterSystem, masterPort, _, _masterRestPort) =
+      Master.startSystemAndActor(localHostName, 0, 0, conf)
+    val masterRestPort = _masterRestPort.getOrElse { fail("REST server not 
started on Master!") }
+    val masterUrl = "spark://" + localHostName + ":" + masterPort
+    val masterRestUrl = "spark://" + localHostName + ":" + masterRestPort
+    (1 to numWorkers).foreach { n =>
+      val (workerSystem, _) = Worker.startSystemAndActor(
+        localHostName, 0, 0, coresPerWorker, memPerWorker, Array(masterUrl), 
null, Some(n))
+      systemsToStop.append(workerSystem)
+    }
+    systemsToStop.append(masterSystem)
+    masterRestUrl
+  }
+
+  /** Submit the [[StandaloneRestApp]] and return the corresponding submission ID. */
+  private def submitApplication(resultsFile: File, numbers: Seq[Int], size: 
Int): String = {
+    val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) 
++ Seq(size.toString)
+    val commandLineArgs = Array(
+      "--deploy-mode", "cluster",
+      "--master", masterRestUrl,
+      "--name", mainClass,
+      "--class", mainClass,
+      mainJar) ++ appArgs
+    val args = new SparkSubmitArguments(commandLineArgs)
+    val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
+    val request = client.constructSubmitRequest(
+      mainJar, mainClass, appArgs.toArray, sparkProperties.toMap, Map.empty)
+    val response = client.createSubmission(masterRestUrl, request)
+    val submitResponse = getSubmitResponse(response)
+    val submissionId = submitResponse.submissionId
+    assert(submissionId != null, "Application submission was unsuccessful!")
+    submissionId
+  }
+
+  /** Wait until the given submission has finished running up to the specified timeout. */
+  private def waitUntilFinished(submissionId: String, maxSeconds: Int = 30): 
Unit = {
+    var finished = false
+    val expireTime = System.currentTimeMillis + maxSeconds * 1000
+    while (!finished) {
+      val response = client.requestSubmissionStatus(masterRestUrl, 
submissionId)
+      val statusResponse = getStatusResponse(response)
+      val driverState = statusResponse.driverState
+      finished =
+        driverState != DriverState.SUBMITTED.toString &&
+        driverState != DriverState.RUNNING.toString
+      if (System.currentTimeMillis > expireTime) {
+        fail(s"Driver $submissionId did not finish within $maxSeconds 
seconds.")
+      }
+    }
+  }
+
+  /** Return the response as a submit response, or fail with error otherwise. */
+  private def getSubmitResponse(response: SubmitRestProtocolResponse): 
CreateSubmissionResponse = {
+    response match {
+      case s: CreateSubmissionResponse => s
+      case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+      case r => fail(s"Expected submit response. Actual: ${r.toJson}")
+    }
+  }
+
+  /** Return the response as a kill response, or fail with error otherwise. */
+  private def getKillResponse(response: SubmitRestProtocolResponse): 
KillSubmissionResponse = {
+    response match {
+      case k: KillSubmissionResponse => k
+      case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+      case r => fail(s"Expected kill response. Actual: ${r.toJson}")
+    }
+  }
+
+  /** Return the response as a status response, or fail with error otherwise. */
+  private def getStatusResponse(response: SubmitRestProtocolResponse): 
SubmissionStatusResponse = {
+    response match {
+      case s: SubmissionStatusResponse => s
+      case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+      case r => fail(s"Expected status response. Actual: ${r.toJson}")
+    }
+  }
+
+  /** Validate whether the application produced the correct output. */
+  private def validateResult(resultsFile: File, numbers: Seq[Int], size: Int): 
Unit = {
+    val lines = Source.fromFile(resultsFile.getAbsolutePath).getLines().toSeq
+    val unexpectedContent =
+      if (lines.nonEmpty) {
+        "[\n" + lines.map { l => "  " + l }.mkString("\n") + "\n]"
+      } else {
+        "[EMPTY]"
+      }
+    assert(lines.size === 2, s"Unexpected content in file: $unexpectedContent")
+    assert(lines(0).toInt === numbers.sum, s"Sum of ${numbers.mkString(",")} 
is incorrect")
+    assert(lines(1).toInt === (size / 2) + 1, "Result of Spark job is 
incorrect")
+  }
+}
+
+private object StandaloneRestSubmitSuite {
+  private val pathPrefix = this.getClass.getPackage.getName.replaceAll("\\.", 
"/")
+
+  /**
+   * Create a jar that contains all the class files needed for running the [[StandaloneRestApp]].
+   * Return the absolute path to that jar.
+   */
+  def createJar(): String = {
+    val jarFile = File.createTempFile("test-standalone-rest-protocol", ".jar")
+    val jarFileStream = new FileOutputStream(jarFile)
+    val jarStream = new JarOutputStream(jarFileStream, new 
java.util.jar.Manifest)
+    jarStream.putNextEntry(new ZipEntry(pathPrefix))
+    getClassFiles.foreach { cf =>
+      jarStream.putNextEntry(new JarEntry(pathPrefix + "/" + cf.getName))
+      val in = new FileInputStream(cf)
+      ByteStreams.copy(in, jarStream)
+      in.close()
+    }
+    jarStream.close()
+    jarFileStream.close()
+    jarFile.getAbsolutePath
+  }
+
+  /**
+   * Return a list of class files compiled for [[StandaloneRestApp]].
+   * This includes all the anonymous classes used in the application.
+   */
+  private def getClassFiles: Seq[File] = {
+    val className = Utils.getFormattedClassName(StandaloneRestApp)
+    val clazz = StandaloneRestApp.getClass
+    val basePath = 
clazz.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
+    val baseDir = new File(basePath + "/" + pathPrefix)
+    baseDir.listFiles().filter(_.getName.contains(className))
+  }
+}
+
+/**
+ * Sample application to be submitted to the cluster using the REST gateway.
+ * All relevant classes will be packaged into a jar at run time.
+ */
+object StandaloneRestApp {
+  // Usage: [path to results file] [num1] [num2] [num3] [rddSize]
+  // The first line of the results file should be (num1 + num2 + num3)
+  // The second line should be (rddSize / 2) + 1
+  def main(args: Array[String]) {
+    assert(args.size == 5, s"Expected exactly 5 arguments: 
${args.mkString(",")}")
+    val resultFile = new File(args(0))
+    val writer = new PrintWriter(resultFile)
+    try {
+      val conf = new SparkConf()
+      val sc = new SparkContext(conf)
+      val firstLine = args(1).toInt + args(2).toInt + args(3).toInt
+      val secondLine = sc.parallelize(1 to args(4).toInt)
+        .map { i => (i / 2, i) }
+        .reduceByKey(_ + _)
+        .count()
+      writer.println(firstLine)
+      writer.println(secondLine)
+    } catch {
+      case e: Exception =>
+        writer.println(e)
+        e.getStackTrace.foreach { l => writer.println("  " + l) }
+    } finally {
+      writer.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
new file mode 100644
index 0000000..1d64ec2
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.lang.Boolean
+import java.lang.Integer
+
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+/**
+ * Tests for the REST application submission protocol.
+ *
+ * Most tests populate a message, check `validate()`, render it with `toJson`, compare the
+ * output against an expected JSON fixture and parse it back with
+ * `SubmitRestProtocolMessage.fromJson` to check that the fields survive the round trip.
+ */
+class SubmitRestProtocolSuite extends FunSuite {
+
+  // Walks a DummyRequest through every combination of missing or invalid required fields.
+  test("validate") {
+    val request = new DummyRequest
+    intercept[SubmitRestProtocolException] { request.validate() } // missing everything
+    request.clientSparkVersion = "1.2.3"
+    intercept[SubmitRestProtocolException] { request.validate() } // missing name and age
+    request.name = "something"
+    intercept[SubmitRestProtocolException] { request.validate() } // missing only age
+    request.age = 2
+    intercept[SubmitRestProtocolException] { request.validate() } // age too low
+    request.age = 10
+    request.validate() // everything is set properly
+    request.clientSparkVersion = null
+    intercept[SubmitRestProtocolException] { request.validate() } // missing only Spark version
+    request.clientSparkVersion = "1.2.3"
+    request.name = null
+    intercept[SubmitRestProtocolException] { request.validate() } // missing only name
+    request.message = "not-setting-name"
+    intercept[SubmitRestProtocolException] { request.validate() } // still missing name
+  }
+
+  test("request to and from JSON") {
+    val request = new DummyRequest
+    intercept[SubmitRestProtocolException] { request.toJson } // implicit validation
+    request.clientSparkVersion = "1.2.3"
+    request.active = true
+    request.age = 25
+    request.name = "jung"
+    val json = request.toJson
+    assertJsonEquals(json, dummyRequestJson)
+    val newRequest = SubmitRestProtocolMessage.fromJson(json, classOf[DummyRequest])
+    assert(newRequest.clientSparkVersion === "1.2.3")
+    assert(newRequest.active)
+    assert(newRequest.age === 25)
+    assert(newRequest.name === "jung")
+    assert(newRequest.message === null)
+  }
+
+  test("response to and from JSON") {
+    val response = new DummyResponse
+    response.serverSparkVersion = "3.3.4"
+    response.success = true
+    val json = response.toJson
+    assertJsonEquals(json, dummyResponseJson)
+    val newResponse = SubmitRestProtocolMessage.fromJson(json, classOf[DummyResponse])
+    assert(newResponse.serverSparkVersion === "3.3.4")
+    assert(newResponse.success)
+    assert(newResponse.message === null)
+  }
+
+  test("CreateSubmissionRequest") {
+    val message = new CreateSubmissionRequest
+    intercept[SubmitRestProtocolException] { message.validate() }
+    message.clientSparkVersion = "1.2.3"
+    message.appResource = "honey-walnut-cherry.jar"
+    message.mainClass = "org.apache.spark.examples.SparkPie"
+    val conf = new SparkConf(false)
+    conf.set("spark.app.name", "SparkPie")
+    message.sparkProperties = conf.getAll.toMap
+    message.validate()
+    // optional fields
+    conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
+    conf.set("spark.files", "fireball.png")
+    conf.set("spark.driver.memory", "512m")
+    conf.set("spark.driver.cores", "180")
+    conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
+    conf.set("spark.driver.extraClassPath", "food-coloring.jar")
+    conf.set("spark.driver.extraLibraryPath", "pickle.jar")
+    conf.set("spark.driver.supervise", "false")
+    conf.set("spark.executor.memory", "256m")
+    conf.set("spark.cores.max", "10000")
+    message.sparkProperties = conf.getAll.toMap
+    message.appArgs = Array("two slices", "a hint of cinnamon")
+    message.environmentVariables = Map("PATH" -> "/dev/null")
+    message.validate()
+    // bad fields
+    var badConf = conf.clone().set("spark.driver.cores", "one hundred feet")
+    message.sparkProperties = badConf.getAll.toMap
+    intercept[SubmitRestProtocolException] { message.validate() }
+    badConf = conf.clone().set("spark.driver.supervise", "nope, never")
+    message.sparkProperties = badConf.getAll.toMap
+    intercept[SubmitRestProtocolException] { message.validate() }
+    badConf = conf.clone().set("spark.cores.max", "two men")
+    message.sparkProperties = badConf.getAll.toMap
+    intercept[SubmitRestProtocolException] { message.validate() }
+    // restore the valid properties before serializing
+    message.sparkProperties = conf.getAll.toMap
+    // test JSON
+    val json = message.toJson
+    assertJsonEquals(json, submitDriverRequestJson)
+    val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionRequest])
+    assert(newMessage.clientSparkVersion === "1.2.3")
+    assert(newMessage.appResource === "honey-walnut-cherry.jar")
+    assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie")
+    assert(newMessage.sparkProperties("spark.app.name") === "SparkPie")
+    assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar")
+    assert(newMessage.sparkProperties("spark.files") === "fireball.png")
+    assert(newMessage.sparkProperties("spark.driver.memory") === "512m")
+    assert(newMessage.sparkProperties("spark.driver.cores") === "180")
+    assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") ===
+      " -Dslices=5 -Dcolor=mostly_red")
+    assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
+    assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
+    assert(newMessage.sparkProperties("spark.driver.supervise") === "false")
+    assert(newMessage.sparkProperties("spark.executor.memory") === "256m")
+    assert(newMessage.sparkProperties("spark.cores.max") === "10000")
+    assert(newMessage.appArgs === message.appArgs)
+    assert(newMessage.sparkProperties === message.sparkProperties)
+    assert(newMessage.environmentVariables === message.environmentVariables)
+  }
+
+  test("CreateSubmissionResponse") {
+    val message = new CreateSubmissionResponse
+    intercept[SubmitRestProtocolException] { message.validate() }
+    message.serverSparkVersion = "1.2.3"
+    message.submissionId = "driver_123"
+    message.success = true
+    message.validate()
+    // test JSON
+    val json = message.toJson
+    assertJsonEquals(json, submitDriverResponseJson)
+    val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionResponse])
+    assert(newMessage.serverSparkVersion === "1.2.3")
+    assert(newMessage.submissionId === "driver_123")
+    assert(newMessage.success)
+  }
+
+  test("KillSubmissionResponse") {
+    val message = new KillSubmissionResponse
+    intercept[SubmitRestProtocolException] { message.validate() }
+    message.serverSparkVersion = "1.2.3"
+    message.submissionId = "driver_123"
+    message.success = true
+    message.validate()
+    // test JSON
+    val json = message.toJson
+    assertJsonEquals(json, killDriverResponseJson)
+    val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[KillSubmissionResponse])
+    assert(newMessage.serverSparkVersion === "1.2.3")
+    assert(newMessage.submissionId === "driver_123")
+    assert(newMessage.success)
+  }
+
+  test("SubmissionStatusResponse") {
+    val message = new SubmissionStatusResponse
+    intercept[SubmitRestProtocolException] { message.validate() }
+    message.serverSparkVersion = "1.2.3"
+    message.submissionId = "driver_123"
+    message.success = true
+    message.validate()
+    // optional fields
+    message.driverState = "RUNNING"
+    message.workerId = "worker_123"
+    message.workerHostPort = "1.2.3.4:7780"
+    // test JSON
+    val json = message.toJson
+    assertJsonEquals(json, driverStatusResponseJson)
+    val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[SubmissionStatusResponse])
+    assert(newMessage.serverSparkVersion === "1.2.3")
+    assert(newMessage.submissionId === "driver_123")
+    assert(newMessage.driverState === "RUNNING")
+    assert(newMessage.success)
+    assert(newMessage.workerId === "worker_123")
+    assert(newMessage.workerHostPort === "1.2.3.4:7780")
+  }
+
+  test("ErrorResponse") {
+    val message = new ErrorResponse
+    intercept[SubmitRestProtocolException] { message.validate() }
+    message.serverSparkVersion = "1.2.3"
+    message.message = "Field not found in submit request: X"
+    message.validate()
+    // test JSON
+    val json = message.toJson
+    assertJsonEquals(json, errorJson)
+    val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[ErrorResponse])
+    assert(newMessage.serverSparkVersion === "1.2.3")
+    assert(newMessage.message === "Field not found in submit request: X")
+  }
+
+  // Expected JSON renderings of the messages built in the tests above.
+
+  private val dummyRequestJson =
+    """
+      |{
+      |  "action" : "DummyRequest",
+      |  "active" : true,
+      |  "age" : 25,
+      |  "clientSparkVersion" : "1.2.3",
+      |  "name" : "jung"
+      |}
+    """.stripMargin
+
+  private val dummyResponseJson =
+    """
+      |{
+      |  "action" : "DummyResponse",
+      |  "serverSparkVersion" : "3.3.4",
+      |  "success": true
+      |}
+    """.stripMargin
+
+  private val submitDriverRequestJson =
+    """
+      |{
+      |  "action" : "CreateSubmissionRequest",
+      |  "appArgs" : [ "two slices", "a hint of cinnamon" ],
+      |  "appResource" : "honey-walnut-cherry.jar",
+      |  "clientSparkVersion" : "1.2.3",
+      |  "environmentVariables" : {
+      |    "PATH" : "/dev/null"
+      |  },
+      |  "mainClass" : "org.apache.spark.examples.SparkPie",
+      |  "sparkProperties" : {
+      |    "spark.driver.extraLibraryPath" : "pickle.jar",
+      |    "spark.jars" : "mayonnaise.jar,ketchup.jar",
+      |    "spark.driver.supervise" : "false",
+      |    "spark.app.name" : "SparkPie",
+      |    "spark.cores.max" : "10000",
+      |    "spark.driver.memory" : "512m",
+      |    "spark.files" : "fireball.png",
+      |    "spark.driver.cores" : "180",
+      |    "spark.driver.extraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red",
+      |    "spark.executor.memory" : "256m",
+      |    "spark.driver.extraClassPath" : "food-coloring.jar"
+      |  }
+      |}
+    """.stripMargin
+
+  private val submitDriverResponseJson =
+    """
+      |{
+      |  "action" : "CreateSubmissionResponse",
+      |  "serverSparkVersion" : "1.2.3",
+      |  "submissionId" : "driver_123",
+      |  "success" : true
+      |}
+    """.stripMargin
+
+  private val killDriverResponseJson =
+    """
+      |{
+      |  "action" : "KillSubmissionResponse",
+      |  "serverSparkVersion" : "1.2.3",
+      |  "submissionId" : "driver_123",
+      |  "success" : true
+      |}
+    """.stripMargin
+
+  private val driverStatusResponseJson =
+    """
+      |{
+      |  "action" : "SubmissionStatusResponse",
+      |  "driverState" : "RUNNING",
+      |  "serverSparkVersion" : "1.2.3",
+      |  "submissionId" : "driver_123",
+      |  "success" : true,
+      |  "workerHostPort" : "1.2.3.4:7780",
+      |  "workerId" : "worker_123"
+      |}
+    """.stripMargin
+
+  private val errorJson =
+    """
+      |{
+      |  "action" : "ErrorResponse",
+      |  "message" : "Field not found in submit request: X",
+      |  "serverSparkVersion" : "1.2.3"
+      |}
+    """.stripMargin
+
+  /** Assert that the contents in the two JSON strings are equal after ignoring whitespace. */
+  private def assertJsonEquals(jsonString1: String, jsonString2: String): Unit = {
+    val trimmedJson1 = jsonString1.trim
+    val trimmedJson2 = jsonString2.trim
+    // Both sides are parsed and re-rendered compactly, so only formatting differences are
+    // normalized away. NOTE(review): field order presumably still matters - confirm.
+    val json1 = compact(render(parse(trimmedJson1)))
+    val json2 = compact(render(parse(trimmedJson2)))
+    // Put this on a separate line to avoid printing comparison twice when test fails
+    val equals = json1 == json2
+    assert(equals, "\"[%s]\" did not equal \"[%s]\"".format(trimmedJson1, trimmedJson2))
+  }
+}
+
+/** Test-only response that adds no fields or validation of its own. */
+private class DummyResponse extends SubmitRestProtocolResponse
+/**
+ * Test-only request that adds three fields to the base request.
+ * `doValidate` requires `name` and `age` and asserts that `age` is greater than 5;
+ * `active` is never checked.
+ */
+private class DummyRequest extends SubmitRestProtocolRequest {
+  // Boxed Java types (java.lang.Boolean / java.lang.Integer), so "not set" is null.
+  var active: Boolean = null
+  var age: Integer = null
+  var name: String = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(name, "name")
+    assertFieldIsSet(age, "age")
+    // `age` is unboxed by this comparison, so it must run after the null check above
+    // (presumably assertFieldIsSet rejects null - confirm against its definition).
+    assert(age > 5, "Not old enough!")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 855f1b6..054a4c6 100644
--- 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -29,9 +29,9 @@ class KryoSerializerDistributedSuite extends FunSuite {
 
   test("kryo objects are serialised consistently in different processes") {
     val conf = new SparkConf(false)
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
-    conf.set("spark.task.maxFailures", "1")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
+      .set("spark.task.maxFailures", "1")
 
     val jar = 
TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
     conf.setJars(List(jar.getPath))

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aef450a..da8ee07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,6 +154,7 @@
     <jline.groupid>org.scala-lang</jline.groupid>
     <jodd.version>3.6.3</jodd.version>
     <codehaus.jackson.version>1.8.8</codehaus.jackson.version>
+    <fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
     <snappy.version>1.1.1.6</snappy.version>
 
     <!--
@@ -578,6 +579,16 @@
         <version>${codahale.metrics.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${fasterxml.jackson.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.module</groupId>
+        <artifactId>jackson-module-scala_2.10</artifactId>
+        <version>${fasterxml.jackson.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
         <version>${scala.version}</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to