Repository: spark
Updated Branches:
  refs/heads/master c377e49e3 -> d513c99c1


[SPARK-16414][YARN] Fix bugs for "Can not get user config when calling 
SparkHadoopUtil.get.conf in yarn cluster mode"

## What changes were proposed in this pull request?

When deploying Spark in yarn cluster mode, the `SparkHadoopUtil` singleton was 
instantiated in `ApplicationMaster.main` before the `ApplicationMaster` itself. 
As a result, the `conf` in the `SparkHadoopUtil` singleton didn't include the 
user's configuration.

So, we should load the properties file with the Spark configuration and set its 
entries as system properties before `SparkHadoopUtil` is first instantiated.

## How was this patch tested?

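Added a test case to `YarnClusterSuite` that runs an application in yarn-cluster 
mode and checks that a `spark.hadoop.*` setting is visible through 
`SparkHadoopUtil.get.conf`.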

Author: sharkd <sharkd...@gmail.com>
Author: sharkdtu <shark...@tencent.com>

Closes #14088 from sharkdtu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d513c99c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d513c99c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d513c99c

Branch: refs/heads/master
Commit: d513c99c19e229f72d03006e251725a43c13fefd
Parents: c377e49
Author: sharkd <sharkd...@gmail.com>
Authored: Tue Jul 12 10:10:35 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Jul 12 10:10:35 2016 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 17 ++++----
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 45 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d513c99c/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b6f45dd..c371ad6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
-  // Load the properties file with the Spark configuration and set entries as 
system properties,
-  // so that user code run inside the AM also has access to them.
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sys.props(k) = v
-    }
-  }
-
   // TODO: Currently, task to container is computed once (TaskSetManager) - 
which need not be
   // optimal as more containers are available. Might need to handle this 
better.
 
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries 
as system properties,
+    // so that user code run inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil instantiated
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) 
=>
+        sys.props(k) = v
+      }
+    }
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())

http://git-wip-us.apache.org/repos/asf/spark/blob/d513c99c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9085fca..874e304 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, 
SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       ))
   }
 
+  test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") {
+    testYarnAppUseSparkHadoopUtilConf()
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"))
+    checkResult(finalState, result)
+  }
+
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> 
"ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends 
Logging with Matchers {
   }
 }
 
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with 
Matchers {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] 
[result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+    val kv = args(0).split("=")
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to