[jira] [Issue Comment Deleted] (SPARK-8062) NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics

2015-06-08 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-8062:
--
Comment: was deleted

(was: While trying to reproduce this bug, I noticed something rather curious:

In {{InputOutputMetricsSuite}}, the output metrics tests are guarded by {{if}} 
statements that check whether the {{bytesWrittenOnThreadCallback}} is defined:

{code}
test("output metrics when writing text file") {
val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")

if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, 
fs.getConf).isDefined) {
  // ... Body of test case ...
}
  }
{code}

AFAIK this guard was introduced to prevent the test's assertions from failing 
under pre-2.5 versions of Hadoop.

Now, take a look at the regression test that I added to try to reproduce this 
bug:

{code}

  test("exceptions while getting IO thread statistics should not fail tasks / 
jobs (SPARK-8062)") {
FileSystem.getStatistics(null, classOf[FileSystem])


val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")
// This test passes unless the following line is commented out.  The 
following line therefore
// has some side-effects that are impacting the system under test:
SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, 
fs.getConf).isDefined
val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)

try {
  rdd.saveAsTextFile(outPath.toString)
} finally {
  fs.delete(outPath, true)
}
  }
{code}

In this test, I try to pollute the global FileSystem statistics registry by 
storing a statistics entry for a filesystem with a null URI.  All I care about 
here is that Spark doesn't crash, so I didn't add the {{if}} check (there's no 
need to worry about assertions failing on pre-Hadoop-2.5 versions, since this 
test doesn't assert anything about the metrics).
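
For reference, here's a minimal sketch of the scheme-matching lookup that the 
null entry trips over.  It's a paraphrase of the 1.2.x 
{{getFileSystemThreadStatistics}} code quoted in the issue description below, 
not the exact source:

{code}
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch (paraphrased) of the scheme-matching lookup.  A Statistics entry
// registered with a null scheme (e.g. via FileSystem.getStatistics(null,
// classOf[FileSystem]), as in the test above) makes getScheme() return null,
// so the equals() call in the filter throws a NullPointerException.
def matchingStatistics(path: Path, conf: Configuration): Seq[FileSystem.Statistics] = {
  val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
  val scheme = qualifiedPath.toUri().getScheme()
  FileSystem.getAllStatistics().asScala.filter(_.getScheme().equals(scheme)).toSeq
}
{code}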

Surprisingly, though, my test would not fail until I added a 

{code}
SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
{code}

check outside of an {{if}} statement.  This implies that the method has side 
effects which influence whether other metrics-retrieval code is called, and I 
worry that our other InputOutputMetrics code could therefore be broken for real 
production jobs.  I'd like to investigate and fix this issue while also 
hardening this code: we should be performing significantly more null checks on 
the inputs and outputs of Hadoop methods, and we should use a pure function to 
determine whether our Hadoop version supports these metrics rather than calling 
a method that might have side-effects (I think we can do this purely via 
reflection, without actually creating any objects or calling any methods).
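
To illustrate the kind of pure capability check I'm imagining (just a sketch, 
not the eventual patch; it assumes the relevant Hadoop 2.5+ API is 
{{FileSystem.Statistics.getThreadStatistics}}, the same method the existing 
reflection code looks up):

{code}
import scala.util.Try
import org.apache.hadoop.fs.FileSystem

// Sketch of a side-effect-free capability check: ask, via reflection only,
// whether the Hadoop 2.5+ per-thread statistics method exists on the classpath.
// No objects are created and no Hadoop methods are invoked.
def hadoopSupportsThreadStatistics: Boolean =
  Try(classOf[FileSystem.Statistics].getMethod("getThreadStatistics")).isSuccess
{code}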

Since this JIRA is somewhat time-sensitive, though, I'm going to work on a 
patch just for the null checks here, then open a follow-up to investigate 
further hardening of the input/output metrics code.)

> NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics
> -
>
> Key: SPARK-8062
> URL: https://issues.apache.org/jira/browse/SPARK-8062
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1
> Environment: MapR 4.0.1, Hadoop 2.4.1, Yarn
>Reporter: Josh Rosen
>Assignee: Josh Rosen
> Fix For: 1.2.3
>
>
> I received the following error report from a user:
> While running a Spark Streaming job that reads from MapRfs and writes to 
> HBase using Spark 1.2.1, the job intermittently experiences a total job 
> failure due to the following errors:
> {code}
> 15/05/28 10:35:50 ERROR executor.Executor: Exception in task 1.1 in stage 6.0 (TID 24)
> java.lang.NullPointerException
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
> at org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatistics(SparkHadoopUtil.scala:178)
> at org.apache.spark.deploy.SparkHadoopUtil.getFSByte

[jira] [Issue Comment Deleted] (SPARK-8062) NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics

2015-06-08 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-8062:
--
Comment: was deleted

(was: Alright, I've filed https://issues.apache.org/jira/browse/SPARK-8086 to 
follow up on hardening InputOutputMetricsSuite.)

> NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics
> -
>
> Key: SPARK-8062
> URL: https://issues.apache.org/jira/browse/SPARK-8062
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1
> Environment: MapR 4.0.1, Hadoop 2.4.1, Yarn
>Reporter: Josh Rosen
>Assignee: Josh Rosen
> Fix For: 1.2.3
>
>
> I received the following error report from a user:
> While running a Spark Streaming job that reads from MapRfs and writes to 
> HBase using Spark 1.2.1, the job intermittently experiences a total job 
> failure due to the following errors:
> {code}
> 15/05/28 10:35:50 ERROR executor.Executor: Exception in task 1.1 in stage 6.0 (TID 24)
> java.lang.NullPointerException
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
> at org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatistics(SparkHadoopUtil.scala:178)
> at org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:139)
> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:116)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 15/05/28 10:35:50 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 25
> 15/05/28 10:35:50 INFO executor.Executor: Running task 2.1 in stage 6.0 (TID 25)
> 15/05/28 10:35:50 INFO rdd.NewHadoopRDD: Input split: hdfs:/[REDACTED]
> 15/05/28 10:35:50 ERROR executor.Executor: Exception in task 2.1 in stage 6.0 (TID 25)
> java.lang.NullPointerException
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
> at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> {code}
> Diving into the code here:
> The NPE is occurring on this line of SparkHadoopUtil (in 1.2.1): 
> https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L178
> Here's that block of code from 1.2.1 (it's the same in 1.2.2):
> {code}
>   private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
>     val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
>     val scheme = qualified