Repository: spark
Updated Branches:
  refs/heads/master f6773edce -> 3a35a0dfe


[SPARK-6144] [core] Fix addFile when source files are on "hdfs:"

The code failed in two ways: it threw an IOException when asked to re-create
a local directory that already existed, and it placed some fetched files
under the wrong parent directory. The patch fixes both issues.
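
For context, a minimal repro sketch of the failing scenario. The URIs,
hostnames, and app name below are illustrative placeholders, not part of
the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // A sketch, not a full application: in practice this runs in cluster
    // mode against a real HDFS namenode.
    val conf = new SparkConf().setAppName("SPARK-6144-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Before this patch, fetching from an "hdfs:" source could throw
    // "Failed to create directory ..." when the local target directory
    // already existed, and a fetched file could land one level above its
    // intended parent. With the fix, both calls behave as expected.
    sc.addFile("hdfs://namenode:8020/user/demo/lookup.dat")
    sc.addJar("hdfs://namenode:8020/user/demo/udfs.jar")  // the ADD JAR case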

Author: Marcelo Vanzin <van...@cloudera.com>
Author: trystanleftwich <trys...@atscale.com>

Closes #4894 from vanzin/SPARK-6144 and squashes the following commits:

100b3a1 [Marcelo Vanzin] Style fix.
58266aa [Marcelo Vanzin] Fix fetchHcfs file for directories.
91733b7 [trystanleftwich] [SPARK-6144] In cluster mode, using ADD JAR with an hdfs:// sourced jar will fail


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a35a0df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a35a0df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a35a0df

Branch: refs/heads/master
Commit: 3a35a0dfe940843c3f3a5f51acfe24def488faa9
Parents: f6773ed
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Mar 4 12:58:39 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Mar 4 12:58:39 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 28 ++++---
 .../org/apache/spark/util/UtilsSuite.scala      | 85 +++++++++++---------
 2 files changed, 63 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a35a0df/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4644088..d3dc1d0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -624,7 +624,8 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+                      filename = Some(filename))
     }
   }
 
@@ -639,19 +640,22 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+    val dest = new File(targetDir, filename.getOrElse(path.getName))
+    if (fs.isFile(path)) {
+      val in = fs.open(path)
+      try {
+        downloadFile(path.toString, in, dest, fileOverwrite)
+      } finally {
+        in.close()
+      }
+    } else {
+      fs.listStatus(path).foreach { fileStatus =>
+        fetchHcfsFile(fileStatus.getPath(), dest, fs, conf, hadoopConf, fileOverwrite)
       }
     }
   }
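
The rewritten method above now distinguishes files from directories: a file
is downloaded to targetDir/<filename or path.getName>, while a directory is
recreated as a child of targetDir before recursing. A hedged usage sketch of
the new signature (the URI and local paths are placeholders; fetchHcfsFile
is private[spark], so a real caller lives under org.apache.spark, as the
test suite below does):

    import java.io.File
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val hadoopConf = new Configuration()
    val src = new Path("hdfs://namenode:8020/user/demo/app.jar")  // placeholder
    val fs = Utils.getHadoopFileSystem(src.toString, hadoopConf)
    val targetDir = new File("/tmp/fetch-target")

    // The new filename parameter keeps a caller-chosen name; omitting it
    // falls back to src.getName ("app.jar" here).
    Utils.fetchHcfsFile(src, targetDir, fs, new SparkConf(), hadoopConf,
      fileOverwrite = false, filename = Some("renamed-app.jar"))
    // Result: /tmp/fetch-target/renamed-app.jar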

http://git-wip-us.apache.org/repos/asf/spark/blob/3a35a0df/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fe2b644..fd77753 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
 
     // although child1 is old, child2 is still new so return true
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     child2.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     parent.setLastModified(System.currentTimeMillis - (1000 * 30))
     // although parent and its immediate children are new, child3 is still old
     // we expect a full recursive search for new files.
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
 
     child3.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) 
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }
 
   test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(!tempDir1.exists())
 
     val tempDir2 = Utils.createTempDir()
-    val tempFile1 = new File(tempDir2, "foo.txt")
-    Files.touch(tempFile1)
-    assert(tempFile1.exists())
-    Utils.deleteRecursively(tempFile1)
-    assert(!tempFile1.exists())
+    val sourceFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(sourceFile1)
+    assert(sourceFile1.exists())
+    Utils.deleteRecursively(sourceFile1)
+    assert(!sourceFile1.exists())
 
     val tempDir3 = new File(tempDir2, "subdir")
     assert(tempDir3.mkdir())
-    val tempFile2 = new File(tempDir3, "bar.txt")
-    Files.touch(tempFile2)
-    assert(tempFile2.exists())
+    val sourceFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(sourceFile2)
+    assert(sourceFile2.exists())
     Utils.deleteRecursively(tempDir2)
     assert(!tempDir2.exists())
     assert(!tempDir3.exists())
-    assert(!tempFile2.exists())
+    assert(!sourceFile2.exists())
   }
 
   test("loading properties from file") {
@@ -386,30 +386,39 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   }
 
   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val innerTempDir = Utils.createTempDir(tempDir.getPath)
-    val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
-    val targetDir = new File("target-dir")
-    Files.write("some text", tempFile, UTF_8)
-
-    try {
-      val path = new Path("file://" + tempDir.getAbsolutePath)
-      val conf = new Configuration()
-      val fs = Utils.getHadoopFileSystem(path.toString, conf)
-      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-      assert(targetDir.exists())
-      assert(targetDir.isDirectory())
-      val newInnerDir = new File(targetDir, innerTempDir.getName)
-      println("inner temp dir: " + innerTempDir.getName)
-      targetDir.listFiles().map(_.getName).foreach(println)
-      assert(newInnerDir.exists())
-      assert(newInnerDir.isDirectory())
-      val newInnerFile = new File(newInnerDir, tempFile.getName)
-      assert(newInnerFile.exists())
-      assert(newInnerFile.isFile())
-    } finally {
-      Utils.deleteRecursively(tempDir)
-      Utils.deleteRecursively(targetDir)
-    }
+    val sourceDir = Utils.createTempDir()
+    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+    val targetDir = new File(Utils.createTempDir(), "target-dir")
+    Files.write("some text", sourceFile, UTF_8)
+
+    val path = new Path("file://" + sourceDir.getAbsolutePath)
+    val conf = new Configuration()
+    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+
+    assert(!targetDir.isDirectory())
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+    assert(targetDir.isDirectory())
+
+    // Copy again to make sure it doesn't error if the dir already exists.
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+
+    val destDir = new File(targetDir, sourceDir.getName())
+    assert(destDir.isDirectory())
+
+    val destInnerDir = new File(destDir, innerSourceDir.getName)
+    assert(destInnerDir.isDirectory())
+
+    val destInnerFile = new File(destInnerDir, sourceFile.getName)
+    assert(destInnerFile.isFile())
+
+    val filePath = new Path("file://" + sourceFile.getAbsolutePath)
+    val testFileDir = new File("test-filename")
+    val testFileName = "testFName"
+    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+                        conf, false, Some(testFileName))
+    val newFileName = new File(testFileDir, testFileName)
+    assert(newFileName.isFile())
   }
 }

