Hi Michael,

I re-ran this on another machine which is on spark's master branch
0.9.0-SNAPSHOT from Dec 14 (right after the scala 2.10 branch was merged
back into master) and recreated the NPE towards the end of this message. I
can't tell looking at the relevant code what may have caused the exception
because line 262 is part of the catch block of a pretty big try/catch
block.


Executor.scala Line 262:  val metrics = attemptedTask.flatMap(t =>
t.metrics)

The surrounding lines are:
       case t: Throwable => {
          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
Line 262 -->  val metrics = attemptedTask.flatMap(t => t.metrics)
          for (m <- metrics) {
            m.executorRunTime = serviceTime
            m.jvmGCTime = gcTime - startGCTime
          }
          val reason = ExceptionFailure(t.getClass.getName, t.toString,
t.getStackTrace, metrics)
          execBackend.statusUpdate(taskId, TaskState.FAILED,
ser.serialize(reason))

          // TODO: Should we exit the whole executor here? On the one hand,
the failed task may
          // have left some weird state around depending on when the
exception was thrown, but on
          // the other hand, maybe we could detect that when future tasks
fail and exit then.
          logError("Exception in task ID " + taskId, t)
          //System.exit(1)
        }




13/12/24 10:02:33 ERROR Executor: Uncaught exception in thread
Thread[Executor task launch worker-4,5,main]
java.lang.NullPointerException
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
    at scala.Option.flatMap(Option.scala:170)
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:262)
    at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

Thanks,
Ameet



On Mon, Dec 23, 2013 at 4:58 PM, Michael (Bach) Bui <free...@adatao.com>wrote:

> What spark version are you using? By looking at the code Executor.scala
> line195, you will at least know what cause the NPE.
> We can start from there.
>
>
>
>
> On Dec 23, 2013, at 10:21 AM, Ameet Kini <ameetk...@gmail.com> wrote:
>
> Thanks Imran.
>
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
> NullPointerException when the executor starts up. This is a snippet of the
> executor's log. Notice how "registered TileIdWritable" and "registered
> ArgWritable" is called, so I see that my KryoRegistrator is being called.
> However, it's not clear why there's a follow-on NPE. My spark log level is
> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
> there s
> some other way to get the executor to be more verbose as to the cause of
> the NPE.
>
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine, and executes
> other RDD actions, but of course not the lookup action (my original
> problem). With the spark.closure.serializer setting to kryo, it dies with
> an NPE during executor startup.
>
>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
> akka.tcp://[redacted]:48147/user/StandaloneScheduler<http://sp...@karadi.spadac.com:48147/user/StandaloneScheduler>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp:/[redacted]:56483 <http://sp...@karadi.spadac.com:56483/>]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [
> akka.tcp://[redacted]:56483 <http://sp...@karadi.spadac.com:56483/>]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://[redacted]:48147/user/BlockManagerMaster<http://sp...@karadi.spadac.com:48147/user/BlockManagerMaster>
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
> id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp:/[redacted]:48147/user/MapOutputTracker<http://sp...@karadi.spadac.com:48147/user/MapOutputTracker>
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 INFO Executor: Fetching
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
> timestamp 1387814434436
> 13/12/23 11:00:37 INFO Utils: Fetching
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
> /tmp/fetchFileTemp2456419097284083628.tmp
> 13/12/23 11:00:37 INFO Executor: Adding
> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
> to class loader
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-4,5,main]
> java.lang.NullPointerException
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-2,5,main]
> java.lang.NullPointerException
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>        at scala.Option.flatMap(Option.scala:170)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-1,5,main]
> java.lang.NullPointerException
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-3,5,main]
> java.lang.NullPointerException
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>
> Thanks,
> Ameet
>
>
> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com>wrote:
>
>> there is a separate setting for serializing closures
>> "spark.closure.serializer" (listed here
>> http://spark.incubator.apache.org/docs/latest/configuration.html)
>>
>> that is used to serialize whatever is used by all the fucntions on an
>> RDD, eg., map, filter, and lookup.  Those closures include referenced
>> variables, like your
>> TileIdWritable.
>>
>> So you need to either change that to use kryo, or make your object
>> serializable to java.
>>
>>
>>
>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>
>>>
>>> I'm getting the below NotSerializableException despite using Kryo to
>>> serialize that class (TileIdWritable).
>>>
>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>
>>> Initially I thought Kryo is not being registered properly, so I tried
>>> running operations over awtestRDD which force a shuffle (e.g., groupByKey),
>>> and that seemed to work fine. So it seems to be specific to the
>>> "TileIdWritable(200)" argument to lookup().  Is there anything unique about
>>> companion objects and Kryo serialization? I even replaced
>>> "TileIdWritable(200)" by "new TileIdWritable" but still see that exception
>>>
>>>
>>> class TileIdWritable {
>>>  //
>>> }
>>>
>>> object TileIdWritable {
>>>  def apply(value: Long) = new TileIdWritable
>>> }
>>>
>>>
>>> My Kryo registrator:
>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>     override def registerClasses(kryo: Kryo) {
>>>       println("Called KryoRegistrator")  // I see this printed during
>>> shuffle operations
>>>       val r = kryo.register(classOf[TileIdWritable])
>>>       val s = kryo.register(classOf[ArgWritable])
>>>     }
>>> }
>>>
>>> Then just before creating a Spark Context, I have these two lines:
>>>     System.setProperty("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>     System.setProperty("spark.kryo.registrator",
>>> "geotrellis.spark.KryoRegistrator")
>>>
>>>
>>>
>>>
>>> The exception itself:
>>> Exception in thread "main" org.apache.spark.SparkException: Job failed:
>>> Task not serializable: java.io.NotSerializableException:
>>> geotrellis.spark.formats.TileIdWritable
>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>> name: "key$1", type: "class java.lang.Object")
>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>> <function1>)
>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>> name: "func$1", type: "interface scala.Function1")
>>>     - root object (class
>>> "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>     at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>     at 
>>> org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.dagscheduler.org/>
>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>     at 
>>> org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.dagscheduler.org/>
>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>     at 
>>> org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.dagscheduler.org/>
>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>     at
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>> Regards,
>>> Ameet
>>>
>>>
>>>
>>>
>>>
>>
>
>

Reply via email to