Maybe try having your class implement Serializable...
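For instance, something like the following sketch (the `TileIdWritable` shape is taken from the snippets further down this thread; the field name `value` and the round-trip `main` are illustrative only, to show that plain Java serialization now works on the key):

```scala
import java.io._

// Making the key class Java-serializable lets the default (Java) closure
// serializer capture it, independently of the Kryo settings used for data.
class TileIdWritable(val value: Long) extends Serializable

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable(value)
}

object RoundTrip {
  def main(args: Array[String]): Unit = {
    // Serialize the key the same way Spark's Java closure serializer would.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(TileIdWritable(200L))
    out.close()

    // Deserialize and check the value survived the round trip.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[TileIdWritable]
    println(copy.value)
  }
}
```

With this change, `awtestRdd.lookup(TileIdWritable(200))` should no longer throw `NotSerializableException`, and you can keep Kryo for the data path (`spark.serializer`) untouched.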
2013/12/23 Ameet Kini <ameetk...@gmail.com>:
> Thanks Imran.
>
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
> NullPointerException when the executor starts up. Below is a snippet of
> the executor's log. Notice that "registered TileIdWritable" and
> "registered ArgWritable" are printed, so I can see that my KryoRegistrator
> is being called. However, it's not clear why there's a follow-on NPE. My
> Spark log level is set to DEBUG in log4j.properties
> (log4j.rootCategory=DEBUG), so I'm not sure if there is some other way to
> get the executor to be more verbose about the cause of the NPE.
>
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine and execute other
> RDD actions, but of course not the lookup action (my original problem).
> With spark.closure.serializer set to Kryo, it dies with an NPE during
> executor startup.
>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://[redacted]:56483]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://[redacted]:48147/user/MapOutputTracker
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
> [the registrator line, "registered TileIdWritable", and "registered ArgWritable" each appear four times, once per task]
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
> 13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
> 13/12/23 11:00:37 INFO Executor: Adding file [redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
> java.lang.NullPointerException
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>         at scala.Option.flatMap(Option.scala:170)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> [the identical NullPointerException trace repeats for pool-7-thread-2, pool-7-thread-1, and pool-7-thread-3]
> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>
> Thanks,
> Ameet
>
>
> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>
>> There is a separate setting for serializing closures,
>> "spark.closure.serializer" (listed here:
>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>
>> It is used to serialize whatever is referenced by the functions you pass
>> to an RDD, e.g., map, filter, and lookup. Those closures include
>> referenced variables, like your TileIdWritable.
>>
>> So you need to either change that setting to use Kryo, or make your
>> object Java-serializable.
>>
>>
>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>
>>> I'm getting the below NotSerializableException despite using Kryo to
>>> serialize that class (TileIdWritable).
>>>
>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>
>>> Initially I thought Kryo was not being registered properly, so I tried
>>> running operations over awtestRdd that force a shuffle (e.g.,
>>> groupByKey), and those seemed to work fine. So it seems to be specific
>>> to the "TileIdWritable(200)" argument to lookup(). Is there anything
>>> unique about companion objects and Kryo serialization? I even replaced
>>> "TileIdWritable(200)" with "new TileIdWritable" but still see that
>>> exception.
>>>
>>> class TileIdWritable {
>>>   // ...
>>> }
>>>
>>> object TileIdWritable {
>>>   def apply(value: Long) = new TileIdWritable
>>> }
>>>
>>> My Kryo registrator:
>>>
>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>>     println("Called KryoRegistrator") // I see this printed during shuffle operations
>>>     val r = kryo.register(classOf[TileIdWritable])
>>>     val s = kryo.register(classOf[ArgWritable])
>>>   }
>>> }
>>>
>>> Then, just before creating a SparkContext, I have these two lines:
>>>
>>> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>> System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
>>>
>>> The exception itself:
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job failed:
>>> Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
>>>     - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>   at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>> Regards,
>>> Ameet
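Pulling the thread's pieces together, a minimal sketch of the driver-side setup looks like the following. Class and package names are taken from the snippets above; this uses the Spark 0.8.x-era `System.setProperty` style of configuration, and the `spark.closure.serializer` line is the one that triggered the executor NPE reported in this thread, so it is shown commented out in favor of the Serializable route:

```scala
// Sketch only: combines the settings discussed in this thread.
// Assumes TileIdWritable and ArgWritable exist in geotrellis.spark.formats.
object KryoSetup {
  def configure(): Unit = {
    // Kryo for RDD data (shuffles, caching) -- this part worked for the
    // original poster.
    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator",
      "geotrellis.spark.KryoRegistrator")

    // Closure serializer: controls how the key captured by lookup() is
    // shipped to executors. Switching it to Kryo hit the NPE shown above
    // on this Spark version, so the safer fix is to leave the Java default
    // and make TileIdWritable extend java.io.Serializable instead.
    // System.setProperty("spark.closure.serializer",
    //   "org.apache.spark.serializer.KryoSerializer")
  }
}
```

Call `KryoSetup.configure()` before constructing the `SparkContext`, since these properties are read when the context (and each executor's `SparkEnv`) is created.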