Using Java serialization would make the NPE go away, but it would be a less
desirable solution. My application is network-intensive, and serialization
cost is significant, so these objects are ideal candidates for Kryo.





On Mon, Dec 23, 2013 at 3:41 PM, Jie Deng <deng113...@gmail.com> wrote:

> Maybe try making your class implement Serializable...
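>
> A minimal sketch of that suggestion, mirroring the stub shown further down
> the thread (the real class presumably has more to it):
>
> class TileIdWritable extends java.io.Serializable {
>  //
> }
>
> object TileIdWritable {
>  def apply(value: Long) = new TileIdWritable
> }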
>
>
> 2013/12/23 Ameet Kini <ameetk...@gmail.com>
>
>> Thanks Imran.
>>
>> I tried setting "spark.closure.serializer" to
>> "org.apache.spark.serializer.KryoSerializer" and now see a
>> NullPointerException when the executor starts up. Below is a snippet of the
>> executor's log. Notice that "registered TileIdWritable" and "registered
>> ArgWritable" are printed, so my KryoRegistrator is being called.
>> However, it's not clear why there's a follow-on NPE. My Spark log level is
>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure
>> if there is some other way to get the executor to be more verbose about the
>> cause of the NPE.
>>
>> When I take out the spark.closure.serializer setting (i.e., go back to
>> the default Java serialization), the executors start up fine and execute
>> other RDD actions, but of course not the lookup action (my original
>> problem). With spark.closure.serializer set to Kryo, they die with an NPE
>> during executor startup.
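>>
>> For reference, I set it the same way as the other properties shown further
>> below, roughly like this (just before creating the SparkContext):
>>
>>     System.setProperty("spark.closure.serializer",
>>       "org.apache.spark.serializer.KryoSerializer")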
>>
>>
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
>> with driver
>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>> dirs '/tmp'
>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131223110036-4335
>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
>> id = ConnectionManagerId([redacted],41617)
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>> akka.tcp://[redacted]:48147/user/MapOutputTracker
>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>> 13/12/23 11:00:37 INFO Executor: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
>> timestamp 1387814434436
>> 13/12/23 11:00:37 INFO Utils: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
>> /tmp/fetchFileTemp2456419097284083628.tmp
>> 13/12/23 11:00:37 INFO Executor: Adding
>> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> to class loader
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-4,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-2,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-1,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-3,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>
>> Thanks,
>> Ameet
>>
>>
>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>>
>>> There is a separate setting for serializing closures,
>>> "spark.closure.serializer" (listed here:
>>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>
>>> That one is used to serialize whatever is referenced by the functions you
>>> pass to an RDD, e.g., map, filter, and lookup.  Those closures include
>>> referenced variables, like your
>>> TileIdWritable.
>>>
>>> So you need to either change that setting to use Kryo, or make your object
>>> Java-serializable.
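>>>
>>> To see why, here is a rough sketch of what lookup effectively does with
>>> your names (not the actual Spark code): the key is captured by the function
>>> shipped to the executors, so it goes through the closure serializer rather
>>> than the data serializer.
>>>
>>> val key = TileIdWritable(200)
>>> // `key` is a free variable of this closure, so it has to be handled by
>>> // spark.closure.serializer (Java serialization by default), even though
>>> // the RDD's records themselves are serialized with Kryo.
>>> awtestRdd.filter { case (k, _) => k == key }.map(_._2).collect()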
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>>
>>>>
>>>> I'm getting the below NotSerializableException despite using Kryo to
>>>> serialize that class (TileIdWritable).
>>>>
>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>
>>>> Initially I thought Kryo was not being registered properly, so I tried
>>>> running operations over awtestRDD that force a shuffle (e.g., groupByKey),
>>>> and those worked fine. So the problem seems to be specific to the
>>>> "TileIdWritable(200)" argument to lookup().  Is there anything unique about
>>>> companion objects and Kryo serialization? I even replaced
>>>> "TileIdWritable(200)" with "new TileIdWritable" but still see the exception.
>>>>
>>>>
>>>> class TileIdWritable {
>>>>  //
>>>> }
>>>>
>>>> object TileIdWritable {
>>>>  def apply(value: Long) = new TileIdWritable
>>>> }
>>>>
>>>>
>>>> My Kryo registrator:
>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>     override def registerClasses(kryo: Kryo) {
>>>>       println("Called KryoRegistrator")  // I see this printed during
>>>> shuffle operations
>>>>       val r = kryo.register(classOf[TileIdWritable])
>>>>       val s = kryo.register(classOf[ArgWritable])
>>>>     }
>>>> }
>>>>
>>>> Then just before creating a Spark Context, I have these two lines:
>>>>     System.setProperty("spark.serializer",
>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>     System.setProperty("spark.kryo.registrator",
>>>> "geotrellis.spark.KryoRegistrator")
>>>>
>>>>
>>>>
>>>>
>>>> The exception itself:
>>>> Exception in thread "main" org.apache.spark.SparkException: Job failed:
>>>> Task not serializable: java.io.NotSerializableException:
>>>> geotrellis.spark.formats.TileIdWritable
>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>> name: "key$1", type: "class java.lang.Object")
>>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>> <function1>)
>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>>> name: "func$1", type: "interface scala.Function1")
>>>>     - root object (class
>>>> "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>     at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at
>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>
>>>> Regards,
>>>> Ameet
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
