Maybe try making your class implement Serializable...
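A minimal sketch of that suggestion, assuming TileIdWritable wraps a single Long. The thread never shows the class body, so `value`, `write`, `readFields`, and `SerializableCheck` below are hypothetical. The class extends `Serializable` and routes Java's private `writeObject`/`readObject` hooks through Writable-style methods, with `java.io.DataOutput`/`DataInput` standing in for Hadoop's `Writable` so the sketch needs no Hadoop jars:

```scala
import java.io._

// Hypothetical stand-in for TileIdWritable: a single Long field is assumed.
class TileIdWritable(var value: Long) extends Serializable {
  // No-arg constructor, as Hadoop Writables conventionally provide.
  def this() = this(0L)

  // Writable-style methods (assumed names, mirroring Hadoop's Writable).
  def write(out: DataOutput): Unit = out.writeLong(value)
  def readFields(in: DataInput): Unit = { value = in.readLong() }

  // Java serialization hooks, delegating to the Writable-style methods so the
  // wire format is defined in one place.
  private def writeObject(out: ObjectOutputStream): Unit = write(out)
  private def readObject(in: ObjectInputStream): Unit = readFields(in)
}

object TileIdWritable {
  def apply(value: Long): TileIdWritable = new TileIdWritable(value)
}

object SerializableCheck {
  // Round-trips an object through plain Java serialization, the mechanism
  // used by Spark's default closure serializer.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(TileIdWritable(200))
    println(copy.value) // 200
  }
}
```

On deserialization, Java runs only the no-arg constructor of the first non-serializable superclass (here `Object`), so `value` is restored solely by `readFields`.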

2013/12/23 Ameet Kini <>

> Thanks Imran.
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
> NullPointerException when the executor starts up. This is a snippet of the
> executor's log. Notice that "registered TileIdWritable" and "registered
> ArgWritable" are printed, so I can see that my KryoRegistrator is being
> called. However, it's not clear why there's a follow-on NPE. My Spark log
> level is already set to DEBUG (log4j.rootCategory=DEBUG), so I'm not sure if
> there's some other way to get the executor to be more verbose about the
> cause of the NPE.
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine and execute
> other RDD actions, but of course not the lookup action (my original
> problem). With spark.closure.serializer set to Kryo, the executor dies with
> an NPE during startup.
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
> akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://[redacted]:56483]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://[redacted]:56483]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
> id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp://[redacted]:48147/user/MapOutputTracker
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 INFO Executor: Fetching
> with
> timestamp 1387814434436
> 13/12/23 11:00:37 INFO Utils: Fetching
> to
> /tmp/fetchFileTemp2456419097284083628.tmp
> 13/12/23 11:00:37 INFO Executor: Adding
> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
> to class loader
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-4,5,main]
> java.lang.NullPointerException
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at org.apache.spark.executor.Executor$
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-2,5,main]
> java.lang.NullPointerException
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at org.apache.spark.executor.Executor$
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-1,5,main]
> java.lang.NullPointerException
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at org.apache.spark.executor.Executor$
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-3,5,main]
> java.lang.NullPointerException
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at org.apache.spark.executor.Executor$
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
> Thanks,
> Ameet
> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <> wrote:
>> There is a separate setting for serializing closures,
>> "spark.closure.serializer" (listed here), which is used to serialize the
>> closures behind RDD operations such as map, filter, and lookup. Those
>> closures include any variables they reference, like your TileIdWritable.
>> So you need to either change that setting to use Kryo, or make your object
>> Java-serializable.
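To see why lookup fails while shuffles work, here is a minimal sketch using plain Java serialization (no Spark involved): a function value that captures a key is only serializable if the key itself is. `PlainKey`, `SerializableKey`, `matchesKey`, and `ClosureCapture` are hypothetical names, and `matchesKey` only approximates the predicate that lookup builds around its key (compare the `key$1` field in the exception trace further down):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical key types: PlainKey mirrors the original TileIdWritable (not
// java.io.Serializable); SerializableKey is the same thing made serializable.
class PlainKey(val value: Long)
class SerializableKey(val value: Long) extends Serializable

object ClosureCapture {
  // True if plain Java serialization accepts the whole object graph.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Rough approximation of what a key lookup ships to executors: a predicate
  // that captures the key it compares against.
  def matchesKey(key: AnyRef): Any => Boolean = (k: Any) => k == key

  def main(args: Array[String]): Unit = {
    println(javaSerializable(matchesKey(new PlainKey(200))))        // false
    println(javaSerializable(matchesKey(new SerializableKey(200)))) // true
  }
}
```

Registering a class with Kryo does not change this outcome, because with the default setting the closure (and therefore the captured key) still goes through Java serialization.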
>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <> wrote:
>>> I'm getting the NotSerializableException below despite using Kryo to
>>> serialize that class (TileIdWritable).
>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>> Initially I thought Kryo was not being registered properly, so I tried
>>> running operations over awtestRdd that force a shuffle (e.g., groupByKey),
>>> and those seemed to work fine. So the problem seems to be specific to the
>>> "TileIdWritable(200)" argument to lookup(). Is there anything unique about
>>> companion objects and Kryo serialization? I even replaced
>>> "TileIdWritable(200)" with "new TileIdWritable" but still saw that
>>> exception.
>>> class TileIdWritable {
>>>  //
>>> }
>>> object TileIdWritable {
>>>  def apply(value: Long) = new TileIdWritable
>>> }
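On the companion-object question: `TileIdWritable(200)` and `new TileIdWritable` both yield an instance of the same class, so they behave identically under Java serialization. What matters is whether that class implements `java.io.Serializable`, not how the instance was built. A minimal check, reusing the class/companion pair from the message as-is (`CompanionCheck` is a hypothetical name):

```scala
// The class/companion pair from the message; the class body is elided there too.
class TileIdWritable
object TileIdWritable {
  def apply(value: Long): TileIdWritable = new TileIdWritable
}

object CompanionCheck {
  def main(args: Array[String]): Unit = {
    val viaApply = TileIdWritable(200)
    val viaNew = new TileIdWritable
    // Both construction routes produce the same runtime class...
    println(viaApply.getClass == viaNew.getClass)        // true
    // ...which does not implement java.io.Serializable, so Java
    // serialization rejects either instance in the same way.
    println(viaApply.isInstanceOf[java.io.Serializable]) // false
  }
}
```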
>>> My Kryo registrator:
>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>     override def registerClasses(kryo: Kryo) {
>>>       println("Called KryoRegistrator")  // I see this printed during shuffle operations
>>>       val r = kryo.register(classOf[TileIdWritable])
>>>       val s = kryo.register(classOf[ArgWritable])
>>>     }
>>> }
>>> Then, just before creating the SparkContext, I have these two lines:
>>>     System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>     System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
>>> The exception itself:
>>> Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable:
>>> geotrellis.spark.formats.TileIdWritable
>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
>>>     - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>     at $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>     at $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>     at $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$
>>> Regards,
>>> Ameet
