Repository: spark
Updated Branches:
  refs/heads/master 4faa8a3ec -> 2bc1c9515


[SPARK-18895][TESTS] Fix resource-closing-related and path-related test failures in identified ones on Windows

## What changes were proposed in this pull request?

There are several tests failing due to resource-closing-related and path-related problems on Windows, as shown below.

- `RPackageUtilsSuite`:

```
- build an R package from a jar end to end *** FAILED *** (1 second, 625 milliseconds)
  java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1481729427517-0\a\dep2\d\dep2-d.jar
  at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
  at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
  at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)

- faulty R package shows documentation *** FAILED *** (359 milliseconds)
  java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1481729428970-0\dep1-c.jar
  at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
  at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
  at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)

- SparkR zipping works properly *** FAILED *** (47 milliseconds)
  java.util.regex.PatternSyntaxException: Unknown character property name {r} near index 4

C:\projects\spark\target\tmp\1481729429282-0

    ^
  at java.util.regex.Pattern.error(Pattern.java:1955)
  at java.util.regex.Pattern.charPropertyNodeFor(Pattern.java:2781)
```
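
The first two failures appear to be resource-closing problems: the opened `JarFile` is never closed, and Windows keeps open files locked, so the temporary directory cannot be deleted afterwards. The zipping failure is a path problem: the relative path is computed with `String.replaceFirst`, whose first argument is a regular expression, so the backslashes in a Windows path (for example the `\p` in `C:\projects\...`) are parsed as regex escapes. The change below closes the jar via `Utils.tryWithSafeFinally` and derives the relative path from URIs; a minimal sketch of the URI-based computation, using hypothetical paths:

```
import java.io.File

// Hypothetical Windows paths, for illustration only.
val dir = new File("C:\\projects\\spark\\target\\tmp\\1481729429282-0")
val file = new File(dir, "packageTest/def.R")

// The old code stripped dir.getAbsolutePath with replaceFirst; on Windows the
// backslashes in that path are treated as regex escapes. URIs always use forward
// slashes, so stripping the prefix from the URI form is safe on any platform.
val relPath = file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "")
// On Windows this yields "/packageTest/def.R", a valid ZIP entry name.
```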

- `InputOutputMetricsSuite`:

```
- input metrics for old hadoop with coalesce *** FAILED *** (240 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics with cache and coalesce *** FAILED *** (109 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics for new Hadoop API with coalesce *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-9366ec94-dac7-4a5c-a74b-3e7594a692ab\test\InputOutputMetricsSuite.txt, expected: file:///
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
  at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
  at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)

- input metrics when reading text file *** FAILED *** (110 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics on records read - simple *** FAILED *** (125 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics on records read - more stages *** FAILED *** (110 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics on records - New Hadoop API *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-3f10a1a4-7820-4772-b821-25fd7523bf6f\test\InputOutputMetricsSuite.txt, expected: file:///
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
  at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
  at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)

- input metrics on records read with cache *** FAILED *** (93 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input read/write and shuffle read/write metrics all line up *** FAILED *** (93 milliseconds)
  java.io.IOException: Not a file: file:/C:/projects/spark/core/ignored
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

- input metrics with interleaved reads *** FAILED *** (0 milliseconds)
  java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-2638d893-e89b-47ce-acd0-bbaeee78dd9b\InputOutputMetricsSuite_cart.txt, expected: file:///
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
  at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
  at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)

- input metrics with old CombineFileInputFormat *** FAILED *** (157 milliseconds)
  17947 was not greater than or equal to 300000 (InputOutputMetricsSuite.scala:324)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)

- input metrics with new CombineFileInputFormat *** FAILED *** (16 milliseconds)
  java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-11920c08-19d8-4c7c-9fba-28ed72b79f80\test\InputOutputMetricsSuite.txt, expected: file:///
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
  at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:462)
  at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:114)
```
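
Both the `Not a file` and the `Wrong FS` failures appear to trace back to building the input/output paths by string concatenation, `"file://" + file.getAbsolutePath`, which does not produce a valid `file:` URI on Windows: the drive letter ends up where the URI authority belongs and the separators stay as backslashes. The change below switches these to `file.toURI.toString`. A small illustration with a hypothetical Windows path:

```
import java.io.File

// Hypothetical Windows temp file, for illustration only.
val tmpFile = new File("C:\\projects\\spark\\target\\tmp\\test\\InputOutputMetricsSuite.txt")

// Manual concatenation keeps backslashes and puts the drive letter in the
// authority position, which Hadoop rejects ("Wrong FS: ... expected: file:///").
val broken = "file://" + tmpFile.getAbsolutePath
// on Windows: "file://C:\projects\spark\target\tmp\test\InputOutputMetricsSuite.txt"

// File.toURI produces a well-formed file URI on every platform.
val fixed = tmpFile.toURI.toString
// on Windows: "file:/C:/projects/spark/target/tmp/test/InputOutputMetricsSuite.txt"
```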

- `ReplayListenerSuite`:

```
- End-to-end replay *** FAILED *** (121 milliseconds)
  java.io.IOException: No FileSystem for scheme: C
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)

- End-to-end replay with compression *** FAILED *** (516 milliseconds)
  java.io.IOException: No FileSystem for scheme: C
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
```

- `EventLoggingListenerSuite`:

```
- End-to-end event logging *** FAILED *** (7 seconds, 435 milliseconds)
  java.io.IOException: No FileSystem for scheme: C
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)

- End-to-end event logging with compression *** FAILED *** (1 second)
  java.io.IOException: No FileSystem for scheme: C
  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)

- Event log name *** FAILED *** (16 milliseconds)
  "file:/[]base-dir/app1" did not equal "file:/[C:/]base-dir/app1" (EventLoggingListenerSuite.scala:123)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
```
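
The `No FileSystem for scheme: C` failures here and in `ReplayListenerSuite` seem to happen because the event log directory is handed over as a raw Windows path such as `C:\...`; when that string is parsed as a URI, the drive letter is taken as the scheme. The `Event log name` failure is the hard-coded expectation `file:/base-dir/...`: on Windows, `Utils.resolveURI("/base-dir")` resolves against the current drive and yields something like `file:/C:/base-dir`. The change below therefore sets `spark.eventLog.dir` to a proper URI and builds the expected log paths from the resolved base-dir URI. A minimal sketch of the configuration side, with a hypothetical directory:

```
import java.io.File
import org.apache.spark.SparkConf

// Hypothetical Windows event-log directory, for illustration only.
val logDir = new File("C:\\projects\\spark\\target\\tmp\\spark-events")
val conf = new SparkConf()

// Raw path: when parsed as a URI the drive letter becomes the scheme,
// hence "java.io.IOException: No FileSystem for scheme: C".
conf.set("spark.eventLog.dir", logDir.toString)

// Proper file URI: the scheme is "file" on every platform.
conf.set("spark.eventLog.dir", logDir.toURI.toString)
```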

This PR proposes to fix these test failures on Windows.

## How was this patch tested?

Manually tested via AppVeyor.

**Before**

`RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/273-RPackageUtilsSuite-before
`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/272-InputOutputMetricsSuite-before
`ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/274-ReplayListenerSuite-before
`EventLoggingListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/275-EventLoggingListenerSuite-before

**After**

`RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/270-RPackageUtilsSuite
`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/271-InputOutputMetricsSuite
`ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/277-ReplayListenerSuite-after
`EventLoggingListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/278-EventLoggingListenerSuite-after

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #16305 from HyukjinKwon/RPackageUtilsSuite-InputOutputMetricsSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc1c951
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc1c951
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc1c951

Branch: refs/heads/master
Commit: 2bc1c95154d071d53c9ef2e9e404eaf50ceb4675
Parents: 4faa8a3
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Fri Dec 16 21:32:24 2016 -0800
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Dec 16 21:32:24 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/RPackageUtils.scala | 47 ++++++++++++--------
 .../spark/metrics/InputOutputMetricsSuite.scala |  6 +--
 .../scheduler/EventLoggingListenerSuite.scala   | 19 ++++----
 3 files changed, 41 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bc1c951/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 3d2cabc..050778a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -176,26 +176,31 @@ private[deploy] object RPackageUtils extends Logging {
       val file = new File(Utils.resolveURI(jarPath))
       if (file.exists()) {
         val jar = new JarFile(file)
-        if (checkManifestForR(jar)) {
-          print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
-          val rSource = extractRFolder(jar, printStream, verbose)
-          if (RUtils.rPackages.isEmpty) {
-            RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
-          }
-          try {
-            if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
-              print(s"ERROR: Failed to build R package in $file.", printStream)
-              print(RJarDoc, printStream)
+        Utils.tryWithSafeFinally {
+          if (checkManifestForR(jar)) {
+            print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
+            val rSource = extractRFolder(jar, printStream, verbose)
+            if (RUtils.rPackages.isEmpty) {
+              RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
             }
-          } finally { // clean up
-            if (!rSource.delete()) {
-              logWarning(s"Error deleting ${rSource.getPath()}")
+            try {
+              if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
+                print(s"ERROR: Failed to build R package in $file.", printStream)
+                print(RJarDoc, printStream)
+              }
+            } finally {
+              // clean up
+              if (!rSource.delete()) {
+                logWarning(s"Error deleting ${rSource.getPath()}")
+              }
+            }
+          } else {
+            if (verbose) {
+              print(s"$file doesn't contain R source code, skipping...", printStream)
             }
           }
-        } else {
-          if (verbose) {
-            print(s"$file doesn't contain R source code, skipping...", printStream)
-          }
+        } {
+          jar.close()
         }
       } else {
         print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
@@ -231,8 +236,12 @@ private[deploy] object RPackageUtils extends Logging {
     val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
     try {
       filesToBundle.foreach { file =>
-        // get the relative paths for proper naming in the zip file
-        val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+        // Get the relative paths for proper naming in the ZIP file. Note that
+        // we convert dir to URI to force / and then remove trailing / that show up for
+        // directories because the separator should always be / for according to ZIP
+        // specification and therefore `relPath` here should be, for example,
+        // "/packageTest/def.R" or "/test.R".
+        val relPath = file.toURI.toString.replaceFirst(dir.toURI.toString.stripSuffix("/"), "")
         val fis = new FileInputStream(file)
         val zipEntry = new ZipEntry(relPath)
         zipOutputStream.putNextEntry(zipEntry)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bc1c951/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8054f5..a73b300 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     pw.close()
 
     // Path to tmpFile
-    tmpFilePath = "file://" + tmpFile.getAbsolutePath
+    tmpFilePath = tmpFile.toURI.toString
   }
 
   after {
@@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     sc.textFile(tmpFilePath, 4)
       .map(key => (key, 1))
       .reduceByKey(_ + _)
-      .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+      .saveAsTextFile(tmpFile.toURI.toString)
 
     sc.listenerBus.waitUntilEmpty(500)
     assert(inputRead == numRecords)
@@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     val numPartitions = 2
     val cartVector = 0 to 9
     val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
-    val cartFilePath = "file://" + cartFile.getAbsolutePath
+    val cartFilePath = cartFile.toURI.toString
 
     // write files to disk so we can read them later.
     sc.parallelize(cartVector).saveAsTextFile(cartFilePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bc1c951/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 230e2c3..4c3d0b1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -119,19 +119,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }
 
   test("Event log name") {
+    val baseDirUri = Utils.resolveURI("/base-dir")
     // without compression
-    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
-      Utils.resolveURI("/base-dir"), "app1", None))
+    assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath(
+      baseDirUri, "app1", None))
     // with compression
-    assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
+    assert(s"${baseDirUri.toString}/app1.lzf" ===
+      EventLoggingListener.getLogPath(baseDirUri, "app1", None, Some("lzf")))
     // illegal characters in app ID
-    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+    assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
+      EventLoggingListener.getLogPath(baseDirUri,
         "a fine:mind$dollar{bills}.1", None))
     // illegal characters in app ID with compression
-    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+    assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
+      EventLoggingListener.getLogPath(baseDirUri,
         "a fine:mind$dollar{bills}.1", None, Some("lz4")))
   }
 
@@ -289,7 +290,7 @@ object EventLoggingListenerSuite {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    conf.set("spark.eventLog.dir", logDir.toString)
+    conf.set("spark.eventLog.dir", logDir.toUri.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)

