That stack trace is quite similar to the one that is generated when trying
to do a collect within a closure.  In this case, it feels "wrong" to
collect in a closure, but I wonder what's reason behind the NPE.
Curious to know whether they are related.

Here's a very simple example:
rrd1.flatMap(x=> rrd2.collect.flatMap(y=> List(y,x)))
res7: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[10] at flatMap at
<console>:17

scala> res7.collect
14/06/13 01:11:48 INFO SparkContext: Starting job: collect at <console>:19
14/06/13 01:11:48 INFO DAGScheduler: Got job 2 (collect at <console>:19)
with 3 output partitions (allowLocal=false)
14/06/13 01:11:48 INFO DAGScheduler: Final stage: Stage 4(collect at
<console>:19)
14/06/13 01:11:48 INFO DAGScheduler: Parents of final stage: List()
14/06/13 01:11:48 INFO DAGScheduler: Missing parents: List()
14/06/13 01:11:48 INFO DAGScheduler: Submitting Stage 4 (FlatMappedRDD[10]
at flatMap at <console>:17), which has no missing parents
14/06/13 01:11:48 INFO DAGScheduler: Submitting 3 missing tasks from Stage
4 (FlatMappedRDD[10] at flatMap at <console>:17)
14/06/13 01:11:48 INFO TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:0 as TID 16 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:0 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:1 as TID 17 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:1 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:2 as TID 18 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:2 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO Executor: Running task ID 16
14/06/13 01:11:48 INFO Executor: Running task ID 17
14/06/13 01:11:48 INFO Executor: Running task ID 18
14/06/13 01:11:48 ERROR Executor: Exception in task ID 18
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.collect(RDD.scala:728)
at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
 at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
 at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
 at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
14/06/13 01:11:48 ERROR Executor: Exception in task ID 16
... same for each partition.

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1037)
 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1019)
 at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1019)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
at scala.Option.foreach(Option.scala:236)
 at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:637)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1211)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-kr, Gerard.




On Fri, Jun 13, 2014 at 12:32 AM, bdamos <a...@adobe.com> wrote:

> Hi, I'm consistently getting NullPointerExceptions when trying to use
> String val objects defined in my main application -- even for broadcast
> vals!
> I'm deploying on a standalone cluster with a master and 4 workers on the
> same machine, which is not the machine I'm submitting from.
>
> The following example works in spark-shell, but does not when
> submitted to the cluster with spark-submit, and also does not work locally.
>
> Is there anything I can do to fix this?
> Do vals need to be explicitly synchronized for RDD operations?
> One workaround in would be to inline the vals,
> but the logic in my actual application doesn't allow for this.
>
> Thanks,
> Brandon.
>
> ---
>
> sbt-shell --master <my-server>
>
>   val suffix = "-suffix"
>   val l = sc.parallelize(List("a", "b", "c"))
>   println(l.map(_+suffix).collect().mkString(","))
>
>   Result: a-suffix,b-suffix,c-suffix
>
> ---
>
> Standalone Cluster with `submit.sh` (my script below):
>
> TestApp.scala:
>
>   package com.adobe.spark
>
>   // Spark.
>   import org.apache.spark.{SparkConf,SparkContext}
>   import org.apache.spark.broadcast._
>   import org.apache.spark.SparkContext._
>   import org.apache.spark.storage.StorageLevel
>
>   // Scala.
>   import scala.collection.mutable.ArrayBuffer
>
>   object TestApp extends App {
>     val memory = "1g"
>     val maxCores = "1"
>     val conf = new SparkConf()
>       .setMaster("spark://myserver:7077")
>       //.setMaster("local[4]")
>       .setAppName("ValError")
>       .setSparkHome("/usr/local/spark-1.0.0")
>       .setJars(Seq("/tmp/val-error.jar"))
>       .set("spark.executor.memory", memory)
>       .set("spark.cores.max", maxCores)
>     val sc = new SparkContext(conf)
>
>     val suffix = "-suffix"
>     val l = sc.parallelize(List("a", "b", "c"))
>     println(l.map(_+suffix).collect().mkString(","))
>
>     val suffix_bc = sc.broadcast(suffix)
>     println(l.map(_+suffix_bc.value).collect().mkString(","))
>
>     sc.stop()
>   }
>
> build.sbt:
>
>   import AssemblyKeys._
>
>   assemblySettings
>
>   jarName in assembly := "val-error.jar"
>
>   // Load "provided" libraries with `sbt run`.
>   run in Compile <<= Defaults.runTask(
>     fullClasspath in Compile, mainClass in (Compile, run), runner in
> (Compile, run)
>   )
>
>   name := "TestApp"
>
>   version := "1.0"
>
>   scalaVersion := "2.10.3"
>
>   libraryDependencies ++= Seq(
>     "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>     "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
>   )
>
>   resolvers ++= Seq(
>     "Akka Repository" at "http://repo.akka.io/releases/";
>   )
>
> submit.sh:
>
>   #!/bin/bash
>
>   rm -f *.log driver-id.txt
>
>   JAR=val-error.jar
>   CLASS=com.adobe.spark.TestApp
>   SPARK=/usr/local/spark-1.0.0
>
>   set -x
>   sbt assembly &> assembly.log || exit 1
>   scp target/scala-2.10/$JAR eagle:/tmp || exit 2
>
>   $SPARK/bin/spark-submit \
>     --class $CLASS \
>     --master spark://myserver:7077 \
>     --deploy-mode cluster \
>     /tmp/$JAR | tee submit.log
>   set +x
>
>   DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed
> 's/Driver successfully submitted as \(.*\)/\1/g')
>   [ -z $DRIVER_ID ] && exit 3
>   echo $DRIVER_ID > driver-id.txt
>
> Output:
>   anull,bnull,cnull (For the first part.)
>
> Stack Trace: (For the broadcast var.)
>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
> in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException
>           com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
>           com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
>           scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>           scala.collection.Iterator$class.foreach(Iterator.scala:727)
>           scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>           scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>           scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>           scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>           org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>           org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>           org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>           java.lang.Thread.run(Thread.java:744)
>   Driver stacktrace:
>     at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at scala.Option.foreach(Option.scala:236)
>     at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>     at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> ---
>
> When using 'local[4]' and `sbt run` with `setJars` commented.
> This is happening when `collect()` is called on the first map.
>
>   org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on
> host localhost: java.lang.ClassNotFoundException: scala.None$
>           java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>           java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>           java.security.AccessController.doPrivileged(Native Method)
>           java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>           java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>           java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>           java.lang.Class.forName0(Native Method)
>           java.lang.Class.forName(Class.java:340)
>
>
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
>
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>           java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>
> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>
> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1840)
>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>           java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>
>
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>           java.lang.Thread.run(Thread.java:745)
>   Driver stacktrace:
>     at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at scala.Option.foreach(Option.scala:236)
>     at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>     at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerExceptions-when-using-val-or-broadcast-on-a-standalone-cluster-tp7524.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Reply via email to