Updated Branches:
  refs/heads/master c1d928a89 -> 3713f8129

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 70be15d..41ac292 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = System.getProperty("QUEUE", "default")
+  var amQueue = conf.getOrElse("QUEUE",  "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index bc31bb2..f7d73f0 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -136,8 +136,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    System.setProperty("spark.driver.host", driverHost)
-    System.setProperty("spark.driver.port", driverPort.toString)
+    conf.set("spark.driver.host",  driverHost)
+    conf.set("spark.driver.port",  driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, 
CoarseGrainedSchedulerBackend.ACTOR_NAME)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
 
b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c27257c..71d1cbd 100644
--- 
a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -254,8 +254,8 @@ private[yarn] class YarnAllocationHandler(
         } else {
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            System.getProperty("spark.driver.host"),
-            System.getProperty("spark.driver.port"),
+            conf.get("spark.driver.host"),
+            conf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("Launching container %s for on host %s".format(containerId, 
workerHostname))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780..6feaaff 100644
--- 
a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,8 +52,8 @@ private[spark] class YarnClientSchedulerBackend(
     if (workerNumber == null)
       workerNumber = defaultWorkerNumber
 
-    val driverHost = System.getProperty("spark.driver.host")
-    val driverPort = System.getProperty("spark.driver.port")
+    val driverHost = conf.get("spark.driver.host")
+    val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
 
     val argsArray = Array[String](

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ab96cfa..ffb54a2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -244,7 +244,8 @@ object SparkBuild extends Build {
         "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
         "com.codahale.metrics"     % "metrics-graphite" % "3.0.0",
         "com.twitter"             %% "chill"            % "0.3.1",
-        "com.twitter"              % "chill-java"       % "0.3.1"
+        "com.twitter"              % "chill-java"       % "0.3.1",
+        "com.typesafe"             % "config"           % "1.0.2"
       )
   )
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 523fd12..b2f499e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -930,9 +930,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val 
out: JPrintWriter,
 
   def createSparkContext(): SparkContext = {
     val uri = System.getenv("SPARK_EXECUTOR_URI")
-    if (uri != null) {
-      System.setProperty("spark.executor.uri", uri)
-    }
     val master = this.master match {
       case Some(m) => m
       case None => {
@@ -942,6 +939,10 @@ class SparkILoop(in0: Option[BufferedReader], protected 
val out: JPrintWriter,
     }
     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
     sparkContext = new SparkContext(master, "Spark shell", 
System.getenv("SPARK_HOME"), jars)
+    if (uri != null) {
+      sparkContext.conf.set("spark.executor.uri",  uri)
+    }
+    sparkContext.conf.set("spark.repl.class.uri",  intp.classServer.uri)
     echo("Created spark context..")
     sparkContext
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala 
b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index e1455ef..0d412e4 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -34,10 +34,8 @@ import scala.tools.reflect.StdRuntimeTags._
 import scala.util.control.ControlThrowable
 import util.stackTraceString
 
-import org.apache.spark.HttpServer
+import org.apache.spark.{SparkContext, HttpServer, SparkEnv, Logging}
 import org.apache.spark.util.Utils
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
 
 // /** directory to save .class files to */
 // private class ReplVirtualDirectory(out: JPrintWriter) extends 
VirtualDirectory("((memory))", None) {
@@ -91,7 +89,7 @@ import org.apache.spark.Logging
       /** Local directory to save .class files too */
       val outputDir = {
         val tmp = System.getProperty("java.io.tmpdir")
-        val rootDir = System.getProperty("spark.repl.classdir", tmp)
+        val rootDir = SparkContext.globalConf.getOrElse("spark.repl.classdir", 
 tmp)
         Utils.createTempDir(rootDir)
       }
       if (SPARK_DEBUG_REPL) {
@@ -112,7 +110,6 @@ import org.apache.spark.Logging
         // Start the classServer and store its URI in a spark system property
     // (which will be passed to executors so that they can connect to it)
       classServer.start()
-      System.setProperty("spark.repl.class.uri", classServer.uri)
       if (SPARK_DEBUG_REPL) {
         echo("Class server started, URI = " + classServer.uri)
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914..b8e1427 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -34,7 +34,7 @@ class Checkpoint(@transient ssc: StreamingContext, val 
checkpointTime: Time)
   extends Logging with Serializable {
   val master = ssc.sc.master
   val framework = ssc.sc.appName
-  val sparkHome = ssc.sc.sparkHome
+  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
   val jars = ssc.sc.jars
   val environment = ssc.sc.environment
   val graph = ssc.graph
@@ -42,6 +42,7 @@ class Checkpoint(@transient ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds
+  val sparkConf = ssc.sc.conf
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e3..1d23713 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -26,7 +26,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
 
   initLogging()
 
-  val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", 
"1").toInt
+  val concurrentJobs = ssc.sc.conf.getOrElse("spark.streaming.concurrentJobs", 
 "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
   val checkpointWriter = if (ssc.checkpointDuration != null && 
ssc.checkpointDir != null) {
     new CheckpointWriter(ssc.checkpointDir)
@@ -34,7 +34,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
     null
   }
 
-  val clockClass = System.getProperty(
+  val clockClass = ssc.sc.conf.getOrElse(
     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
@@ -73,7 +73,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = System.getProperty("spark.streaming.manualClock.jump", 
"0").toLong
+      val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", 
 "0").toLong
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index d2c4fde..7674422 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -115,7 +115,7 @@ class StreamingContext private (
 
   protected[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, 
cp_.environment)
+      new SparkContext(cp_.sparkConf, cp_.environment)
     } else {
       sc_
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d5ae8ae..8bf761b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,8 +175,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends 
Serializable with Logging
   /** A helper actor that communicates with the NetworkInputTracker */
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
-    val ip = System.getProperty("spark.driver.host", "localhost")
-    val port = System.getProperty("spark.driver.port", "7077").toInt
+    val ip = env.conf.getOrElse("spark.driver.host",  "localhost")
+    val port = env.conf.getOrElse("spark.driver.port",  "7077").toInt
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, 
port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -213,7 +213,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends 
Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = System.getProperty("spark.streaming.blockInterval", 
"200").toLong
+    val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval",  
"200").toLong
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, 
updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e81287b..315bd54 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.util.ManualClock
  */
 class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
-  System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+  conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
 
   before {
     FileUtils.deleteDirectory(new File(checkpointDir))
@@ -69,7 +69,7 @@ class CheckpointSuite extends TestSuiteBase with 
BeforeAndAfter {
 
     assert(batchDuration === Milliseconds(500), "batchDuration for this test 
must be 1 second")
 
-    System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+    conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
 
     val stateStreamCheckpointInterval = Seconds(1)
 
@@ -135,13 +135,13 @@ class CheckpointSuite extends TestSuiteBase with 
BeforeAndAfter {
 
     // Restart stream computation from the new checkpoint file to see whether 
that file has
     // correct checkpoint data
+    conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 
7).toString)
     ssc = new StreamingContext(checkpointDir)
     stateStream = 
ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + 
stateStream.generatedRDDs.mkString("\n") + "]")
     assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state 
stream after recovery from second failure")
 
     // Adjust manual clock time as if it is being restarted after a delay
-    System.setProperty("spark.streaming.manualClock.jump", 
(batchDuration.milliseconds * 7).toString)
     ssc.start()
     advanceTimeWithRealDelay(ssc, 4)
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82de..da8f135 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -53,7 +53,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
   override def checkpointDir = "checkpoint"
 
   before {
-    System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+    conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
   }
 
   after {
@@ -68,7 +68,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     testServer.start()
 
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val networkStream = ssc.socketTextStream("localhost", testServer.port, 
StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with 
SynchronizedBuffer[Seq[String  ]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -113,7 +113,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 
   test("flume input stream") {
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val flumeStream = ssc.flumeStream("localhost", testPort, 
StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
@@ -162,11 +162,11 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 
   test("file input stream") {
     // Disable manual clock as FileInputDStream does not work with manual clock
-    System.clearProperty("spark.streaming.clock")
+    conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.SystemClock")
 
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val fileStream = ssc.textFileStream(testDir.toString)
     val outputBuffer = new ArrayBuffer[Seq[String]] with 
SynchronizedBuffer[Seq[String]]
     def output = outputBuffer.flatMap(x => x)
@@ -207,7 +207,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     FileUtils.deleteDirectory(testDir)
 
     // Enable manual clock back again for other tests
-    System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+    conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
   }
 
 
@@ -218,7 +218,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     testServer.start()
 
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val networkStream = ssc.actorStream[String](Props(new TestActor(port)), 
"TestActor",
       StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to 
prevent from closing over entire scope
     val outputBuffer = new ArrayBuffer[Seq[String]] with 
SynchronizedBuffer[Seq[String]]
@@ -262,7 +262,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
   }
 
   test("kafka input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val topics = Map("my-topic" -> 1)
     val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
     val test2 = ssc.kafkaStream("localhost:12345", "group", topics, 
StorageLevel.MEMORY_AND_DISK)
@@ -285,7 +285,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     MultiThreadTestReceiver.haveAllThreadsFinished = false
 
     // set up the network stream using the test receiver
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
     val networkStream = ssc.networkStream[Int](testReceiver)
     val countStream = networkStream.count
     val outputBuffer = new ArrayBuffer[Seq[Long]] with 
SynchronizedBuffer[Seq[Long]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 2f34e81..d1cab0c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -28,7 +28,7 @@ import java.io.{ObjectInputStream, IOException}
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
 import org.apache.spark.rdd.RDD
 
 /**
@@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter 
with Logging {
   // Whether to actually wait in real time before changing manual clock
   def actuallyWait = false
 
+  def conf = new 
SparkConf().setMasterUrl(master).setAppName(framework).set("spark.cleaner.ttl", 
"3600")
   /**
    * Set up required DStreams to test the DStream operation using the two 
sequences
    * of input collections.
@@ -139,9 +140,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter 
with Logging {
       operation: DStream[U] => DStream[V],
       numPartitions: Int = numInputPartitions
     ): StreamingContext = {
-
+    val sc = new SparkContext(conf)
     // Create StreamingContext
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(sc, batchDuration)
     if (checkpointDir != null) {
       ssc.checkpoint(checkpointDir)
     }
@@ -165,9 +166,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter 
with Logging {
       input2: Seq[Seq[V]],
       operation: (DStream[U], DStream[V]) => DStream[W]
     ): StreamingContext = {
-
+    val sc = new SparkContext(conf)
     // Create StreamingContext
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val ssc = new StreamingContext(sc, batchDuration)
     if (checkpointDir != null) {
       ssc.checkpoint(checkpointDir)
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 240ed8b..1dd38dd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -58,13 +58,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
   // default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = 
System.getProperty("spark.yarn.max.worker.failures",
+  private val maxNumWorkerFailures = 
conf.getOrElse("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3).toString()).toInt
 
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
+    conf.set("spark.local.dir",  getLocalDirs())
 
     // Use priority 30 as its higher then HDFS. Its same priority as MapReduce 
is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 
30)
@@ -165,10 +165,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    val numTries = 
System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries",  
"10").toInt
     while(!driverUp && tries < numTries) {
-      val driverHost = System.getProperty("spark.driver.host")
-      val driverPort = System.getProperty("spark.driver.port")
+      val driverHost = conf.get("spark.driver.host")
+      val driverPort = conf.get("spark.driver.port")
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
@@ -226,7 +226,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
       ApplicationMaster.sparkContextRef.synchronized {
         var count = 0
         val waitTime = 10000L
-        val numTries = 
System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        val numTries = 
conf.getOrElse("spark.yarn.ApplicationMaster.waitTries",  "10").toInt
         while (ApplicationMaster.sparkContextRef.get() == null && count < 
numTries) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
@@ -294,7 +294,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
 
       // we want to be reasonably responsive without causing too many requests 
to RM.
       val schedulerInterval =
-        System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", 
"5000").toLong
+        conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms",  
"5000").toLong
 
       // must be <= timeoutInterval / 2.
       val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -377,7 +377,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
   private def cleanupStagingDir() { 
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = 
System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files",  
"false").toBoolean
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79dd038..29892e9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -230,7 +230,7 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = System.getProperty("spark.yarn.submit.file.replication", 
"3").toShort
+    val replication = conf.getOrElse("spark.yarn.submit.file.replication",  
"3").toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
       val dstFs = dst.getFileSystem(conf)
@@ -461,7 +461,7 @@ object Client {
   def main(argStrings: Array[String]) {
     // Set an env variable indicating we are running in YARN mode.
     // Note that anything with SPARK prefix gets propagated to all (remote) 
processes
-    System.setProperty("SPARK_YARN_MODE", "true")
+    conf.set("SPARK_YARN_MODE",  "true")
 
     val args = new ClientArguments(argStrings)
 
@@ -483,7 +483,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = 
System.getProperty("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", 
 "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, 
Environment.PWD.$() + 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b3a7886..617289f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -33,7 +33,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = System.getProperty("QUEUE", "default")
+  var amQueue = conf.getOrElse("QUEUE",  "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 6903884..c1e79cb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -162,8 +162,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    System.setProperty("spark.driver.host", driverHost)
-    System.setProperty("spark.driver.port", driverPort.toString)
+    conf.set("spark.driver.host",  driverHost)
+    conf.set("spark.driver.port",  driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, 
CoarseGrainedSchedulerBackend.ACTOR_NAME)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9ab2073..4c9fee5 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -239,7 +239,7 @@ private[yarn] class YarnAllocationHandler(
           // (workerIdCounter)
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            System.getProperty("spark.driver.host"), 
System.getProperty("spark.driver.port"),
+            conf.get("spark.driver.host"), conf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("launching container on " + containerId + " host " + 
workerHostname)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780..6feaaff 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,8 +52,8 @@ private[spark] class YarnClientSchedulerBackend(
     if (workerNumber == null)
       workerNumber = defaultWorkerNumber
 
-    val driverHost = System.getProperty("spark.driver.host")
-    val driverPort = System.getProperty("spark.driver.port")
+    val driverHost = conf.get("spark.driver.host")
+    val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
 
     val argsArray = Array[String](

Reply via email to