Repository: spark
Updated Branches:
  refs/heads/branch-1.2 23bf3071f -> 30789f6ef


[SPARK-8062] Fix NullPointerException in 
SparkHadoopUtil.getFileSystemThreadStatistics (branch-1.2)

This patch adds a regression test for an extremely rare bug where 
`SparkHadoopUtil.getFileSystemThreadStatistics` would fail with a 
`NullPointerException` if the Hadoop `FileSystem.statisticsTable` contained a 
`Statistics` entry without a schema.  I'm not sure exactly how Hadoop gets into 
such a state, but this patch's regression test forces that state in order to 
reproduce this bug.

The fix is to add additional null-checking.  I debated adding an additional 
try-catch block around this entire metrics code to just ignore exceptions and 
keep going in the case of errors, but decided against that approach for now 
because it seemed overly conservative and might mask other bugs. We can revisit 
this in followup patches.

Author: Josh Rosen <joshro...@databricks.com>

Closes #6618 from JoshRosen/SPARK-8062-branch-1.2 and squashes the following 
commits:

652fa3c [Josh Rosen] Re-name test and reapply fix
66fc600 [Josh Rosen] Fix and minimize regression test (verified that it still 
fails)
1d8d125 [Josh Rosen] Fix SPARK-8062 with additional null checks
b6430f0 [Josh Rosen] Add failing regression test for SPARK-8062


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30789f6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30789f6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30789f6e

Branch: refs/heads/branch-1.2
Commit: 30789f6ef6382e78f3109823db261557f1fbab10
Parents: 23bf307
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Jun 8 10:51:25 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Jun 8 10:51:25 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  9 +++++++--
 .../spark/metrics/InputOutputMetricsSuite.scala | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30789f6e/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 57f9faf..ee725be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging {
   private def getFileSystemThreadStatistics(path: Path, conf: Configuration): 
Seq[AnyRef] = {
     val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
     val scheme = qualifiedPath.toUri().getScheme()
-    val stats = 
FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
-    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+    if (scheme == null) {
+      Seq.empty
+    } else {
+      FileSystem.getAllStatistics
+        .filter { stats => scheme.equals(stats.getScheme()) }
+        .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+    }
   }
 
   private def getFileSystemThreadStatisticsMethod(methodName: String): Method 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/30789f6e/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index ca226fd..1b17c3a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File}
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+import org.apache.spark.util.Utils
 
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
@@ -106,4 +107,23 @@ class InputOutputMetricsSuite extends FunSuite with 
SharedSparkContext with Shou
       }
     }
   }
+
+  test("getFileSystemThreadStatistics should guard against null schemes 
(SPARK-8062)") {
+    val tempDir = Utils.createTempDir()
+    val outPath = new File(tempDir, "outfile")
+
+    // Intentionally call this method with a null scheme, which will store an 
entry for a FileSystem
+    // with a null scheme into Hadoop's global `FileSystem.statisticsTable`.
+    FileSystem.getStatistics(null, classOf[FileSystem])
+
+    // Prior to fixing SPARK-8062, this would fail with a NullPointerException 
in
+    // SparkHadoopUtil.getFileSystemThreadStatistics
+    try {
+      val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+      rdd.saveAsTextFile(outPath.toString)
+      sc.textFile(outPath.toString).count()
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to