Re: debugging NotSerializableException while using Kryo
Hi Michael,

I re-ran this on another machine which is on Spark's master branch 0.9.0-SNAPSHOT from Dec 14 (right after the Scala 2.10 branch was merged back into master) and reproduced the NPE; the stack trace is toward the end of this message. I can't tell from the relevant code what may have caused the exception, because line 262 is inside the catch block of a pretty big try/catch. Executor.scala line 262:

    val metrics = attemptedTask.flatMap(t => t.metrics)

The surrounding lines are:

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)  // line 262
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }

13/12/24 10:02:33 ERROR Executor: Uncaught exception in thread Thread[Executor task launch worker-4,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
        at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:262)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

Thanks,
Ameet

On Mon, Dec 23, 2013 at 4:58 PM, Michael (Bach) Bui free...@adatao.com wrote:

What Spark version are you using? By looking at the code at Executor.scala line 195, you will at least know what caused the NPE. We can start from there.

On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:

Thanks Imran. I tried setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer and now end up seeing a NullPointerException when the executor starts up. Below is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" appear, so I can see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My Spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if there's some other way to get the executor to be more verbose as to the cause of the NPE.
When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, it dies with an NPE during executor startup.

13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
Re: debugging NotSerializableException while using Kryo
If Java serialization is the only one that properly works for closures, then I shouldn't be setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer, and my only hope for getting lookup (and other such methods that still use closure serializers) to work is to either a) use only Java serialization for my objects, or b) have my objects implement Serializable but enable Kryo as well for object serialization. For option b, I'd be setting spark.serializer to org.apache.spark.serializer.KryoSerializer but leaving spark.closure.serializer at its default value (Java).

If the above is true, then it seems like, as it stands today, the best practice for objects that use Kryo is to also implement either Serializable or Externalizable so that closures work properly.

Thanks,
Ameet

On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

The problem really is that in certain cases task results -- and front-end-passed parameters -- are passed through closures. For closures, only the Java serializer is properly supported (afaik).

There has been a limited number of fixes for data-parameter communication between front end and back end using other-than-Java serialization (e.g. for parallelize() and collect() -- these methods do not use closures to pass in/grab data objects anymore); however, a certain number of methods are still using closures to pass in a data object. Afaik the methods doing correct front-end/back-end parameter serialization are:

  collect()
  take() (maybe)
  parallelize()
  reduce()

Everything else (fold(), etc.) that communicates data between front end and back end still wraps data into closures. For a thing like fold(), in fact, you'd have to use a type that has both Java and Kryo support at the same time, because it will always use both the closure and object serializers while executing.
This IMO is of course inconsistent with the assumption that the same data type should be supported uniformly regardless of where it serializes, but that's the state of things as it stands.

On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini ameetk...@gmail.com wrote: [...]
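Option b above amounts to pointing spark.serializer at Kryo while leaving spark.closure.serializer untouched. A minimal sketch of that configuration, using the system-property style of this Spark era (the registrator class name is taken from the executor logs in this thread; the SparkContext creation itself is omitted):

```scala
// Sketch of option (b): Kryo for data/object serialization, default Java
// serialization for closures. These properties must be set before the
// SparkContext is created.
object KryoConfigSketch {
  def configure(): Unit = {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
    // spark.closure.serializer is deliberately left unset, so closures keep
    // using the default Java serializer.
  }
}
```

With this setup, objects only need Java serialization support where they are captured by closures (e.g. lookup(), fold()); everywhere else Kryo is used.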
Re: debugging NotSerializableException while using Kryo
In Scala, case classes are serializable by default, so your TileIdWritable could be a case class. I usually enable Kryo serialization for objects and keep the default serializer for closures; this works pretty well.

Eugen

2013/12/24 Ameet Kini ameetk...@gmail.com: [...]
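Eugen's point that Scala case classes are Serializable by default is easy to check with a plain Java-serialization round trip. TileId below is a hypothetical stand-in for TileIdWritable:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Case classes mix in Serializable automatically, so a Java round trip works
// without any explicit "extends Serializable". TileId is a hypothetical
// stand-in for TileIdWritable.
case class TileId(id: Long)

object CaseClassSerDemo {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[T]
  }
}
```

A case-class-based TileIdWritable would therefore survive closure serialization (Java) while still being registrable with Kryo for object serialization.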
Re: debugging NotSerializableException while using Kryo
On Tue, Dec 24, 2013 at 7:29 AM, Ameet Kini ameetk...@gmail.com wrote:

If Java serialization is the only one that properly works for closures, then I shouldn't be setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer,

My understanding is that it's not that Kryo wouldn't necessarily work for closures; it's that closure serialization is used not only for user-defined closures but also for a lot of closures internal to Spark, and for those, of course, there's no way to enable Kryo support. Data objects (and their serializer), on the other hand, are not defined anywhere but in the user's code. Therefore overriding the object serializer is in general a safe assumption, while overriding the closure serializer is not.

and my only hope for getting lookup (and other such methods that still use closure serializers) to work is to either a) use only Java serialization for my objects, or b) have my objects implement Serializable but enable Kryo as well for object serialization.

or c) avoid using the Spark API that currently uses closures to communicate data objects between front end and back end for Kryo-only objects (as I do). The most annoying of those is fold(). In fact, d) you can always wrap a Kryo object into a byte array on the front end, pass the Java-serializable byte array through a closure, and unwrap it on the back end. This technique is extremely ugly, though, with methods like fold(), which force you to use the same object type in the front-end and back-end operation (and obviously it may not be the type you want for the purposes of elegant folding). I had this problem extensively with 3rd-party types for which I have no desire to add Java serialization (mostly, Hadoop Writables of various kinds), so the obvious desirable solution is to add Kryo support for them without having to modify the original class. This mostly works for me.
For option b, I'd be setting spark.serializer to org.apache.spark.serializer.KryoSerializer but leave spark.closure.serializer to its default value (Java).

Yes. Like I said, changing the closure serializer is undesirable unless you can guarantee proper serialization support for all of Spark's internal closures as well as your own closures.

If the above is true, then seems like as it stands today, the best practice is for objects that use Kryo to also either implement Serializable or Externalizable for closures to work properly.

Again, this is only true for a smaller portion of the Spark API. Most of the Spark API doesn't have this problem, so you may well get away with either not using those methods or pre-serializing objects into byte arrays while using the problematic API.

On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: [...]
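Dmitriy's option d -- wrap the object into a byte array on the front end, pass the Array[Byte] through the closure, and unwrap it on the back end -- can be sketched as follows. Java serialization stands in for Kryo here so the sketch is self-contained; with Kryo you would write through a Kryo Output into a ByteArrayOutputStream instead:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Sketch of option (d): ship a pre-serialized payload through a closure.
// An Array[Byte] is always Java-serializable, so the closure serializer
// never sees the Kryo-only object itself.
object ByteArrayWrap {
  def wrap(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  def unwrap[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    ois.readObject().asInstanceOf[T]
  }
}

// Front end:                 val payload = ByteArrayWrap.wrap(myObject)
// Back end (inside a task):  val obj = ByteArrayWrap.unwrap[MyType](payload)
```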
Re: debugging NotSerializableException while using Kryo
Thanks Imran. I tried setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer and now end up seeing a NullPointerException when the executor starts up. This is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" appear, so I can see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My Spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if there's some other way to get the executor to be more verbose as to the cause of the NPE.

When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, it dies with an NPE during executor startup.

13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching
http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding file [redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at
Re: debugging NotSerializableException while using Kryo
Maybe try to implement your class with Serializable...

2013/12/23 Ameet Kini ameetk...@gmail.com: [...]
Re: debugging NotSerializableException while using Kryo
Using Java serialization would make the NPE go away, but it would be a less preferable solution. My application is network-intensive, and serialization cost is significant. In other words, these objects are ideal candidates for Kryo.

On Mon, Dec 23, 2013 at 3:41 PM, Jie Deng deng113...@gmail.com wrote:

maybe try to implement your class with serializable...

2013/12/23 Ameet Kini ameetk...@gmail.com: [...]
geotrellis.spark.KryoRegistrator 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator registered TileIdWritable registered TileIdWritable registered TileIdWritable registered TileIdWritable registered ArgWritable registered ArgWritable registered ArgWritable registered ArgWritable 13/12/23 11:00:37 INFO Executor: Running task ID 2 13/12/23 11:00:37 INFO Executor: Running task ID 1 13/12/23 11:00:37 INFO Executor: Running task ID 3 13/12/23 11:00:37 INFO Executor: Running task ID 0 13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436 13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp 13/12/23 11:00:37 INFO Executor: Adding file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main] java.lang.NullPointerException at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195) at scala.Option.flatMap(Option.scala:170) at
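For reference, the configuration switch being discussed can be sketched as follows. This is only a config sketch in the pre-SparkConf, system-property style used elsewhere in this thread; the property names are the ones quoted in the messages above, and the registrator class name mirrors the one in the logs:

```scala
// Config sketch only, mirroring the settings discussed in this thread.
// These system properties must be set before the SparkContext is created.

// Data serializer: controls how shuffle/cache data is serialized.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Closure serializer: a separate setting. By default, closures are serialized
// with Java serialization even when the data serializer is Kryo, which is why
// the captured TileIdWritable triggered NotSerializableException in lookup().
System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")

// User registrator that registers TileIdWritable and ArgWritable with Kryo.
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
```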
Re: debugging NotSerializableException while using Kryo
What spark version are you using? By looking at the code, Executor.scala line 195, you will at least know what caused the NPE. We can start from there.

On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:

Thanks Imran. I tried setting spark.closure.serializer to org.apache.spark.serializer.KryoSerializer and now end up seeing a NullPointerException when the executor starts up. This is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" are printed, so I see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My Spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if there's some other way to get the executor to be more verbose as to the cause of the NPE.

When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, it dies with an NPE during executor startup.

13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding file [redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException at
Re: debugging NotSerializableException while using Kryo
there is a separate setting for serializing closures, spark.closure.serializer (listed here http://spark.incubator.apache.org/docs/latest/configuration.html), that is used to serialize whatever is referenced by all the functions on an RDD, e.g., map, filter, and lookup. Those closures include referenced variables, like your TileIdWritable. So you need to either change that setting to use Kryo, or make your object serializable to Java.

On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini ameetk...@gmail.com wrote:

I'm getting the below NotSerializableException despite using Kryo to serialize that class (TileIdWritable). The offending line:

awtestRdd.lookup(TileIdWritable(200))

Initially I thought Kryo was not being registered properly, so I tried running operations over awtestRdd that force a shuffle (e.g., groupByKey), and those seemed to work fine. So it seems to be specific to the TileIdWritable(200) argument to lookup(). Is there anything unique about companion objects and Kryo serialization? I even replaced TileIdWritable(200) by new TileIdWritable but still see that exception.

class TileIdWritable {
  // ...
}

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable
}

My Kryo registrator:

class KryoRegistrator extends SparkKryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    println("Called KryoRegistrator") // I see this printed during shuffle operations
    val r = kryo.register(classOf[TileIdWritable])
    val s = kryo.register(classOf[ArgWritable])
  }
}

Then just before creating a Spark Context, I have these two lines:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")

The exception itself:

Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
- field (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$4, name: key$1, type: class java.lang.Object)
- object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$4, <function1>)
- field (class org.apache.spark.SparkContext$$anonfun$runJob$4, name: func$1, type: interface scala.Function1)
- root object (class org.apache.spark.SparkContext$$anonfun$runJob$4, <function2>)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Regards,
Ameet
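The second option Imran mentions, making the object serializable to Java, can be sketched as below. This is a hypothetical sketch only: the real TileIdWritable lives in geotrellis.spark.formats and its fields are not shown in this thread, so the `value` field, the no-arg constructor, and the `roundTrip` helper are all assumptions for illustration. The round trip uses plain Java serialization, the same mechanism the default closure serializer uses, so a successful round trip indicates the captured key would no longer throw NotSerializableException:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical sketch: make the key type Java-serializable so closures that
// capture it (e.g., in lookup()) can be serialized by the default Java
// closure serializer. The `value` field is an assumption.
class TileIdWritable(var value: Long) extends Serializable {
  def this() = this(0L) // no-arg constructor, as Writable types conventionally have
}

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable(value)
}

object SerializationCheck {
  // Round-trip an instance through Java serialization to confirm it works.
  def roundTrip(tile: TileIdWritable): TileIdWritable = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(tile)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[TileIdWritable]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(TileIdWritable(200))
    println("round-trip value: " + copy.value)
  }
}
```

Note that this only addresses closure serialization; the Kryo registrations above remain useful for the data path, since the two serializers are configured independently.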