Josh Rosen created SPARK-8062:
---------------------------------

             Summary: NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics
                 Key: SPARK-8062
                 URL: https://issues.apache.org/jira/browse/SPARK-8062
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.2.1
         Environment: MapR 4.0.1, Hadoop 2.4.1, Yarn
            Reporter: Josh Rosen


I received the following error report from a user:

A Spark Streaming job on Spark 1.2.1 that reads from MapRfs and writes to HBase intermittently fails outright with the following errors:

{code}
15/05/28 10:35:50 ERROR executor.Executor: Exception in task 1.1 in stage 6.0 (TID 24)
java.lang.NullPointerException
    at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
    at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
    at org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatistics(SparkHadoopUtil.scala:178)
    at org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:139)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:116)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
15/05/28 10:35:50 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 25
15/05/28 10:35:50 INFO executor.Executor: Running task 2.1 in stage 6.0 (TID 25)
15/05/28 10:35:50 INFO rdd.NewHadoopRDD: Input split: hdfs:/[REDACTED]
15/05/28 10:35:50 ERROR executor.Executor: Exception in task 2.1 in stage 6.0 (TID 25)
java.lang.NullPointerException
    at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
    at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178)
    at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
{code}

Diving into the code: the NPE occurs on this line of SparkHadoopUtil (in 1.2.1):
https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L178

Here's that block of code from 1.2.1 (it's the same in 1.2.2):

{code}
  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
    val scheme = qualifiedPath.toUri().getScheme()
    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))  // <-- exception occurs on this line
    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
  }
{code}

Since the top frame on the stack is {{org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4}}, I'm assuming the {{_.getScheme().equals(scheme)}} predicate is what throws: either {{FileSystem.getAllStatistics()}} is returning a collection that contains a null element, or some element's {{getScheme()}} returns null.
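
To make those two failure modes concrete, here is a minimal, self-contained Scala sketch (not Spark code; {{FakeStatistics}} is a stand-in for Hadoop's {{FileSystem.Statistics}}) showing how that predicate can throw:

{code}
object NpeInFilterDemo {
  // Stand-in for Hadoop's FileSystem.Statistics; only getScheme() matters here.
  class FakeStatistics(scheme: String) { def getScheme(): String = scheme }

  def main(args: Array[String]): Unit = {
    val scheme = "maprfs"
    // Case 1: a null element in the collection -> NPE from null.getScheme()
    val withNullElement: Seq[FakeStatistics] = Seq(new FakeStatistics("hdfs"), null)
    // Case 2: an element whose scheme is null -> NPE from null.equals(scheme)
    val withNullScheme: Seq[FakeStatistics] = Seq(new FakeStatistics(null))

    // Both throw java.lang.NullPointerException inside filter, matching the
    // anonfun$4 frame in the reported stack trace.
    try withNullElement.filter(_.getScheme().equals(scheme))
    catch { case e: NullPointerException => println(s"case 1: $e") }
    try withNullScheme.filter(_.getScheme().equals(scheme))
    catch { case e: NullPointerException => println(s"case 2: $e") }
  }
}
{code}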

Diving into the Hadoop source, {{FileSystem.getAllStatistics()}} reads synchronized static state to return statistics for every Hadoop filesystem created within the JVM. I wonder whether some code is nondeterministically creating a new FileSystem instance that lacks a scheme, causing entries to be stored in the statistics map that return null from {{getScheme()}}.
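
If someone who can reproduce this wants to check that hypothesis, a quick diagnostic from a {{spark-shell}} on the affected cluster might look like the following (just a sketch; it only inspects the statistics registered in the driver JVM):

{code}
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

// Dump every FileSystem.Statistics entry registered in this JVM and flag
// null entries or null schemes, either of which would explain the NPE.
FileSystem.getAllStatistics().asScala.zipWithIndex.foreach { case (stats, i) =>
  if (stats == null) println(s"entry $i is null")
  else println(s"entry $i scheme = ${Option(stats.getScheme()).getOrElse("<null>")}")
}
{code}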

I am unable to reproduce this issue myself, but I think that we can fix it for 
the user by adding try-catch blocks to prevent errors in metrics collection 
from leading to task failures.
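
For reference, one possible shape for that guard (a sketch only, not a final patch; the method name matches the existing code, but the null checks are illustrative):

{code}
  // Defensive variant of the 1.2.1 method quoted above: skip null entries and
  // null schemes instead of letting the comparison throw.
  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
    val scheme = qualifiedPath.toUri().getScheme()
    val stats = FileSystem.getAllStatistics().filter { s =>
      s != null && s.getScheme() != null && s.getScheme().equals(scheme)
    }
    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
  }
{code}

On top of that, the call sites that install the bytes-read callback could wrap the setup in a try-catch for non-fatal errors and log a warning instead of failing the task.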


