Repository: spark Updated Branches: refs/heads/master 70f44ad2d -> 015f7ef50
http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 17c59ff..12494b0 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -22,15 +22,18 @@ import java.util.Properties import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, Matchers} +import org.scalatest.concurrent.Eventually._ import org.apache.spark._ -import org.apache.spark.launcher.TestClasspathBuilder +import org.apache.spark.launcher._ import org.apache.spark.util.Utils abstract class BaseYarnClusterSuite @@ -46,13 +49,14 @@ abstract class BaseYarnClusterSuite |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n |log4j.logger.org.apache.hadoop=WARN |log4j.logger.org.eclipse.jetty=WARN + |log4j.logger.org.mortbay=WARN |log4j.logger.org.spark-project.jetty=WARN """.stripMargin private var yarnCluster: MiniYARNCluster = _ protected var tempDir: File = _ private var fakeSparkJar: File = _ - private var hadoopConfDir: File = _ + protected var hadoopConfDir: File = _ private var logConfDir: File = _ def newYarnConfig(): YarnConfiguration @@ -120,15 +124,77 @@ abstract class BaseYarnClusterSuite clientMode: Boolean, klass: String, appArgs: Seq[String] = Nil, - sparkArgs: Seq[String] = Nil, + sparkArgs: Seq[(String, String)] = Nil, extraClassPath: Seq[String] = Nil, extraJars: Seq[String] = Nil, extraConf: Map[String, String] = Map(), - extraEnv: Map[String, String] = Map()): Unit = { + extraEnv: Map[String, String] = Map()): SparkAppHandle.State = { val master = if (clientMode) "yarn-client" else "yarn-cluster" - val props = new Properties() + val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf) + val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv + + val launcher = new SparkLauncher(env.asJava) + if (klass.endsWith(".py")) { + launcher.setAppResource(klass) + } else { + launcher.setMainClass(klass) + launcher.setAppResource(fakeSparkJar.getAbsolutePath()) + } + launcher.setSparkHome(sys.props("spark.test.home")) + .setMaster(master) + .setConf("spark.executor.instances", "1") + .setPropertiesFile(propsFile) + .addAppArgs(appArgs.toArray: _*) + + sparkArgs.foreach { case (name, value) => + if (value != null) { + launcher.addSparkArg(name, value) + } else { + launcher.addSparkArg(name) + } + } + extraJars.foreach(launcher.addJar) - props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) + val handle = launcher.startApplication() + try { + eventually(timeout(2 minutes), interval(1 second)) { + assert(handle.getState().isFinal()) + } + } finally { + handle.kill() + } + + handle.getState() + } + + /** + * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide + * any sort of error when the job process finishes successfully, but the job itself fails. So + * the tests enforce that something is written to a file after everything is ok to indicate + * that the job succeeded. + */ + protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = { + checkResult(finalState, result, "success") + } + + protected def checkResult( + finalState: SparkAppHandle.State, + result: File, + expected: String): Unit = { + finalState should be (SparkAppHandle.State.FINISHED) + val resultString = Files.toString(result, UTF_8) + resultString should be (expected) + } + + protected def mainClassName(klass: Class[_]): String = { + klass.getName().stripSuffix("$") + } + + protected def createConfFile( + extraClassPath: Seq[String] = Nil, + extraConf: Map[String, String] = Map()): String = { + val props = new Properties() + props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) val testClasspath = new TestClasspathBuilder() .buildClassPath( @@ -138,69 +204,28 @@ abstract class BaseYarnClusterSuite .asScala .mkString(File.pathSeparator) - props.setProperty("spark.driver.extraClassPath", testClasspath) - props.setProperty("spark.executor.extraClassPath", testClasspath) + props.put("spark.driver.extraClassPath", testClasspath) + props.put("spark.executor.extraClassPath", testClasspath) // SPARK-4267: make sure java options are propagated correctly. props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"") - yarnCluster.getConfig.asScala.foreach { e => + yarnCluster.getConfig().asScala.foreach { e => props.setProperty("spark.hadoop." + e.getKey(), e.getValue()) } - sys.props.foreach { case (k, v) => if (k.startsWith("spark.")) { props.setProperty(k, v) } } - extraConf.foreach { case (k, v) => props.setProperty(k, v) } val propsFile = File.createTempFile("spark", ".properties", tempDir) val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8) props.store(writer, "Spark properties.") writer.close() - - val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil - val mainArgs = - if (klass.endsWith(".py")) { - Seq(klass) - } else { - Seq("--class", klass, fakeSparkJar.getAbsolutePath()) - } - val argv = - Seq( - new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(), - "--master", master, - "--num-executors", "1", - "--properties-file", propsFile.getAbsolutePath()) ++ - extraJarArgs ++ - sparkArgs ++ - mainArgs ++ - appArgs - - Utils.executeAndGetOutput(argv, - extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv) - } - - /** - * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide - * any sort of error when the job process finishes successfully, but the job itself fails. So - * the tests enforce that something is written to a file after everything is ok to indicate - * that the job succeeded. - */ - protected def checkResult(result: File): Unit = { - checkResult(result, "success") - } - - protected def checkResult(result: File, expected: String): Unit = { - val resultString = Files.toString(result, UTF_8) - resultString should be (expected) - } - - protected def mainClassName(klass: Class[_]): String = { - klass.getName().stripSuffix("$") + propsFile.getAbsolutePath() } } http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index f1601cd..d1cd0c8 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -19,16 +19,20 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.URL +import java.util.{HashMap => JHashMap, Properties} import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.postfixOps import com.google.common.base.Charsets.UTF_8 import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually._ import org.apache.spark._ -import org.apache.spark.launcher.TestClasspathBuilder +import org.apache.spark.launcher._ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -82,10 +86,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite { test("run Spark in yarn-cluster mode unsuccessfully") { // Don't provide arguments so the driver will fail. - val exception = intercept[SparkException] { - runSpark(false, mainClassName(YarnClusterDriver.getClass)) - fail("Spark application should have failed.") - } + val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass)) + finalState should be (SparkAppHandle.State.FAILED) } test("run Python application in yarn-client mode") { @@ -104,11 +106,42 @@ class YarnClusterSuite extends BaseYarnClusterSuite { testUseClassPathFirst(false) } + test("monitor app using launcher library") { + val env = new JHashMap[String, String]() + env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath()) + + val propsFile = createConfFile() + val handle = new SparkLauncher(env) + .setSparkHome(sys.props("spark.test.home")) + .setConf("spark.ui.enabled", "false") + .setPropertiesFile(propsFile) + .setMaster("yarn-client") + .setAppResource("spark-internal") + .setMainClass(mainClassName(YarnLauncherTestApp.getClass)) + .startApplication() + + try { + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.RUNNING) + } + + handle.getAppId() should not be (null) + handle.getAppId() should startWith ("application_") + handle.stop() + + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.KILLED) + } + } finally { + handle.kill() + } + } + private def testBasicYarnApp(clientMode: Boolean): Unit = { val result = File.createTempFile("result", null, tempDir) - runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), + val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), appArgs = Seq(result.getAbsolutePath())) - checkResult(result) + checkResult(finalState, result) } private def testPySpark(clientMode: Boolean): Unit = { @@ -143,11 +176,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") val result = File.createTempFile("result", null, tempDir) - runSpark(clientMode, primaryPyFile.getAbsolutePath(), - sparkArgs = Seq("--py-files", pyFiles), + val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(), + sparkArgs = Seq("--py-files" -> pyFiles), appArgs = Seq(result.getAbsolutePath()), extraEnv = extraEnv) - checkResult(result) + checkResult(finalState, result) } private def testUseClassPathFirst(clientMode: Boolean): Unit = { @@ -156,15 +189,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir) val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) - runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), + val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), extraClassPath = Seq(originalJar.getPath()), extraJars = Seq("local:" + userJar.getPath()), extraConf = Map( "spark.driver.userClassPathFirst" -> "true", "spark.executor.userClassPathFirst" -> "true")) - checkResult(driverResult, "OVERRIDDEN") - checkResult(executorResult, "OVERRIDDEN") + checkResult(finalState, driverResult, "OVERRIDDEN") + checkResult(finalState, executorResult, "OVERRIDDEN") } } @@ -211,8 +244,8 @@ private object YarnClusterDriver extends Logging with Matchers { data should be (Set(1, 2, 3, 4)) result = "success" } finally { - sc.stop() Files.write(result, status, UTF_8) + sc.stop() } // verify log urls are present @@ -297,3 +330,18 @@ private object YarnClasspathTest extends Logging { } } + +private object YarnLauncherTestApp { + + def main(args: Array[String]): Unit = { + // Do not stop the application; the test will stop it using the launcher lib. Just run a task + // that will prevent the process from exiting. + val sc = new SparkContext(new SparkConf()) + sc.parallelize(Seq(1)).foreach { i => + this.synchronized { + wait() + } + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index a85e577..c17e869 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -53,7 +53,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { logInfo("Shuffle service port = " + shuffleServicePort) val result = File.createTempFile("result", null, tempDir) - runSpark( + val finalState = runSpark( false, mainClassName(YarnExternalShuffleDriver.getClass), appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath), @@ -62,7 +62,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { "spark.shuffle.service.port" -> shuffleServicePort.toString ) ) - checkResult(result) + checkResult(finalState, result) assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists()) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org