Repository: spark
Updated Branches:
  refs/heads/master 70f44ad2d -> 015f7ef50


http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 17c59ff..12494b0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -22,15 +22,18 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.scalatest.{BeforeAndAfterAll, Matchers}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
 import org.apache.spark.util.Utils
 
 abstract class BaseYarnClusterSuite
@@ -46,13 +49,14 @@ abstract class BaseYarnClusterSuite
     |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
     |log4j.logger.org.apache.hadoop=WARN
     |log4j.logger.org.eclipse.jetty=WARN
+    |log4j.logger.org.mortbay=WARN
     |log4j.logger.org.spark-project.jetty=WARN
     """.stripMargin
 
   private var yarnCluster: MiniYARNCluster = _
   protected var tempDir: File = _
   private var fakeSparkJar: File = _
-  private var hadoopConfDir: File = _
+  protected var hadoopConfDir: File = _
   private var logConfDir: File = _
 
   def newYarnConfig(): YarnConfiguration
@@ -120,15 +124,77 @@ abstract class BaseYarnClusterSuite
       clientMode: Boolean,
       klass: String,
       appArgs: Seq[String] = Nil,
-      sparkArgs: Seq[String] = Nil,
+      sparkArgs: Seq[(String, String)] = Nil,
       extraClassPath: Seq[String] = Nil,
       extraJars: Seq[String] = Nil,
       extraConf: Map[String, String] = Map(),
-      extraEnv: Map[String, String] = Map()): Unit = {
+      extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
     val master = if (clientMode) "yarn-client" else "yarn-cluster"
-    val props = new Properties()
+    val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
+    val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
+
+    val launcher = new SparkLauncher(env.asJava)
+    if (klass.endsWith(".py")) {
+      launcher.setAppResource(klass)
+    } else {
+      launcher.setMainClass(klass)
+      launcher.setAppResource(fakeSparkJar.getAbsolutePath())
+    }
+    launcher.setSparkHome(sys.props("spark.test.home"))
+      .setMaster(master)
+      .setConf("spark.executor.instances", "1")
+      .setPropertiesFile(propsFile)
+      .addAppArgs(appArgs.toArray: _*)
+
+    sparkArgs.foreach { case (name, value) =>
+      if (value != null) {
+        launcher.addSparkArg(name, value)
+      } else {
+        launcher.addSparkArg(name)
+      }
+    }
+    extraJars.foreach(launcher.addJar)
 
-    props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+    val handle = launcher.startApplication()
+    try {
+      eventually(timeout(2 minutes), interval(1 second)) {
+        assert(handle.getState().isFinal())
+      }
+    } finally {
+      handle.kill()
+    }
+
+    handle.getState()
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+   * any sort of error when the job process finishes successfully, but the job itself fails. So
+   * the tests enforce that something is written to a file after everything is ok to indicate
+   * that the job succeeded.
+   */
+  protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
+    checkResult(finalState, result, "success")
+  }
+
+  protected def checkResult(
+      finalState: SparkAppHandle.State,
+      result: File,
+      expected: String): Unit = {
+    finalState should be (SparkAppHandle.State.FINISHED)
+    val resultString = Files.toString(result, UTF_8)
+    resultString should be (expected)
+  }
+
+  protected def mainClassName(klass: Class[_]): String = {
+    klass.getName().stripSuffix("$")
+  }
+
+  protected def createConfFile(
+      extraClassPath: Seq[String] = Nil,
+      extraConf: Map[String, String] = Map()): String = {
+    val props = new Properties()
+    props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
 
     val testClasspath = new TestClasspathBuilder()
       .buildClassPath(
@@ -138,69 +204,28 @@ abstract class BaseYarnClusterSuite
       .asScala
       .mkString(File.pathSeparator)
 
-    props.setProperty("spark.driver.extraClassPath", testClasspath)
-    props.setProperty("spark.executor.extraClassPath", testClasspath)
+    props.put("spark.driver.extraClassPath", testClasspath)
+    props.put("spark.executor.extraClassPath", testClasspath)
 
     // SPARK-4267: make sure java options are propagated correctly.
     props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two 
three\"")
     props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two 
three\"")
 
-    yarnCluster.getConfig.asScala.foreach { e =>
+    yarnCluster.getConfig().asScala.foreach { e =>
       props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
     }
-
     sys.props.foreach { case (k, v) =>
       if (k.startsWith("spark.")) {
         props.setProperty(k, v)
       }
     }
-
     extraConf.foreach { case (k, v) => props.setProperty(k, v) }
 
     val propsFile = File.createTempFile("spark", ".properties", tempDir)
     val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
     props.store(writer, "Spark properties.")
     writer.close()
-
-    val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
-    val mainArgs =
-      if (klass.endsWith(".py")) {
-        Seq(klass)
-      } else {
-        Seq("--class", klass, fakeSparkJar.getAbsolutePath())
-      }
-    val argv =
-      Seq(
-        new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
-        "--master", master,
-        "--num-executors", "1",
-        "--properties-file", propsFile.getAbsolutePath()) ++
-      extraJarArgs ++
-      sparkArgs ++
-      mainArgs ++
-      appArgs
-
-    Utils.executeAndGetOutput(argv,
-      extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv)
-  }
-
-  /**
-   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
-   * any sort of error when the job process finishes successfully, but the job itself fails. So
-   * the tests enforce that something is written to a file after everything is ok to indicate
-   * that the job succeeded.
-   */
-  protected def checkResult(result: File): Unit = {
-    checkResult(result, "success")
-  }
-
-  protected def checkResult(result: File, expected: String): Unit = {
-    val resultString = Files.toString(result, UTF_8)
-    resultString should be (expected)
-  }
-
-  protected def mainClassName(klass: Class[_]): String = {
-    klass.getName().stripSuffix("$")
+    propsFile.getAbsolutePath()
   }
 
 }
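
With this change, runSpark no longer shells out to bin/spark-submit; it builds a SparkLauncher, starts the application, and polls the returned SparkAppHandle until it reaches a final state. The following is a minimal sketch of that polling step pulled out as a standalone helper (the object and method names are hypothetical, not part of the patch), using the same Eventually pattern as the code above:

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

import org.apache.spark.launcher.SparkAppHandle

object LauncherTestUtil {
  // Block until the launcher reports a final state, and always kill the app
  // afterwards so a hung test cannot leak a YARN application.
  def waitForFinalState(handle: SparkAppHandle): SparkAppHandle.State = {
    try {
      eventually(timeout(2 minutes), interval(1 second)) {
        assert(handle.getState().isFinal())
      }
    } finally {
      handle.kill()
    }
    handle.getState()
  }
}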

http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index f1601cd..d1cd0c8 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.net.URL
+import java.util.{HashMap => JHashMap, Properties}
 
 import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
   SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -82,10 +86,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
     // Don't provide arguments so the driver will fail.
-    val exception = intercept[SparkException] {
-      runSpark(false, mainClassName(YarnClusterDriver.getClass))
-      fail("Spark application should have failed.")
-    }
+    val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
+    finalState should be (SparkAppHandle.State.FAILED)
   }
 
   test("run Python application in yarn-client mode") {
@@ -104,11 +106,42 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testUseClassPathFirst(false)
   }
 
+  test("monitor app using launcher library") {
+    val env = new JHashMap[String, String]()
+    env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath())
+
+    val propsFile = createConfFile()
+    val handle = new SparkLauncher(env)
+      .setSparkHome(sys.props("spark.test.home"))
+      .setConf("spark.ui.enabled", "false")
+      .setPropertiesFile(propsFile)
+      .setMaster("yarn-client")
+      .setAppResource("spark-internal")
+      .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
+      .startApplication()
+
+    try {
+      eventually(timeout(30 seconds), interval(100 millis)) {
+        handle.getState() should be (SparkAppHandle.State.RUNNING)
+      }
+
+      handle.getAppId() should not be (null)
+      handle.getAppId() should startWith ("application_")
+      handle.stop()
+
+      eventually(timeout(30 seconds), interval(100 millis)) {
+        handle.getState() should be (SparkAppHandle.State.KILLED)
+      }
+    } finally {
+      handle.kill()
+    }
+  }
+
   private def testBasicYarnApp(clientMode: Boolean): Unit = {
     val result = File.createTempFile("result", null, tempDir)
-    runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+    val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
       appArgs = Seq(result.getAbsolutePath()))
-    checkResult(result)
+    checkResult(finalState, result)
   }
 
   private def testPySpark(clientMode: Boolean): Unit = {
@@ -143,11 +176,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
     val result = File.createTempFile("result", null, tempDir)
 
-    runSpark(clientMode, primaryPyFile.getAbsolutePath(),
-      sparkArgs = Seq("--py-files", pyFiles),
+    val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+      sparkArgs = Seq("--py-files" -> pyFiles),
       appArgs = Seq(result.getAbsolutePath()),
       extraEnv = extraEnv)
-    checkResult(result)
+    checkResult(finalState, result)
   }
 
   private def testUseClassPathFirst(clientMode: Boolean): Unit = {
@@ -156,15 +189,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
-    runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
       appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
       extraClassPath = Seq(originalJar.getPath()),
       extraJars = Seq("local:" + userJar.getPath()),
       extraConf = Map(
         "spark.driver.userClassPathFirst" -> "true",
         "spark.executor.userClassPathFirst" -> "true"))
-    checkResult(driverResult, "OVERRIDDEN")
-    checkResult(executorResult, "OVERRIDDEN")
+    checkResult(finalState, driverResult, "OVERRIDDEN")
+    checkResult(finalState, executorResult, "OVERRIDDEN")
   }
 
 }
@@ -211,8 +244,8 @@ private object YarnClusterDriver extends Logging with Matchers {
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {
-      sc.stop()
       Files.write(result, status, UTF_8)
+      sc.stop()
     }
 
     // verify log urls are present
@@ -297,3 +330,18 @@ private object YarnClasspathTest extends Logging {
   }
 
 }
+
+private object YarnLauncherTestApp {
+
+  def main(args: Array[String]): Unit = {
+    // Do not stop the application; the test will stop it using the launcher lib. Just run a task
+    // that will prevent the process from exiting.
+    val sc = new SparkContext(new SparkConf())
+    sc.parallelize(Seq(1)).foreach { i =>
+      this.synchronized {
+        wait()
+      }
+    }
+  }
+
+}
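
The new "monitor app using launcher library" test exercises the whole handle lifecycle: the app must reach RUNNING, report a YARN application id, and move to KILLED after stop() is requested. Below is a condensed sketch of those checks, assuming a handle obtained from SparkLauncher.startApplication() as in the test above (the helper object and method names are hypothetical):

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually._

import org.apache.spark.launcher.SparkAppHandle

object LauncherLifecycleCheck {
  // Mirror the assertions from the test: RUNNING, a real application id,
  // then KILLED once the handle asks YARN to stop the app.
  def check(handle: SparkAppHandle): Unit = {
    try {
      eventually(timeout(30 seconds), interval(100 millis)) {
        handle.getState() should be (SparkAppHandle.State.RUNNING)
      }
      handle.getAppId() should startWith ("application_")
      handle.stop()
      eventually(timeout(30 seconds), interval(100 millis)) {
        handle.getState() should be (SparkAppHandle.State.KILLED)
      }
    } finally {
      handle.kill()
    }
  }
}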

http://git-wip-us.apache.org/repos/asf/spark/blob/015f7ef5/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index a85e577..c17e869 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -53,7 +53,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
 
     logInfo("Shuffle service port = " + shuffleServicePort)
     val result = File.createTempFile("result", null, tempDir)
-    runSpark(
+    val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
       appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
@@ -62,7 +62,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
         "spark.shuffle.service.port" -> shuffleServicePort.toString
       )
     )
-    checkResult(result)
+    checkResult(finalState, result)
     assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
   }
 }
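
All three suites now feed the handle's final state into checkResult, which ties a FINISHED state to the success-file workaround documented in BaseYarnClusterSuite: a yarn-cluster driver can exit cleanly even though the job failed, so the job must also leave a marker behind. A minimal sketch of both sides of that contract follows; the object and method names (ResultFileContract, reportResult, verify) are hypothetical illustrations, not part of the patch:

import java.io.File

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.scalatest.Matchers._

import org.apache.spark.launcher.SparkAppHandle

object ResultFileContract {
  // Driver side: write the marker only after every assertion has passed.
  def reportResult(status: File, result: String): Unit = {
    Files.write(result, status, UTF_8)
  }

  // Test side: a FINISHED handle alone is not proof of success; the marker
  // file must contain the expected value as well.
  def verify(
      finalState: SparkAppHandle.State,
      status: File,
      expected: String = "success"): Unit = {
    finalState should be (SparkAppHandle.State.FINISHED)
    Files.toString(status, UTF_8) should be (expected)
  }
}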

