Repository: spark
Updated Branches:
  refs/heads/master 51b306b93 -> 320bca450


[SPARK-6081] Support fetching http/https uris in driver runner.

Currently, when passed URIs such as http/https, the driver runner is unable to fetch
them, since it only calls Hadoop FileSystem get.
This fix reuses the existing util method (Utils.fetchFile) to fetch remote URIs as well.
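
For context, here is a minimal sketch (not the actual Spark source) of the kind of
scheme dispatch Utils.fetchFile performs; the three helper functions are hypothetical
stand-ins for the real download paths:

    import java.io.File
    import java.net.URI

    // Hypothetical stand-ins for the real download implementations.
    def downloadOverHttp(url: String, dir: File): Unit = println(s"HTTP fetch: $url")
    def copyLocalFile(url: String, dir: File): Unit = println(s"local copy: $url")
    def fetchViaHadoopFs(url: String, dir: File): Unit = println(s"Hadoop FS fetch: $url")

    def fetchByScheme(url: String, targetDir: File): Unit =
      Option(new URI(url).getScheme) match {
        case Some("http") | Some("https") | Some("ftp") =>
          downloadOverHttp(url, targetDir)   // the path DriverRunner could not reach before
        case Some("file") | None =>
          copyLocalFile(url, targetDir)      // plain local filesystem copy
        case _ =>
          fetchViaHadoopFs(url, targetDir)   // hdfs:// and other Hadoop-readable schemes
      }

The previous DriverRunner code went straight to the last branch, which is why
http/https jar URLs failed.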

Author: Timothy Chen <tnac...@gmail.com>

Closes #4832 from tnachen/driver_remote and squashes the following commits:

aa52cd6 [Timothy Chen] Support fetching remote uris in driver runner.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/320bca45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/320bca45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/320bca45

Branch: refs/heads/master
Commit: 320bca4508e890b874c2eb7abb76a30ef14c932f
Parents: 51b306b
Author: Timothy Chen <tnac...@gmail.com>
Authored: Tue Apr 14 11:48:12 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Apr 14 11:49:04 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/worker/DriverRunner.scala      | 21 ++++++++++++--------
 .../org/apache/spark/deploy/worker/Worker.scala |  3 ++-
 .../apache/spark/deploy/JsonProtocolSuite.scala |  7 ++++---
 .../spark/deploy/worker/DriverRunnerTest.scala  |  7 ++++---
 4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e0948e1..ef7a703 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -24,14 +24,14 @@ import scala.collection.JavaConversions._
 import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Utils, Clock, SystemClock}
 
 /**
 * Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -44,7 +44,8 @@ private[deploy] class DriverRunner(
     val sparkHome: File,
     val driverDesc: DriverDescription,
     val worker: ActorRef,
-    val workerUrl: String)
+    val workerUrl: String,
+    val securityManager: SecurityManager)
   extends Logging {
 
   @volatile private var process: Option[Process] = None
@@ -136,12 +137,9 @@ private[deploy] class DriverRunner(
    * Will throw an exception if there are errors downloading the jar.
    */
   private def downloadUserJar(driverDir: File): String = {
-
     val jarPath = new Path(driverDesc.jarUrl)
 
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    val jarFileSystem = jarPath.getFileSystem(hadoopConf)
-
     val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
@@ -149,7 +147,14 @@ private[deploy] class DriverRunner(
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
+      Utils.fetchFile(
+        driverDesc.jarUrl,
+        driverDir,
+        conf,
+        securityManager,
+        hadoopConf,
+        System.currentTimeMillis(),
+        useCache = false)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
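
A hedged usage sketch of the new code path (assuming access to Spark's internal
deploy and util classes; the url and target directory below are illustrative):

    import java.io.File
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
    // fetchFile resolves the uri scheme itself, so http(s) jars work here too.
    Utils.fetchFile(
      "https://repo.example.com/jobs/my-app.jar",  // illustrative remote uri
      new File("/tmp/driver-workdir"),             // illustrative driver dir
      conf,
      new SecurityManager(conf),
      SparkHadoopUtil.get.newConfiguration(conf),
      System.currentTimeMillis(),                  // timestamp for cache-freshness checks
      useCache = false)                            // each driver keeps its own copy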

http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c4c24a7..3ee2eb6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -436,7 +436,8 @@ private[worker] class Worker(
         sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
         self,
-        akkaUrl)
+        akkaUrl,
+        securityMgr)
       drivers(driverId) = driver
       driver.start()
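
With the Worker's SecurityManager threaded into DriverRunner, a driver whose jar
lives behind http(s) becomes serviceable end to end. An illustrative sketch (the
jar url and main class are made up; the constructor shapes match the test code below):

    import org.apache.spark.deploy.{Command, DriverDescription}

    val desc = new DriverDescription(
      "https://repo.example.com/jobs/my-app.jar",  // remote jar, fetched by DriverRunner
      512,    // memory (MB)
      1,      // cores
      true,   // supervise: restart the driver on failure
      new Command("com.example.Main", Seq(), Map(), Seq(), Seq(), Seq()))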
 

http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2071701..b58d625 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 
 class JsonProtocolSuite extends FunSuite {
 
@@ -124,8 +124,9 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   def createDriverRunner(): DriverRunner = {
-    new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
-      createDriverDesc(), null, "akka://worker")
+    val conf = new SparkConf()
+    new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+      createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
   }
 
   def assertValidJson(json: JValue) {

http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index aa6e487..2159fd8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,7 +25,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{Command, DriverDescription}
 import org.apache.spark.util.Clock
 
@@ -33,8 +33,9 @@ class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
     val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
-    new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, null, "akka://1.2.3.4/worker/")
+    val conf = new SparkConf()
+    new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+      driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {

