Repository: spark Updated Branches: refs/heads/master e772b4e4e -> 1390e56fa
http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala new file mode 100644 index 0000000..16dfe04 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +import java.lang.Boolean + +/** + * An abstract response sent from the server in the REST application submission protocol. + */ +private[spark] abstract class SubmitRestProtocolResponse extends SubmitRestProtocolMessage { + var serverSparkVersion: String = null + var success: Boolean = null + var unknownFields: Array[String] = null + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(serverSparkVersion, "serverSparkVersion") + } +} + +/** + * A response to a [[CreateSubmissionRequest]] in the REST application submission protocol. + */ +private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse { + var submissionId: String = null + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(success, "success") + } +} + +/** + * A response to a kill request in the REST application submission protocol. + */ +private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse { + var submissionId: String = null + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(submissionId, "submissionId") + assertFieldIsSet(success, "success") + } +} + +/** + * A response to a status request in the REST application submission protocol. + */ +private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse { + var submissionId: String = null + var driverState: String = null + var workerId: String = null + var workerHostPort: String = null + + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(submissionId, "submissionId") + assertFieldIsSet(success, "success") + } +} + +/** + * An error response message used in the REST application submission protocol. + */ +private[spark] class ErrorResponse extends SubmitRestProtocolResponse { + // The highest protocol version that the server knows about + // This is set when the client specifies an unknown version + var highestProtocolVersion: String = null + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(message, "message") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index ed02ca8..e955636 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -68,7 +68,8 @@ class JsonProtocolSuite extends FunSuite { val completedApps = Array[ApplicationInfo]() val activeDrivers = Array(createDriverInfo()) val completedDrivers = Array(createDriverInfo()) - val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps, + val stateResponse = new MasterStateResponse( + "host", 8080, None, workers, activeApps, completedApps, activeDrivers, completedDrivers, RecoveryState.ALIVE) val output = JsonProtocol.writeMasterState(stateResponse) assertValidJson(output) http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 3f1355f..1ddccae 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -141,7 +141,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) + val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) val childArgsStr = childArgs.mkString(" ") childArgsStr should include ("--class org.SomeClass") childArgsStr should include ("--executor-memory 5g") @@ -180,7 +180,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) + val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (4) @@ -201,6 +201,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties } test("handles standalone cluster mode") { + testStandaloneCluster(useRest = true) + } + + test("handles legacy standalone cluster mode") { + testStandaloneCluster(useRest = false) + } + + /** + * Test whether the launch environment is correctly set up in standalone cluster mode. + * @param useRest whether to use the REST submission gateway introduced in Spark 1.3 + */ + private def testStandaloneCluster(useRest: Boolean): Unit = { val clArgs = Seq( "--deploy-mode", "cluster", "--master", "spark://h:p", @@ -212,17 +224,26 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) + appArgs.useRest = useRest + val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) val childArgsStr = childArgs.mkString(" ") - childArgsStr should startWith ("--memory 4g --cores 5 --supervise") - childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2") - mainClass should be ("org.apache.spark.deploy.Client") - classpath should have size (0) - sysProps should have size (5) + if (useRest) { + childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2") + mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient") + } else { + childArgsStr should startWith ("--supervise --memory 4g --cores 5") + childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2" + mainClass should be ("org.apache.spark.deploy.Client") + } + classpath should have size 0 + sysProps should have size 8 sysProps.keys should contain ("SPARK_SUBMIT") sysProps.keys should contain ("spark.master") sysProps.keys should contain ("spark.app.name") sysProps.keys should contain ("spark.jars") + sysProps.keys should contain ("spark.driver.memory") + sysProps.keys should contain ("spark.driver.cores") + sysProps.keys should contain ("spark.driver.supervise") sysProps.keys should contain ("spark.shuffle.spill") sysProps("spark.shuffle.spill") should be ("false") } @@ -239,7 +260,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) + val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (1) @@ -261,7 +282,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) + val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (1) @@ -281,7 +302,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs) + val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) sysProps("spark.executor.memory") should be ("5g") sysProps("spark.master") should be ("yarn-cluster") mainClass should be ("org.apache.spark.deploy.yarn.Client") @@ -339,7 +360,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "--files", files, "thejar.jar") val appArgs = new SparkSubmitArguments(clArgs) - val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3 + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3 appArgs.jars should be (Utils.resolveURIs(jars)) appArgs.files should be (Utils.resolveURIs(files)) sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar")) @@ -354,7 +375,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar" ) val appArgs2 = new SparkSubmitArguments(clArgs2) - val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3 + val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3 appArgs2.files should be (Utils.resolveURIs(files)) appArgs2.archives should be (Utils.resolveURIs(archives)) sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files)) @@ -367,7 +388,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "mister.py" ) val appArgs3 = new SparkSubmitArguments(clArgs3) - val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3 + val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3 appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles)) sysProps3("spark.submit.pyFiles") should be ( PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) @@ -392,7 +413,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar" ) val appArgs = new SparkSubmitArguments(clArgs) - val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3 + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3 sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) sysProps("spark.files") should be(Utils.resolveURIs(files)) @@ -409,7 +430,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "thejar.jar" ) val appArgs2 = new SparkSubmitArguments(clArgs2) - val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3 + val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3 sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files)) sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives)) @@ -424,7 +445,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "mister.py" ) val appArgs3 = new SparkSubmitArguments(clArgs3) - val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3 + val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3 sysProps3("spark.submit.pyFiles") should be( PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) } @@ -440,7 +461,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path)) assert(appArgs.propertiesFile != null) assert(appArgs.propertiesFile.startsWith(path)) - appArgs.executorMemory should be ("2.3g") + appArgs.executorMemory should be ("2.3g") } } http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala new file mode 100644 index 0000000..29aed89 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +import java.io.{File, FileInputStream, FileOutputStream, PrintWriter} +import java.util.jar.{JarEntry, JarOutputStream} +import java.util.zip.ZipEntry + +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +import akka.actor.ActorSystem +import com.google.common.io.ByteStreams +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark._ +import org.apache.spark.util.Utils +import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments} +import org.apache.spark.deploy.master.{DriverState, Master} +import org.apache.spark.deploy.worker.Worker + +/** + * End-to-end tests for the REST application submission protocol in standalone mode. + */ +class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach { + private val systemsToStop = new ArrayBuffer[ActorSystem] + private val masterRestUrl = startLocalCluster() + private val client = new StandaloneRestClient + private val mainJar = StandaloneRestSubmitSuite.createJar() + private val mainClass = StandaloneRestApp.getClass.getName.stripSuffix("$") + + override def afterAll() { + systemsToStop.foreach(_.shutdown()) + } + + test("simple submit until completion") { + val resultsFile = File.createTempFile("test-submit", ".txt") + val numbers = Seq(1, 2, 3) + val size = 500 + val submissionId = submitApplication(resultsFile, numbers, size) + waitUntilFinished(submissionId) + validateResult(resultsFile, numbers, size) + } + + test("kill empty submission") { + val response = client.killSubmission(masterRestUrl, "submission-that-does-not-exist") + val killResponse = getKillResponse(response) + val killSuccess = killResponse.success + assert(!killSuccess) + } + + test("kill running submission") { + val resultsFile = File.createTempFile("test-kill", ".txt") + val numbers = Seq(1, 2, 3) + val size = 500 + val submissionId = submitApplication(resultsFile, numbers, size) + val response = client.killSubmission(masterRestUrl, submissionId) + val killResponse = getKillResponse(response) + val killSuccess = killResponse.success + waitUntilFinished(submissionId) + val response2 = client.requestSubmissionStatus(masterRestUrl, submissionId) + val statusResponse = getStatusResponse(response2) + val statusSuccess = statusResponse.success + val driverState = statusResponse.driverState + assert(killSuccess) + assert(statusSuccess) + assert(driverState === DriverState.KILLED.toString) + // we should not see the expected results because we killed the submission + intercept[TestFailedException] { validateResult(resultsFile, numbers, size) } + } + + test("request status for empty submission") { + val response = client.requestSubmissionStatus(masterRestUrl, "submission-that-does-not-exist") + val statusResponse = getStatusResponse(response) + val statusSuccess = statusResponse.success + assert(!statusSuccess) + } + + /** + * Start a local cluster containing one Master and a few Workers. + * Do not use [[org.apache.spark.deploy.LocalSparkCluster]] here because we want the REST URL. + * Return the Master's REST URL to which applications should be submitted. + */ + private def startLocalCluster(): String = { + val conf = new SparkConf(false) + .set("spark.master.rest.enabled", "true") + .set("spark.master.rest.port", "0") + val (numWorkers, coresPerWorker, memPerWorker) = (2, 1, 512) + val localHostName = Utils.localHostName() + val (masterSystem, masterPort, _, _masterRestPort) = + Master.startSystemAndActor(localHostName, 0, 0, conf) + val masterRestPort = _masterRestPort.getOrElse { fail("REST server not started on Master!") } + val masterUrl = "spark://" + localHostName + ":" + masterPort + val masterRestUrl = "spark://" + localHostName + ":" + masterRestPort + (1 to numWorkers).foreach { n => + val (workerSystem, _) = Worker.startSystemAndActor( + localHostName, 0, 0, coresPerWorker, memPerWorker, Array(masterUrl), null, Some(n)) + systemsToStop.append(workerSystem) + } + systemsToStop.append(masterSystem) + masterRestUrl + } + + /** Submit the [[StandaloneRestApp]] and return the corresponding submission ID. */ + private def submitApplication(resultsFile: File, numbers: Seq[Int], size: Int): String = { + val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) ++ Seq(size.toString) + val commandLineArgs = Array( + "--deploy-mode", "cluster", + "--master", masterRestUrl, + "--name", mainClass, + "--class", mainClass, + mainJar) ++ appArgs + val args = new SparkSubmitArguments(commandLineArgs) + val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) + val request = client.constructSubmitRequest( + mainJar, mainClass, appArgs.toArray, sparkProperties.toMap, Map.empty) + val response = client.createSubmission(masterRestUrl, request) + val submitResponse = getSubmitResponse(response) + val submissionId = submitResponse.submissionId + assert(submissionId != null, "Application submission was unsuccessful!") + submissionId + } + + /** Wait until the given submission has finished running up to the specified timeout. */ + private def waitUntilFinished(submissionId: String, maxSeconds: Int = 30): Unit = { + var finished = false + val expireTime = System.currentTimeMillis + maxSeconds * 1000 + while (!finished) { + val response = client.requestSubmissionStatus(masterRestUrl, submissionId) + val statusResponse = getStatusResponse(response) + val driverState = statusResponse.driverState + finished = + driverState != DriverState.SUBMITTED.toString && + driverState != DriverState.RUNNING.toString + if (System.currentTimeMillis > expireTime) { + fail(s"Driver $submissionId did not finish within $maxSeconds seconds.") + } + } + } + + /** Return the response as a submit response, or fail with error otherwise. */ + private def getSubmitResponse(response: SubmitRestProtocolResponse): CreateSubmissionResponse = { + response match { + case s: CreateSubmissionResponse => s + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") + case r => fail(s"Expected submit response. Actual: ${r.toJson}") + } + } + + /** Return the response as a kill response, or fail with error otherwise. */ + private def getKillResponse(response: SubmitRestProtocolResponse): KillSubmissionResponse = { + response match { + case k: KillSubmissionResponse => k + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") + case r => fail(s"Expected kill response. Actual: ${r.toJson}") + } + } + + /** Return the response as a status response, or fail with error otherwise. */ + private def getStatusResponse(response: SubmitRestProtocolResponse): SubmissionStatusResponse = { + response match { + case s: SubmissionStatusResponse => s + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") + case r => fail(s"Expected status response. Actual: ${r.toJson}") + } + } + + /** Validate whether the application produced the corrupt output. */ + private def validateResult(resultsFile: File, numbers: Seq[Int], size: Int): Unit = { + val lines = Source.fromFile(resultsFile.getAbsolutePath).getLines().toSeq + val unexpectedContent = + if (lines.nonEmpty) { + "[\n" + lines.map { l => " " + l }.mkString("\n") + "\n]" + } else { + "[EMPTY]" + } + assert(lines.size === 2, s"Unexpected content in file: $unexpectedContent") + assert(lines(0).toInt === numbers.sum, s"Sum of ${numbers.mkString(",")} is incorrect") + assert(lines(1).toInt === (size / 2) + 1, "Result of Spark job is incorrect") + } +} + +private object StandaloneRestSubmitSuite { + private val pathPrefix = this.getClass.getPackage.getName.replaceAll("\\.", "/") + + /** + * Create a jar that contains all the class files needed for running the [[StandaloneRestApp]]. + * Return the absolute path to that jar. + */ + def createJar(): String = { + val jarFile = File.createTempFile("test-standalone-rest-protocol", ".jar") + val jarFileStream = new FileOutputStream(jarFile) + val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest) + jarStream.putNextEntry(new ZipEntry(pathPrefix)) + getClassFiles.foreach { cf => + jarStream.putNextEntry(new JarEntry(pathPrefix + "/" + cf.getName)) + val in = new FileInputStream(cf) + ByteStreams.copy(in, jarStream) + in.close() + } + jarStream.close() + jarFileStream.close() + jarFile.getAbsolutePath + } + + /** + * Return a list of class files compiled for [[StandaloneRestApp]]. + * This includes all the anonymous classes used in the application. + */ + private def getClassFiles: Seq[File] = { + val className = Utils.getFormattedClassName(StandaloneRestApp) + val clazz = StandaloneRestApp.getClass + val basePath = clazz.getProtectionDomain.getCodeSource.getLocation.toURI.getPath + val baseDir = new File(basePath + "/" + pathPrefix) + baseDir.listFiles().filter(_.getName.contains(className)) + } +} + +/** + * Sample application to be submitted to the cluster using the REST gateway. + * All relevant classes will be packaged into a jar at run time. + */ +object StandaloneRestApp { + // Usage: [path to results file] [num1] [num2] [num3] [rddSize] + // The first line of the results file should be (num1 + num2 + num3) + // The second line should be (rddSize / 2) + 1 + def main(args: Array[String]) { + assert(args.size == 5, s"Expected exactly 5 arguments: ${args.mkString(",")}") + val resultFile = new File(args(0)) + val writer = new PrintWriter(resultFile) + try { + val conf = new SparkConf() + val sc = new SparkContext(conf) + val firstLine = args(1).toInt + args(2).toInt + args(3).toInt + val secondLine = sc.parallelize(1 to args(4).toInt) + .map { i => (i / 2, i) } + .reduceByKey(_ + _) + .count() + writer.println(firstLine) + writer.println(secondLine) + } catch { + case e: Exception => + writer.println(e) + e.getStackTrace.foreach { l => writer.println(" " + l) } + } finally { + writer.close() + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala new file mode 100644 index 0000000..1d64ec2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +import java.lang.Boolean +import java.lang.Integer + +import org.json4s.jackson.JsonMethods._ +import org.scalatest.FunSuite + +import org.apache.spark.SparkConf + +/** + * Tests for the REST application submission protocol. + */ +class SubmitRestProtocolSuite extends FunSuite { + + test("validate") { + val request = new DummyRequest + intercept[SubmitRestProtocolException] { request.validate() } // missing everything + request.clientSparkVersion = "1.2.3" + intercept[SubmitRestProtocolException] { request.validate() } // missing name and age + request.name = "something" + intercept[SubmitRestProtocolException] { request.validate() } // missing only age + request.age = 2 + intercept[SubmitRestProtocolException] { request.validate() } // age too low + request.age = 10 + request.validate() // everything is set properly + request.clientSparkVersion = null + intercept[SubmitRestProtocolException] { request.validate() } // missing only Spark version + request.clientSparkVersion = "1.2.3" + request.name = null + intercept[SubmitRestProtocolException] { request.validate() } // missing only name + request.message = "not-setting-name" + intercept[SubmitRestProtocolException] { request.validate() } // still missing name + } + + test("request to and from JSON") { + val request = new DummyRequest + intercept[SubmitRestProtocolException] { request.toJson } // implicit validation + request.clientSparkVersion = "1.2.3" + request.active = true + request.age = 25 + request.name = "jung" + val json = request.toJson + assertJsonEquals(json, dummyRequestJson) + val newRequest = SubmitRestProtocolMessage.fromJson(json, classOf[DummyRequest]) + assert(newRequest.clientSparkVersion === "1.2.3") + assert(newRequest.clientSparkVersion === "1.2.3") + assert(newRequest.active) + assert(newRequest.age === 25) + assert(newRequest.name === "jung") + assert(newRequest.message === null) + } + + test("response to and from JSON") { + val response = new DummyResponse + response.serverSparkVersion = "3.3.4" + response.success = true + val json = response.toJson + assertJsonEquals(json, dummyResponseJson) + val newResponse = SubmitRestProtocolMessage.fromJson(json, classOf[DummyResponse]) + assert(newResponse.serverSparkVersion === "3.3.4") + assert(newResponse.serverSparkVersion === "3.3.4") + assert(newResponse.success) + assert(newResponse.message === null) + } + + test("CreateSubmissionRequest") { + val message = new CreateSubmissionRequest + intercept[SubmitRestProtocolException] { message.validate() } + message.clientSparkVersion = "1.2.3" + message.appResource = "honey-walnut-cherry.jar" + message.mainClass = "org.apache.spark.examples.SparkPie" + val conf = new SparkConf(false) + conf.set("spark.app.name", "SparkPie") + message.sparkProperties = conf.getAll.toMap + message.validate() + // optional fields + conf.set("spark.jars", "mayonnaise.jar,ketchup.jar") + conf.set("spark.files", "fireball.png") + conf.set("spark.driver.memory", "512m") + conf.set("spark.driver.cores", "180") + conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red") + conf.set("spark.driver.extraClassPath", "food-coloring.jar") + conf.set("spark.driver.extraLibraryPath", "pickle.jar") + conf.set("spark.driver.supervise", "false") + conf.set("spark.executor.memory", "256m") + conf.set("spark.cores.max", "10000") + message.sparkProperties = conf.getAll.toMap + message.appArgs = Array("two slices", "a hint of cinnamon") + message.environmentVariables = Map("PATH" -> "/dev/null") + message.validate() + // bad fields + var badConf = conf.clone().set("spark.driver.cores", "one hundred feet") + message.sparkProperties = badConf.getAll.toMap + intercept[SubmitRestProtocolException] { message.validate() } + badConf = conf.clone().set("spark.driver.supervise", "nope, never") + message.sparkProperties = badConf.getAll.toMap + intercept[SubmitRestProtocolException] { message.validate() } + badConf = conf.clone().set("spark.cores.max", "two men") + message.sparkProperties = badConf.getAll.toMap + intercept[SubmitRestProtocolException] { message.validate() } + message.sparkProperties = conf.getAll.toMap + // test JSON + val json = message.toJson + assertJsonEquals(json, submitDriverRequestJson) + val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionRequest]) + assert(newMessage.clientSparkVersion === "1.2.3") + assert(newMessage.appResource === "honey-walnut-cherry.jar") + assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie") + assert(newMessage.sparkProperties("spark.app.name") === "SparkPie") + assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar") + assert(newMessage.sparkProperties("spark.files") === "fireball.png") + assert(newMessage.sparkProperties("spark.driver.memory") === "512m") + assert(newMessage.sparkProperties("spark.driver.cores") === "180") + assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red") + assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar") + assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar") + assert(newMessage.sparkProperties("spark.driver.supervise") === "false") + assert(newMessage.sparkProperties("spark.executor.memory") === "256m") + assert(newMessage.sparkProperties("spark.cores.max") === "10000") + assert(newMessage.appArgs === message.appArgs) + assert(newMessage.sparkProperties === message.sparkProperties) + assert(newMessage.environmentVariables === message.environmentVariables) + } + + test("CreateSubmissionResponse") { + val message = new CreateSubmissionResponse + intercept[SubmitRestProtocolException] { message.validate() } + message.serverSparkVersion = "1.2.3" + message.submissionId = "driver_123" + message.success = true + message.validate() + // test JSON + val json = message.toJson + assertJsonEquals(json, submitDriverResponseJson) + val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionResponse]) + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.submissionId === "driver_123") + assert(newMessage.success) + } + + test("KillSubmissionResponse") { + val message = new KillSubmissionResponse + intercept[SubmitRestProtocolException] { message.validate() } + message.serverSparkVersion = "1.2.3" + message.submissionId = "driver_123" + message.success = true + message.validate() + // test JSON + val json = message.toJson + assertJsonEquals(json, killDriverResponseJson) + val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[KillSubmissionResponse]) + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.submissionId === "driver_123") + assert(newMessage.success) + } + + test("SubmissionStatusResponse") { + val message = new SubmissionStatusResponse + intercept[SubmitRestProtocolException] { message.validate() } + message.serverSparkVersion = "1.2.3" + message.submissionId = "driver_123" + message.success = true + message.validate() + // optional fields + message.driverState = "RUNNING" + message.workerId = "worker_123" + message.workerHostPort = "1.2.3.4:7780" + // test JSON + val json = message.toJson + assertJsonEquals(json, driverStatusResponseJson) + val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[SubmissionStatusResponse]) + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.submissionId === "driver_123") + assert(newMessage.driverState === "RUNNING") + assert(newMessage.success) + assert(newMessage.workerId === "worker_123") + assert(newMessage.workerHostPort === "1.2.3.4:7780") + } + + test("ErrorResponse") { + val message = new ErrorResponse + intercept[SubmitRestProtocolException] { message.validate() } + message.serverSparkVersion = "1.2.3" + message.message = "Field not found in submit request: X" + message.validate() + // test JSON + val json = message.toJson + assertJsonEquals(json, errorJson) + val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[ErrorResponse]) + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.message === "Field not found in submit request: X") + } + + private val dummyRequestJson = + """ + |{ + | "action" : "DummyRequest", + | "active" : true, + | "age" : 25, + | "clientSparkVersion" : "1.2.3", + | "name" : "jung" + |} + """.stripMargin + + private val dummyResponseJson = + """ + |{ + | "action" : "DummyResponse", + | "serverSparkVersion" : "3.3.4", + | "success": true + |} + """.stripMargin + + private val submitDriverRequestJson = + """ + |{ + | "action" : "CreateSubmissionRequest", + | "appArgs" : [ "two slices", "a hint of cinnamon" ], + | "appResource" : "honey-walnut-cherry.jar", + | "clientSparkVersion" : "1.2.3", + | "environmentVariables" : { + | "PATH" : "/dev/null" + | }, + | "mainClass" : "org.apache.spark.examples.SparkPie", + | "sparkProperties" : { + | "spark.driver.extraLibraryPath" : "pickle.jar", + | "spark.jars" : "mayonnaise.jar,ketchup.jar", + | "spark.driver.supervise" : "false", + | "spark.app.name" : "SparkPie", + | "spark.cores.max" : "10000", + | "spark.driver.memory" : "512m", + | "spark.files" : "fireball.png", + | "spark.driver.cores" : "180", + | "spark.driver.extraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red", + | "spark.executor.memory" : "256m", + | "spark.driver.extraClassPath" : "food-coloring.jar" + | } + |} + """.stripMargin + + private val submitDriverResponseJson = + """ + |{ + | "action" : "CreateSubmissionResponse", + | "serverSparkVersion" : "1.2.3", + | "submissionId" : "driver_123", + | "success" : true + |} + """.stripMargin + + private val killDriverResponseJson = + """ + |{ + | "action" : "KillSubmissionResponse", + | "serverSparkVersion" : "1.2.3", + | "submissionId" : "driver_123", + | "success" : true + |} + """.stripMargin + + private val driverStatusResponseJson = + """ + |{ + | "action" : "SubmissionStatusResponse", + | "driverState" : "RUNNING", + | "serverSparkVersion" : "1.2.3", + | "submissionId" : "driver_123", + | "success" : true, + | "workerHostPort" : "1.2.3.4:7780", + | "workerId" : "worker_123" + |} + """.stripMargin + + private val errorJson = + """ + |{ + | "action" : "ErrorResponse", + | "message" : "Field not found in submit request: X", + | "serverSparkVersion" : "1.2.3" + |} + """.stripMargin + + /** Assert that the contents in the two JSON strings are equal after ignoring whitespace. */ + private def assertJsonEquals(jsonString1: String, jsonString2: String): Unit = { + val trimmedJson1 = jsonString1.trim + val trimmedJson2 = jsonString2.trim + val json1 = compact(render(parse(trimmedJson1))) + val json2 = compact(render(parse(trimmedJson2))) + // Put this on a separate line to avoid printing comparison twice when test fails + val equals = json1 == json2 + assert(equals, "\"[%s]\" did not equal \"[%s]\"".format(trimmedJson1, trimmedJson2)) + } +} + +private class DummyResponse extends SubmitRestProtocolResponse +private class DummyRequest extends SubmitRestProtocolRequest { + var active: Boolean = null + var age: Integer = null + var name: String = null + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(name, "name") + assertFieldIsSet(age, "age") + assert(age > 5, "Not old enough!") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index 855f1b6..054a4c6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -29,9 +29,9 @@ class KryoSerializerDistributedSuite extends FunSuite { test("kryo objects are serialised consistently in different processes") { val conf = new SparkConf(false) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryo.registrator", classOf[AppJarRegistrator].getName) - conf.set("spark.task.maxFailures", "1") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName) + .set("spark.task.maxFailures", "1") val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName)) conf.setJars(List(jar.getPath)) http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index aef450a..da8ee07 100644 --- a/pom.xml +++ b/pom.xml @@ -154,6 +154,7 @@ <jline.groupid>org.scala-lang</jline.groupid> <jodd.version>3.6.3</jodd.version> <codehaus.jackson.version>1.8.8</codehaus.jackson.version> + <fasterxml.jackson.version>2.4.4</fasterxml.jackson.version> <snappy.version>1.1.1.6</snappy.version> <!-- @@ -578,6 +579,16 @@ <version>${codahale.metrics.version}</version> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${fasterxml.jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_2.10</artifactId> + <version>${fasterxml.jackson.version}</version> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org