Repository: spark Updated Branches: refs/heads/branch-1.5 f9ad0e543 -> e405c2a1f
[SPARK-10812] [YARN] Spark hadoop util support switching to yarn While this is likely not a huge issue for real production systems, for test systems which may setup a Spark Context and tear it down and stand up a Spark Context with a different master (e.g. some local mode & some yarn mode) tests this cane be an issue. Discovered during work on spark-testing-base on Spark 1.4.1, but seems like the logic that triggers it is present in master (see SparkHadoopUtil object). A valid work around for users encountering this issue is to fork a different JVM, however this can be heavy weight. ``` [info] SampleMiniClusterTest: [info] Exception encountered when attempting to run a suite with class name: com.holdenkarau.spark.testing.SampleMiniClusterTest *** ABORTED *** [info] java.lang.ClassCastException: org.apache.spark.deploy.SparkHadoopUtil cannot be cast to org.apache.spark.deploy.yarn.YarnSparkHadoopUtil [info] at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:163) [info] at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:257) [info] at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561) [info] at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115) [info] at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) [info] at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) [info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:497) [info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.setup(SharedMiniCluster.scala:186) [info] at com.holdenkarau.spark.testing.SampleMiniClusterTest.setup(SampleMiniClusterTest.scala:26) [info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.beforeAll(SharedMiniCluster.scala:103) ``` Author: Holden Karau <hol...@pigscanfly.ca> Closes #8911 from holdenk/SPARK-10812-spark-hadoop-util-support-switching-to-yarn. (cherry picked from commit d8d50ed388d2e695b69d2b93a620045ef2f0bc18) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c49e0c3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c49e0c3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c49e0c3f Branch: refs/heads/branch-1.5 Commit: c49e0c3f6d25aa15b7cc25db0e9ae5a869184480 Parents: f9ad0e5 Author: Holden Karau <hol...@pigscanfly.ca> Authored: Mon Sep 28 06:33:45 2015 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Oct 22 13:14:21 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 2 ++ .../apache/spark/deploy/SparkHadoopUtil.scala | 30 ++++++++++---------- .../org/apache/spark/deploy/yarn/Client.scala | 6 +++- .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 12 ++++++++ 4 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 011e19f..2a2fa75 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1750,6 +1750,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } SparkEnv.set(null) } + // Unset YARN mode system env variable, to allow switching between cluster types. + System.clearProperty("SPARK_YARN_MODE") SparkContext.clearActiveContext() logInfo("Successfully stopped SparkContext") } http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index dda4216..1157ee0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -380,20 +380,13 @@ class SparkHadoopUtil extends Logging { object SparkHadoopUtil { - private val hadoop = { - val yarnMode = java.lang.Boolean.valueOf( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (yarnMode) { - try { - Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") - .newInstance() - .asInstanceOf[SparkHadoopUtil] - } catch { - case e: Exception => throw new SparkException("Unable to load YARN support", e) - } - } else { - new SparkHadoopUtil - } + private lazy val hadoop = new SparkHadoopUtil + private lazy val yarn = try { + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + .newInstance() + .asInstanceOf[SparkHadoopUtil] + } catch { + case e: Exception => throw new SparkException("Unable to load YARN support", e) } val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp" @@ -401,6 +394,13 @@ object SparkHadoopUtil { val SPARK_YARN_CREDS_COUNTER_DELIM = "-" def get: SparkHadoopUtil = { - hadoop + // Check each time to support changing to/from YARN + val yarnMode = java.lang.Boolean.valueOf( + System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if (yarnMode) { + yarn + } else { + hadoop + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f21f5ef..ffa35e5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -86,7 +86,11 @@ private[spark] class Client( private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) - def stop(): Unit = yarnClient.stop() + def stop(): Unit = { + yarnClient.stop() + // Unset YARN mode system env variable, to allow switching between cluster types. + System.clearProperty("SPARK_YARN_MODE") + } /** * Submit an application running our ApplicationMaster to the ResourceManager. http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 49bee08..e1c67db 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.Matchers import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -233,4 +234,15 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") } + + test("check different hadoop utils based on env variable") { + try { + System.setProperty("SPARK_YARN_MODE", "true") + assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil]) + System.setProperty("SPARK_YARN_MODE", "false") + assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil]) + } finally { + System.clearProperty("SPARK_YARN_MODE") + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org