In Scala, case classes are serializable by default, so your TileIdWritable could simply be a case class. I usually enable Kryo serialization for objects and keep the default (Java) serializer for closures; this works pretty well.
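A minimal sketch of that setup, reusing the names from this thread (the registrator class and system properties mirror Ameet's snippet quoted further down, so treat the details as illustrative rather than the exact GeoTrellis code):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}

    // A case class is java.io.Serializable for free, so the default (Java)
    // closure serializer can ship it when lookup() captures it in a closure.
    case class TileIdWritable(value: Long)

    // Kryo is still used for object/data serialization (shuffles, caching).
    class KryoRegistrator extends SparkKryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[TileIdWritable])
      }
    }

    // Enable Kryo for objects only; leave spark.closure.serializer at its
    // Java default.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")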
Eugen

2013/12/24 Ameet Kini <ameetk...@gmail.com>

> If Java serialization is the only one that properly works for closures, then I shouldn't be setting "spark.closure.serializer" to "org.apache.spark.serializer.KryoSerializer", and my only hope for getting lookup() (and other such methods that still use the closure serializer) to work is to either (a) use only Java serialization for my objects, or (b) have my objects implement Serializable but enable Kryo as well for object serialization. For option (b), I'd set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" but leave "spark.closure.serializer" at its default value (Java).
>
> If the above is true, then it seems that, as things stand today, the best practice is for objects that use Kryo to also implement either Serializable or Externalizable so that closures work properly.
>
> Thanks,
> Ameet
>
> On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
>> The problem really is that in certain cases task results -- and front-end-passed parameters -- are passed through closures. For closures, only the Java serializer is properly supported (afaik).
>>
>> There have been a limited number of fixes for data-parameter communication between front end and back end using other-than-Java serialization (e.g. parallelize() and collect() no longer use closures to pass in or grab data objects); however, a certain number of methods are still using closures to pass in a data object.
>>
>> Afaik the methods doing correct front-end/back-end parameter serialization are:
>>
>> collect()
>> take() (maybe)
>> parallelize()
>> reduce()
>>
>> Everything else (fold(), etc.) that communicates data between front end and back end still wraps data into closures. For something like fold(), in fact, you'd have to use a type that has both Java and Kryo support at the same time, because it will always use both the closure and the object serializer while executing.
>>
>> This is of course inconsistent, IMO, with the assumption that the same data type should be supported uniformly regardless of where it serializes, but that's the state of things as it stands.
>>
>> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini <ameetk...@gmail.com> wrote:
>>
>>> Thanks Imran.
>>>
>>> I tried setting "spark.closure.serializer" to "org.apache.spark.serializer.KryoSerializer" and now end up seeing a NullPointerException when the executor starts up. Below is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" are printed, so I can see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My Spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure if there is some other way to get the executor to be more verbose about the cause of the NPE.
>>>
>>> When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, it dies with an NPE during executor startup.
>>>
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
>>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
>>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
>>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
>>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
>>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://[redacted]:48147/user/MapOutputTracker
>>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>>> 13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
>>> 13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
>>> 13/12/23 11:00:37 INFO Executor: Adding file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
>>> java.lang.NullPointerException
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-2,5,main]
>>> java.lang.NullPointerException
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-1,5,main]
>>> java.lang.NullPointerException
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-3,5,main]
>>> java.lang.NullPointerException
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>>
>>> Thanks,
>>> Ameet
>>>
>>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>>>
>>>> There is a separate setting for serializing closures, "spark.closure.serializer" (listed here: http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>>
>>>> It is used to serialize whatever is used by all the functions on an RDD, e.g., map, filter, and lookup. Those closures include referenced variables, like your TileIdWritable.
>>>>
>>>> So you need to either change that setting to use Kryo, or make your object Java-serializable.
>>>>
>>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>>>
>>>>> I'm getting the NotSerializableException below despite using Kryo to serialize that class (TileIdWritable).
>>>>>
>>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>>
>>>>> Initially I thought Kryo was not being registered properly, so I tried running operations over awtestRdd that force a shuffle (e.g., groupByKey), and those seemed to work fine. So it seems to be specific to the "TileIdWritable(200)" argument to lookup(). Is there anything unique about companion objects and Kryo serialization? I even replaced "TileIdWritable(200)" with "new TileIdWritable" but still see that exception.
>>>>>
>>>>> class TileIdWritable {
>>>>>   // ...
>>>>> }
>>>>>
>>>>> object TileIdWritable {
>>>>>   def apply(value: Long) = new TileIdWritable
>>>>> }
>>>>>
>>>>> My Kryo registrator:
>>>>>
>>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>>   override def registerClasses(kryo: Kryo) {
>>>>>     println("Called KryoRegistrator") // I see this printed during shuffle operations
>>>>>     val r = kryo.register(classOf[TileIdWritable])
>>>>>     val s = kryo.register(classOf[ArgWritable])
>>>>>   }
>>>>> }
>>>>>
>>>>> Then, just before creating a SparkContext, I have these two lines:
>>>>>
>>>>> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>> System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
>>>>>
>>>>> The exception itself:
>>>>>
>>>>> Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
>>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
>>>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
>>>>>     - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>
>>>>> Regards,
>>>>> Ameet
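For reference, a hedged sketch of the work-around the thread converges on (option b above): keep the default Java closure serializer, enable Kryo for data, and make TileIdWritable also java.io.Serializable so that methods that go through the closure serializer (lookup(), fold(), etc.) can handle it. Names are reused from the thread; the field and constructors are illustrative, not the actual GeoTrellis definitions.

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}

    // Serializable so the (Java) closure serializer can ship it when lookup()
    // or fold() captures it; also registered with Kryo for data serialization,
    // giving the "both Java and Kryo support" Dmitriy describes for fold().
    class TileIdWritable(var value: Long) extends Serializable {
      def this() = this(0L)   // no-arg constructor, illustrative
    }

    object TileIdWritable {
      def apply(value: Long) = new TileIdWritable(value)
    }

    class KryoRegistrator extends SparkKryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[TileIdWritable])
        // kryo.register(classOf[ArgWritable])  // ArgWritable as in the original registrator
      }
    }

    // spark.serializer -> Kryo for objects; spark.closure.serializer stays at
    // its Java default.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")

With this in place, awtestRdd.lookup(TileIdWritable(200)) should no longer hit java.io.NotSerializableException, because the key captured in the closure is Java-serializable, while shuffled data continues to go through Kryo.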