Repository: spark Updated Branches: refs/heads/branch-1.2 23bf3071f -> 30789f6ef
[SPARK-8062] Fix NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics (branch-1.2) This patch adds a regression test for an extremely rare bug where `SparkHadoopUtil.getFileSystemThreadStatistics` would fail with a `NullPointerException` if the Hadoop `FileSystem.statisticsTable` contained a `Statistics` entry with a null scheme. I'm not sure exactly how Hadoop gets into such a state, but this patch's regression test forces that state in order to reproduce this bug. The fix is to add additional null-checking. I debated adding an additional try-catch block around this entire metrics code to just ignore exceptions and keep going in the case of errors, but decided against that approach for now because it seemed overly conservative and might mask other bugs. We can revisit this in follow-up patches. Author: Josh Rosen <joshro...@databricks.com> Closes #6618 from JoshRosen/SPARK-8062-branch-1.2 and squashes the following commits: 652fa3c [Josh Rosen] Re-name test and reapply fix 66fc600 [Josh Rosen] Fix and minimize regression test (verified that it still fails) 1d8d125 [Josh Rosen] Fix SPARK-8062 with additional null checks b6430f0 [Josh Rosen] Add failing regression test for SPARK-8062 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30789f6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30789f6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30789f6e Branch: refs/heads/branch-1.2 Commit: 30789f6ef6382e78f3109823db261557f1fbab10 Parents: 23bf307 Author: Josh Rosen <joshro...@databricks.com> Authored: Mon Jun 8 10:51:25 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Mon Jun 8 10:51:25 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/deploy/SparkHadoopUtil.scala | 9 +++++++-- .../spark/metrics/InputOutputMetricsSuite.scala | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/30789f6e/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 57f9faf..ee725be 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging { private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = { val qualifiedPath = path.getFileSystem(conf).makeQualified(path) val scheme = qualifiedPath.toUri().getScheme() - val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) - stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + if (scheme == null) { + Seq.empty + } else { + FileSystem.getAllStatistics + .filter { stats => scheme.equals(stats.getScheme()) } + .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + } } private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { http://git-wip-us.apache.org/repos/asf/spark/blob/30789f6e/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index ca226fd..1b17c3a 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File} import org.apache.spark.SharedSparkContext import 
org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} +import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers @@ -106,4 +107,23 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou } } } + + test("getFileSystemThreadStatistics should guard against null schemes (SPARK-8062)") { + val tempDir = Utils.createTempDir() + val outPath = new File(tempDir, "outfile") + + // Intentionally call this method with a null scheme, which will store an entry for a FileSystem + // with a null scheme into Hadoop's global `FileSystem.statisticsTable`. + FileSystem.getStatistics(null, classOf[FileSystem]) + + // Prior to fixing SPARK-8062, this would fail with a NullPointerException in + // SparkHadoopUtil.getFileSystemThreadStatistics + try { + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) + rdd.saveAsTextFile(outPath.toString) + sc.textFile(outPath.toString).count() + } finally { + Utils.deleteRecursively(tempDir) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org