In Scala, case classes are serializable by default, so your TileIdWritable
should be a case class. I usually enable Kryo serialization for objects and
keep the default serializer for closures; this works pretty well.
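
Something along these lines is what I mean (a rough sketch; the Long field is
just a placeholder for whatever your class actually holds):

// A case class is java.io.Serializable for free, so the default Java
// closure serializer can ship it, while Kryo handles object/shuffle
// serialization.
case class TileIdWritable(value: Long)

// Kryo for objects only; spark.closure.serializer stays at its Java default.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")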

Eugen


2013/12/24 Ameet Kini <ameetk...@gmail.com>

>
> If Java serialization is the only one that properly works for closures,
> then I shouldn't be setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer", and my only hope for getting
> lookup() (and other such methods that still use the closure serializer) to
> work is to either a) use only Java serialization for my objects, or b) have my
> objects implement Serializable but also enable Kryo for object
> serialization. For option b, I'd set "spark.serializer" to
> "org.apache.spark.serializer.KryoSerializer" but leave
> "spark.closure.serializer" at its default value (Java).
>
> If the above is true, then it seems that, as things stand today, the best
> practice is for objects serialized with Kryo to also implement Serializable
> or Externalizable so that closures work properly.
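>
> Concretely, option b would look something like this (a sketch; the Long
> field is a stand-in for my actual fields):
>
> // Object serialization goes through Kryo (via the registrator), while the
> // default Java closure serializer can still ship instances because the
> // class also implements java.io.Serializable.
> class TileIdWritable(val value: Long) extends Serializable
>
> object TileIdWritable {
>   def apply(value: Long) = new TileIdWritable(value)
> }
>
> // spark.serializer -> Kryo; spark.closure.serializer left at its default
> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")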
>
> Thanks,
> Ameet
>
>
>
>
> On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
>> The problem really is that in certain cases task results -- and
>> front-end-passed parameters -- are passed through closures. For closures,
>> only the Java serializer is properly supported (afaik).
>>
>> There have been a limited number of fixes for data parameter communication
>> between the front end and the backend using other-than-Java serialization
>> (e.g., parallelize() and collect() no longer use closures to pass in/grab
>> data objects); however, a certain number of methods are still using
>> closures to pass in a data object.
>>
>> afaik the methods doing correct front/back end parameter serialization
>> are:
>>
>> collect()
>> take() (maybe)
>> parallelize()
>> reduce()
>>
>> Everything else (fold(), etc.) that communicates data between the front end
>> and the backend still wraps data into closures. For something like fold(),
>> in fact, you'd have to use a type that has both Java and Kryo support at the
>> same time, because it will always use both the closure and object
>> serializers while executing.
>>
>> This, IMO, is of course inconsistent with the assumption that the same data
>> type should be supported uniformly regardless of where it is serialized, but
>> that's the state of things as it stands.
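>>
>> E.g., a sketch of what fold() would need (Acc and sc are hypothetical; the
>> point is that the zero value rides in a closure while results also pass
>> through the object serializer):
>>
>> import org.apache.spark.SparkContext
>>
>> // case class => Serializable for the closure-carried zero value;
>> // also register Acc with Kryo so the object serializer can handle it.
>> case class Acc(sum: Long)
>>
>> def foldExample(sc: SparkContext): Acc = {
>>   val accs = sc.parallelize(1L to 100L).map(Acc(_))
>>   // Acc(0L) is captured in the task closure (Java serialization);
>>   // the computed results also go through spark.serializer (Kryo).
>>   accs.fold(Acc(0L))((a, b) => Acc(a.sum + b.sum))
>> }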
>>
>>
>>
>> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini <ameetk...@gmail.com> wrote:
>>
>>> Thanks Imran.
>>>
>>> I tried setting "spark.closure.serializer" to
>>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
>>> NullPointerException when the executor starts up. Below is a snippet of the
>>> executor's log. Notice how "registered TileIdWritable" and "registered
>>> ArgWritable" are printed, so I see that my KryoRegistrator is being called.
>>> However, it's not clear why there's a follow-on NPE. My Spark log level is
>>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not
>>> sure if there is some other way to get the executor to be more verbose as
>>> to the cause of the NPE.
>>>
>>> When I take out the spark.closure.serializer setting (i.e., go back to
>>> the default Java serialization), the executors start up fine and execute
>>> other RDD actions, but of course not the lookup action (my original
>>> problem). With spark.closure.serializer set to Kryo, it dies with
>>> an NPE during executor startup.
>>>
>>>
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully
>>> registered with driver
>>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on
>>> addresses :[akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>>> [akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>>> 323.9 MB.
>>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>>> dirs '/tmp'
>>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>>> /tmp/spark-local-20131223110036-4335
>>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617
>>> with id = ConnectionManagerId([redacted],41617)
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>>> akka.tcp://[redacted]:48147/user/MapOutputTracker
>>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>>> 13/12/23 11:00:37 INFO Executor: Fetching
>>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
>>> timestamp 1387814434436
>>> 13/12/23 11:00:37 INFO Utils: Fetching
>>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
>>> /tmp/fetchFileTemp2456419097284083628.tmp
>>> 13/12/23 11:00:37 INFO Executor: Adding
>>> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>>> to class loader
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-4,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-2,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>        at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-1,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-3,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>>
>>> Thanks,
>>> Ameet
>>>
>>>
>>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>>>
>>>> There is a separate setting for serializing closures,
>>>> "spark.closure.serializer" (listed here:
>>>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>>
>>>> That is used to serialize whatever is used by all the functions on an
>>>> RDD, e.g., map, filter, and lookup. Those closures include referenced
>>>> variables, like your TileIdWritable.
>>>>
>>>> So you need to either change that to use Kryo, or make your object
>>>> serializable via Java serialization.
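>>>>
>>>> i.e., one or the other of these (rough sketches):
>>>>
>>>> // Option A: also use Kryo for closures
>>>> System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>
>>>> // Option B: keep the default closure serializer and make the class
>>>> // Java-serializable so it can be captured in the lookup() closure.
>>>> class TileIdWritable extends Serializable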
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>>>
>>>>>
>>>>> I'm getting the below NotSerializableException despite using Kryo to
>>>>> serialize that class (TileIdWritable).
>>>>>
>>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>>
>>>>> Initially I thought Kryo was not being registered properly, so I tried
>>>>> running operations over awtestRdd that force a shuffle (e.g., groupByKey),
>>>>> and those seemed to work fine. So it seems to be specific to the
>>>>> "TileIdWritable(200)" argument to lookup(). Is there anything unique about
>>>>> companion objects and Kryo serialization? I even replaced
>>>>> "TileIdWritable(200)" with "new TileIdWritable" but I still see that
>>>>> exception.
>>>>>
>>>>>
>>>>> class TileIdWritable {
>>>>>  //
>>>>> }
>>>>>
>>>>> object TileIdWritable {
>>>>>  def apply(value: Long) = new TileIdWritable
>>>>> }
>>>>>
>>>>>
>>>>> My Kryo registrator:
>>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>>     override def registerClasses(kryo: Kryo) {
>>>>>       println("Called KryoRegistrator")  // I see this printed during
>>>>> shuffle operations
>>>>>       val r = kryo.register(classOf[TileIdWritable])
>>>>>       val s = kryo.register(classOf[ArgWritable])
>>>>>     }
>>>>> }
>>>>>
>>>>> Then just before creating a Spark Context, I have these two lines:
>>>>>     System.setProperty("spark.serializer",
>>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>>     System.setProperty("spark.kryo.registrator",
>>>>> "geotrellis.spark.KryoRegistrator")
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> The exception itself:
>>>>> Exception in thread "main" org.apache.spark.SparkException: Job
>>>>> failed: Task not serializable: java.io.NotSerializableException:
>>>>> geotrellis.spark.formats.TileIdWritable
>>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>>> name: "key$1", type: "class java.lang.Object")
>>>>>     - object (class
>>>>> "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>>>> name: "func$1", type: "interface scala.Function1")
>>>>>     - root object (class
>>>>> "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>>     at
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>
>>>>> Regards,
>>>>> Ameet
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
