Using Java serialization would make the NPE go away, but it's the less preferable solution: my application is network-intensive and serialization cost is significant, so these objects are ideal candidates for Kryo.
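For example, assuming TileIdWritable wraps a single Long (its companion object's apply takes one; the `get` accessor below is hypothetical, named after Hadoop's LongWritable.get), a dedicated Kryo serializer would keep each key to 8 bytes on the wire instead of the class descriptor and field metadata that Java serialization writes. A sketch:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import geotrellis.spark.formats.TileIdWritable

    // Sketch only: assumes TileIdWritable wraps a single Long and exposes
    // it via a (hypothetical) `get` accessor.
    class TileIdSerializer extends Serializer[TileIdWritable] {
      override def write(kryo: Kryo, output: Output, t: TileIdWritable): Unit =
        output.writeLong(t.get)                 // 8 bytes per key on the wire
      override def read(kryo: Kryo, input: Input, c: Class[TileIdWritable]): TileIdWritable =
        TileIdWritable(input.readLong())        // rebuild via the companion's apply
    }

This would be hooked up in the KryoRegistrator (quoted further down this thread) with kryo.register(classOf[TileIdWritable], new TileIdSerializer).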
On Mon, Dec 23, 2013 at 3:41 PM, Jie Deng <deng113...@gmail.com> wrote:

> Maybe try implementing your class with Serializable...
>
>
> 2013/12/23 Ameet Kini <ameetk...@gmail.com>
>
>> Thanks Imran.
>>
>> I tried setting "spark.closure.serializer" to
>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
>> NullPointerException when the executor starts up. Below is a snippet of
>> the executor's log. Notice that "registered TileIdWritable" and
>> "registered ArgWritable" are printed, so my KryoRegistrator is being
>> called. However, it's not clear why there's a follow-on NPE. My Spark
>> log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG),
>> so I'm not sure if there is some other way to get the executor to be
>> more verbose about the cause of the NPE.
>>
>> When I take out the spark.closure.serializer setting (i.e., go back to
>> the default Java serialization), the executors start up fine and execute
>> other RDD actions, but of course not the lookup action (my original
>> problem). With spark.closure.serializer set to Kryo, it dies with an NPE
>> during executor startup.
>>
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully
>> registered with driver
>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on
>> addresses: [akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>> dirs '/tmp'
>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131223110036-4335
>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617
>> with id = ConnectionManagerId([redacted],41617)
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>> akka.tcp://[redacted]:48147/user/MapOutputTracker
>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> registered TileIdWritable
>> registered ArgWritable
>> (the three lines above appear once per task, four times in all)
>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>> 13/12/23 11:00:37 INFO Executor: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> with timestamp 1387814434436
>> 13/12/23 11:00:37 INFO Utils: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> to /tmp/fetchFileTemp2456419097284083628.tmp
>> 13/12/23 11:00:37 INFO Executor: Adding
>> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> to class loader
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-4,5,main]
>> java.lang.NullPointerException
>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> (the same NullPointerException and stack trace are logged for
>> pool-7-thread-1, pool-7-thread-2, and pool-7-thread-3; identical
>> traces elided)
>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>
>> Thanks,
>> Ameet
>>
>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>>
>>> There is a separate setting for serializing closures,
>>> "spark.closure.serializer" (listed here:
>>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>
>>> It is used to serialize whatever is used by all the functions on an
>>> RDD, e.g., map, filter, and lookup. Those closures include referenced
>>> variables, like your TileIdWritable.
>>>
>>> So you need to either change that setting to use Kryo, or make your
>>> object serializable to Java.
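A schematic of why lookup() in particular trips this (a sketch of the shape of Spark's code, not the real implementation): the key is captured by the closure that lookup() ships to the executors, which is why it surfaces as the "key$1" field in the NotSerializableException quoted below.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Schematic only -- not Spark's actual lookup(). The point: `key` is
    // captured by `findKey`, and closures always go through
    // spark.closure.serializer (Java serialization by default), regardless
    // of what spark.serializer is set to.
    def lookupSketch(sc: SparkContext, rdd: RDD[(Long, String)], key: Long): Seq[String] = {
      val findKey = (it: Iterator[(Long, String)]) =>
        it.filter(_._1 == key).map(_._2).toSeq   // `key` is captured here
      sc.runJob(rdd, findKey).toSeq.flatten      // closure (and key) serialized and shipped
    }

Shuffle operations like groupByKey don't capture a key in a driver-side closure, which is consistent with those working while lookup() fails.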
>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>>
>>>> I'm getting the NotSerializableException below despite using Kryo to
>>>> serialize that class (TileIdWritable).
>>>>
>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>
>>>> Initially I thought Kryo was not being registered properly, so I tried
>>>> running operations over awtestRdd that force a shuffle (e.g.,
>>>> groupByKey), and those seemed to work fine. So it seems to be specific
>>>> to the "TileIdWritable(200)" argument to lookup(). Is there anything
>>>> unique about companion objects and Kryo serialization? I even replaced
>>>> "TileIdWritable(200)" with "new TileIdWritable" but still see the
>>>> exception.
>>>>
>>>> class TileIdWritable {
>>>>   // ...
>>>> }
>>>>
>>>> object TileIdWritable {
>>>>   def apply(value: Long) = new TileIdWritable
>>>> }
>>>>
>>>> My Kryo registrator:
>>>>
>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>   override def registerClasses(kryo: Kryo) {
>>>>     println("Called KryoRegistrator") // I see this printed during shuffle operations
>>>>     val r = kryo.register(classOf[TileIdWritable])
>>>>     val s = kryo.register(classOf[ArgWritable])
>>>>   }
>>>> }
>>>>
>>>> Then, just before creating a SparkContext, I have these two lines:
>>>>
>>>> System.setProperty("spark.serializer",
>>>>   "org.apache.spark.serializer.KryoSerializer")
>>>> System.setProperty("spark.kryo.registrator",
>>>>   "geotrellis.spark.KryoRegistrator")
>>>>
>>>> The exception itself:
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Job failed:
>>>> Task not serializable: java.io.NotSerializableException:
>>>> geotrellis.spark.formats.TileIdWritable
>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>>       name: "key$1", type: "class java.lang.Object")
>>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>>       <function1>)
>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>>>       name: "func$1", type: "interface scala.Function1")
>>>>     - root object (class
>>>>       "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>
>>>> Regards,
>>>> Ameet