Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Dmitriy Lyubimov
On Tue, Dec 24, 2013 at 7:29 AM, Ameet Kini  wrote:

>
> If Java serialization is the only one that properly works for closures,
> then I shouldn't be setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer",
>

My understanding is that it's not that Kryo necessarily wouldn't work for
closures; it's just that closure serialization is used not only for
user-defined closures but also for a lot of closures internal to Spark. For
those, of course, there's no way to enable Kryo support.

Data objects (and their serializers), on the other hand, are defined nowhere
but in the user's code. Overriding the object serializer is therefore, in
general, a safe thing to do, while overriding the closure serializer is not.


> and my only hope for getting lookup (and other such methods that still use
> closure serializers) to work is to either a) use only Java serialization
> for my objects, or b) have my objects implement Serializable but enable
> Kryo as well for object serialization.
>

or c) for Kryo-only objects, avoid the parts of the Spark API that currently
use closures to communicate data objects between front end and backend (as I
do). The most annoying of those is fold().

In fact, d) you can always wrap a Kryo object into a byte array in the front
end, pass the java-serializable byte array through a closure, and unwrap it
in the backend. This technique gets quite ugly, though, with methods like
fold(), which force you to use the same object type in the front-end and
backend operations (and it may well not be the type you want for the purposes
of elegant folding). I ran into this problem extensively with third-party
types for which I have no desire to add Java serialization (mostly Hadoop
Writables of various kinds), so the obvious desirable solution is to add Kryo
support for them without having to modify the original class. This mostly
works for me.
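
To make option d) concrete, here is a rough sketch of the wrap/unwrap idea
using the Kryo API directly. The helper name (KryoBytes), the buffer sizes,
and the commented usage lines are illustrative only; a real version would
reuse the Kryo instance configured by your registrator rather than a bare
new Kryo().

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Kryo-encode an object into a plain Array[Byte] on the front end. A byte
// array is java-serializable, so it can be captured by a closure and decoded
// again on the backend.
object KryoBytes {
  def wrap[T](obj: T): Array[Byte] = {
    val kryo = new Kryo()
    val out = new Output(4096, -1)   // -1 = let the buffer grow as needed
    kryo.writeClassAndObject(out, obj)
    out.close()
    out.toBytes
  }

  def unwrap[T](bytes: Array[Byte]): T = {
    val kryo = new Kryo()
    val in = new Input(bytes)
    val result = kryo.readClassAndObject(in).asInstanceOf[T]
    in.close()
    result
  }
}

// Usage sketch: ship the bytes, not the Kryo-only object, through the closure.
// val zeroBytes = KryoBytes.wrap(new TileIdWritable)
// rdd.map { x => val zero = KryoBytes.unwrap[TileIdWritable](zeroBytes); /* ... */ }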


> For option b, I'd be setting "spark.serializer" to
> "org.apache.spark.serializer.KryoSerializer" but leave
> "spark.closure.serializer" to its default value (Java).
>
Yes. Like I said, changing the closure serializer is undesirable unless you
can guarantee proper serialization support for all of Spark's internal
closures as well as your own.
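
For completeness, this is roughly what option b) looks like in code, using
the same system properties quoted elsewhere in this thread (set before
creating the SparkContext); just a sketch:

// Kryo for data objects, default (Java) serialization for closures.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
// spark.closure.serializer is deliberately left unset, so closures fall back
// to the Java serializer.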


>
> If the above is true, then seems like as it stands today, the best
> practice is for objects that use Kryo to also either implement Serializable
> or Externalizable for closures to work properly.
>

Again, this is only true for a small portion of the Spark API. Most of the
API doesn't have this problem, so you may well get away with either avoiding
the "problematic" methods or pre-serializing objects into byte arrays when
you do use them.


> Thanks,
> Ameet
>
>
>
>
> On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov wrote:
>
>> The problem really is that in certain cases task results -- and
>> front-end-passed parameters -- are passed thru closures. For closures, only
>> java serializer is properly supported (afaik).
>>
>> there has been a limited number of fixes for data parameter communication
>> between front end and backend for using other-than-java serialization (e.g.
>> for parallelize(), collect()-- these methods do not use closures to pass
>> in/grab data objects anymore); however, a certain number of methods is
>> still using closures to pass in a data object.
>>
>> afaik the methods doing correct front/back end parameter serialization
>> are:
>>
>> collect()
>> take() (maybe)
>> parallelize()
>> reduce()
>>
>> Everything else ("fold()", etc.) that communicates data between front end
>> and backend, still wraps data into closures. For a thing like fold() in
>> fact you'd have to use type that has both Java and Kryo support at the same
>> time, because it will always use both closure and object serializers while
>> executing.
>>
>> This IMO is inconsistent of course with assumption that same data type
>> should be supported uniformly regardless of where it serializes, but that's
>> the state of things as it stands.
>>
>>
>>
>> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini  wrote:
>>
>>> Thanks Imran.
>>>
>>> I tried setting "spark.closure.serializer" to
>>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
>>> NullPointerException when the executor starts up. This is a snippet of the
>>> executor's log. Notice how "registered TileIdWritable" and "registered
>>> ArgWritable" is called, so I see that my KryoRegistrator is being called.
>>> However, it's not clear why there's a follow-on NPE. My spark log level is
>>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
>>> there s
>>> some other way to get the executor to be more verbose as to the cause of
>>> the NPE.
>>>
>>> When I take out the spark.closure.serializer setting (i.e., go back to
>>> the default Java serialization), the executors start up fine, and executes
>>> other RDD actions, but of course not the lookup action (my original
>>> problem). With the spark.closure.serializer setting to kryo, it dies with
>>> an NPE during executor startup.
>>

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Eugen Cepoi
In Scala, case classes are serializable by default, so your TileIdWritable
could simply be a case class. I usually enable Kryo serialization for objects
and keep the default serializer for closures; this works pretty well.
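
A quick sketch of that setup; the single Long field and the fully qualified
registrator trait are assumptions on my part, and the real TileIdWritable may
carry more state:

// A case class is java-serializable for free, so closures that capture it
// work with the default (Java) closure serializer.
case class TileIdWritable(value: Long)

// Still register it with Kryo so the object/data path uses Kryo.
class KryoRegistrator extends org.apache.spark.serializer.KryoRegistrator {
  override def registerClasses(kryo: com.esotericsoftware.kryo.Kryo) {
    kryo.register(classOf[TileIdWritable])
  }
}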

Eugen


2013/12/24 Ameet Kini 

>
> If Java serialization is the only one that properly works for closures,
> then I shouldn't be setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer", and my only hope for getting
> lookup (and other such methods that still use closure serializers) to work
> is to either a) use only Java serialization for my objects, or b) have my
> objects implement Serializable but enable Kryo as well for object
> serialization. For option b, I'd be setting "spark.serializer" to
> "org.apache.spark.serializer.KryoSerializer" but leave
> "spark.closure.serializer" to its default value (Java).
>
> If the above is true, then seems like as it stands today, the best
> practice is for objects that use Kryo to also either implement Serializable
> or Externalizable for closures to work properly.
>
> Thanks,
> Ameet
>
>
>
>
> On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov wrote:
>
>> The problem really is that in certain cases task results -- and
>> front-end-passed parameters -- are passed thru closures. For closures, only
>> java serializer is properly supported (afaik).
>>
>> there has been a limited number of fixes for data parameter communication
>> between front end and backend for using other-than-java serialization (e.g.
>> for parallelize(), collect()-- these methods do not use closures to pass
>> in/grab data objects anymore); however, a certain number of methods is
>> still using closures to pass in a data object.
>>
>> afaik the methods doing correct front/back end parameter serialization
>> are:
>>
>> collect()
>> take() (maybe)
>> parallelize()
>> reduce()
>>
>> Everything else ("fold()", etc.) that communicates data between front end
>> and backend, still wraps data into closures. For a thing like fold() in
>> fact you'd have to use type that has both Java and Kryo support at the same
>> time, because it will always use both closure and object serializers while
>> executing.
>>
>> This IMO is inconsistent of course with assumption that same data type
>> should be supported uniformly regardless of where it serializes, but that's
>> the state of things as it stands.
>>
>>
>>
>> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini  wrote:
>>
>>> Thanks Imran.
>>>
>>> I tried setting "spark.closure.serializer" to
>>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
>>> NullPointerException when the executor starts up. This is a snippet of the
>>> executor's log. Notice how "registered TileIdWritable" and "registered
>>> ArgWritable" is called, so I see that my KryoRegistrator is being called.
>>> However, it's not clear why there's a follow-on NPE. My spark log level is
>>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
>>> there s
>>> some other way to get the executor to be more verbose as to the cause of
>>> the NPE.
>>>
>>> When I take out the spark.closure.serializer setting (i.e., go back to
>>> the default Java serialization), the executors start up fine, and executes
>>> other RDD actions, but of course not the lookup action (my original
>>> problem). With the spark.closure.serializer setting to kryo, it dies with
>>> an NPE during executor startup.
>>>
>>>
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully
>>> registered with driver
>>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on
>>> addresses 
>>> :[akka.tcp:/[redacted]:56483
>>> ]
>>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>>> [akka.tcp://[redacted]:56483 ]
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>>> 323.9 MB.
>>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>>> dirs '/tmp'
>>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>>> /tmp/spark-local-20131223110036-4335
>>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617
>>> with id = ConnectionManagerId([redacted],41617)
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>>> akka.tcp:/[redacted]:48147/user/MapOutputTracker

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Ameet Kini
If Java serialization is the only one that properly works for closures,
then I shouldn't be setting "spark.closure.serializer" to
"org.apache.spark.serializer.KryoSerializer", and my only hope for getting
lookup (and other such methods that still use closure serializers) to work
is to either a) use only Java serialization for my objects, or b) have my
objects implement Serializable but enable Kryo as well for object
serialization. For option b, I'd be setting "spark.serializer" to
"org.apache.spark.serializer.KryoSerializer" but leave
"spark.closure.serializer" to its default value (Java).

If the above is true, then it seems that, as things stand today, the best
practice is for objects that use Kryo to also implement either Serializable
or Externalizable so that closures work properly.
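
A minimal sketch of that practice, assuming a simplified TileIdWritable with
a single Long field (the actual class surely looks different):

// Java-serializable, so it survives the (Java) closure serializer used by
// lookup(), fold(), etc. It is also registered with Kryo (see the
// KryoRegistrator later in this thread), so the bulk data path stays on Kryo.
class TileIdWritable(var value: Long) extends java.io.Serializable {
  def this() = this(0L)   // no-arg constructor keeps Writable-style and Kryo use easy
}

object TileIdWritable {
  def apply(value: Long) = new TileIdWritable(value)
}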

Thanks,
Ameet




On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov  wrote:

> The problem really is that in certain cases task results -- and
> front-end-passed parameters -- are passed thru closures. For closures, only
> java serializer is properly supported (afaik).
>
> there has been a limited number of fixes for data parameter communication
> between front end and backend for using other-than-java serialization (e.g.
> for parallelize(), collect()-- these methods do not use closures to pass
> in/grab data objects anymore); however, a certain number of methods is
> still using closures to pass in a data object.
>
> afaik the methods doing correct front/back end parameter serialization are:
>
> collect()
> take() (maybe)
> parallelize()
> reduce()
>
> Everything else ("fold()", etc.) that communicates data between front end
> and backend, still wraps data into closures. For a thing like fold() in
> fact you'd have to use type that has both Java and Kryo support at the same
> time, because it will always use both closure and object serializers while
> executing.
>
> This IMO is inconsistent of course with assumption that same data type
> should be supported uniformly regardless of where it serializes, but that's
> the state of things as it stands.
>
>
>
> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini  wrote:
>
>> Thanks Imran.
>>
>> I tried setting "spark.closure.serializer" to
>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
>> NullPointerException when the executor starts up. This is a snippet of the
>> executor's log. Notice how "registered TileIdWritable" and "registered
>> ArgWritable" is called, so I see that my KryoRegistrator is being called.
>> However, it's not clear why there's a follow-on NPE. My spark log level is
>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
>> there s
>> some other way to get the executor to be more verbose as to the cause of
>> the NPE.
>>
>> When I take out the spark.closure.serializer setting (i.e., go back to
>> the default Java serialization), the executors start up fine, and executes
>> other RDD actions, but of course not the lookup action (my original
>> problem). With the spark.closure.serializer setting to kryo, it dies with
>> an NPE during executor startup.
>>
>>
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
>> with driver
>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp:/[redacted]:56483 ]
>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://[redacted]:56483 ]
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>> dirs '/tmp'
>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131223110036-4335
>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
>> id = ConnectionManagerId([redacted],41617)
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>> akka.tcp:/[redacted]:48147/user/MapOutputTracker
>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>> 13/12/23 11:00:36 INFO Stand

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Ameet Kini
Hi Michael,

I re-ran this on another machine that is on Spark's master branch,
0.9.0-SNAPSHOT from Dec 14 (right after the Scala 2.10 branch was merged back
into master), and reproduced the NPE (pasted towards the end of this
message). Looking at the relevant code, I can't tell what may have caused the
exception, because line 262 is part of the catch block of a pretty big
try/catch block.


Executor.scala line 262:  val metrics = attemptedTask.flatMap(t => t.metrics)

The surrounding lines are:

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)        // <-- line 262
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString,
        t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed
      // task may have left some weird state around depending on when the exception
      // was thrown, but on the other hand, maybe we could detect that when future
      // tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }




13/12/24 10:02:33 ERROR Executor: Uncaught exception in thread
Thread[Executor task launch worker-4,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
at scala.Option.flatMap(Option.scala:170)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:262)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Thanks,
Ameet



On Mon, Dec 23, 2013 at 4:58 PM, Michael (Bach) Bui wrote:

> What spark version are you using? By looking at the code Executor.scala
> line195, you will at least know what cause the NPE.
> We can start from there.
>
>
>
>
> On Dec 23, 2013, at 10:21 AM, Ameet Kini  wrote:
>
> Thanks Imran.
>
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
> NullPointerException when the executor starts up. This is a snippet of the
> executor's log. Notice how "registered TileIdWritable" and "registered
> ArgWritable" is called, so I see that my KryoRegistrator is being called.
> However, it's not clear why there's a follow-on NPE. My spark log level is
> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
> there s
> some other way to get the executor to be more verbose as to the cause of
> the NPE.
>
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine, and executes
> other RDD actions, but of course not the lookup action (my original
> problem). With the spark.closure.serializer setting to kryo, it dies with
> an NPE during executor startup.
>
>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
> akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp:/[redacted]:56483 ]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [
> akka.tcp://[redacted]:56483 ]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
> id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp:/[redacted]:48147/user/MapOutputTrac

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Dmitriy Lyubimov
The problem really is that in certain cases task results -- and
front-end-passed parameters -- are passed through closures. For closures,
only the Java serializer is properly supported (afaik).

There have been a limited number of fixes so that data-parameter
communication between front end and backend can use other-than-Java
serialization (e.g. parallelize() and collect() no longer use closures to
pass in or grab data objects); however, a certain number of methods are still
using closures to pass in a data object.

Afaik the methods doing correct front-end/backend parameter serialization
are:

collect()
take() (maybe)
parallelize()
reduce()

Everything else (fold(), etc.) that communicates data between front end and
backend still wraps data into closures. For a method like fold(), in fact,
you'd have to use a type that has both Java and Kryo support at the same
time, because it will always use both the closure and the object serializer
while executing (see the small sketch below).

This IMO is of course inconsistent with the assumption that the same data
type should be supported uniformly regardless of where it is serialized, but
that's the state of things as it stands.
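
To make the fold() caveat concrete, a toy sketch (the rdd value, the Acc
type, and the field names are purely illustrative):

// The zero value is captured in a closure, so it goes through the (Java)
// closure serializer; the data and partial results go through the object
// serializer (Kryo if so configured). The type therefore has to satisfy both.
case class Acc(sum: Long)   // case class: java-serializable; also register it with Kryo

// val total = rdd.map(x => Acc(1L)).fold(Acc(0L))((a, b) => Acc(a.sum + b.sum))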



On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini  wrote:

> Thanks Imran.
>
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
> NullPointerException when the executor starts up. This is a snippet of the
> executor's log. Notice how "registered TileIdWritable" and "registered
> ArgWritable" is called, so I see that my KryoRegistrator is being called.
> However, it's not clear why there's a follow-on NPE. My spark log level is
> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
> there s
> some other way to get the executor to be more verbose as to the cause of
> the NPE.
>
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine, and executes
> other RDD actions, but of course not the lookup action (my original
> problem). With the spark.closure.serializer setting to kryo, it dies with
> an NPE during executor startup.
>
>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
> akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp:/[redacted]:56483 ]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://[redacted]:56483 ]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
> id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp:/[redacted]:48147/user/MapOutputTracker
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Michael (Bach) Bui
What Spark version are you using? By looking at the code at Executor.scala
line 195, you will at least know what caused the NPE.
We can start from there.



On Dec 23, 2013, at 10:21 AM, Ameet Kini  wrote:

> Thanks Imran. 
> 
> I tried setting "spark.closure.serializer" to 
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing 
> NullPointerException when the executor starts up. This is a snippet of the 
> executor's log. Notice how "registered TileIdWritable" and "registered 
> ArgWritable" is called, so I see that my KryoRegistrator is being called. 
> However, it's not clear why there's a follow-on NPE. My spark log level is 
> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if 
> there s 
> some other way to get the executor to be more verbose as to the cause of the 
> NPE. 
> 
> When I take out the spark.closure.serializer setting (i.e., go back to the 
> default Java serialization), the executors start up fine, and executes other 
> RDD actions, but of course not the lookup action (my original problem). With 
> the spark.closure.serializer setting to kryo, it dies with an NPE during 
> executor startup. 
> 
> 
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: 
> akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered 
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp:/[redacted]:56483]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: 
> [akka.tcp://[redacted]:56483]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: 
> akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 
> MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs 
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at 
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id 
> = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: 
> akka.tcp:/[redacted]:48147/user/MapOutputTracker
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
> geotrellis.spark.KryoRegistrator
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 INFO Executor: Fetching 
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with 
> timestamp 1387814434436
> 13/12/23 11:00:37 INFO Utils: Fetching 
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to 
> /tmp/fetchFileTemp2456419097284083628.tmp
> 13/12/23 11:00:37 INFO Executor: Adding 
> file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>  to class loader
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread 
> Thread[pool-7-thread-4,5,main]
> java.lang.NullPointerException
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
> at scala.Option.flatMap(Option.scala:170)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Ameet Kini
Using Java serialization would make the NPE go away, but it would be a less
preferable solution. My application is network-intensive, and serialization
cost is significant. In other words, these objects are ideal candidates for
Kryo.





On Mon, Dec 23, 2013 at 3:41 PM, Jie Deng  wrote:

> maybe try to implement your class with serializable...
>
>
> 2013/12/23 Ameet Kini 
>
>> Thanks Imran.
>>
>> I tried setting "spark.closure.serializer" to
>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
>> NullPointerException when the executor starts up. This is a snippet of the
>> executor's log. Notice how "registered TileIdWritable" and "registered
>> ArgWritable" is called, so I see that my KryoRegistrator is being called.
>> However, it's not clear why there's a follow-on NPE. My spark log level is
>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
>> there s
>> some other way to get the executor to be more verbose as to the cause of
>> the NPE.
>>
>> When I take out the spark.closure.serializer setting (i.e., go back to
>> the default Java serialization), the executors start up fine, and executes
>> other RDD actions, but of course not the lookup action (my original
>> problem). With the spark.closure.serializer setting to kryo, it dies with
>> an NPE during executor startup.
>>
>>
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
>> with driver
>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp:/[redacted]:56483 ]
>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://[redacted]:56483 ]
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>> dirs '/tmp'
>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131223110036-4335
>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
>> id = ConnectionManagerId([redacted],41617)
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>> akka.tcp:/[redacted]:48147/user/MapOutputTracker
>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>> 13/12/23 11:00:37 INFO Executor: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
>> timestamp 1387814434436
>> 13/12/23 11:00:37 INFO Utils: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
>> /tmp/fetchFileTemp2456419097284083628.tmp
>> 13/12/23 11:00:37 INFO Executor: Adding
>> file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> to class loader
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-4,5,main]
>> java.lang.NullPointerException
>> at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>> at
>> org.apache.spark.executor.Executor$TaskRunner$

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Jie Deng
Maybe try making your class implement Serializable...


2013/12/23 Ameet Kini 

> Thanks Imran.
>
> I tried setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer" and now end up seeing
> NullPointerException when the executor starts up. This is a snippet of the
> executor's log. Notice how "registered TileIdWritable" and "registered
> ArgWritable" is called, so I see that my KryoRegistrator is being called.
> However, it's not clear why there's a follow-on NPE. My spark log level is
> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
> there s
> some other way to get the executor to be more verbose as to the cause of
> the NPE.
>
> When I take out the spark.closure.serializer setting (i.e., go back to the
> default Java serialization), the executors start up fine, and executes
> other RDD actions, but of course not the lookup action (my original
> problem). With the spark.closure.serializer setting to kryo, it dies with
> an NPE during executor startup.
>
>
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
> akka.tcp://[redacted]:48147/user/StandaloneScheduler
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
> with driver
> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
> 13/12/23 11:00:36 INFO Remoting: Starting remoting
> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp:/[redacted]:56483 ]
> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://[redacted]:56483 ]
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://[redacted]:48147/user/BlockManagerMaster
> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
> '/tmp'
> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131223110036-4335
> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
> id = ConnectionManagerId([redacted],41617)
> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp:/[redacted]:48147/user/MapOutputTracker
> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
> geotrellis.spark.KryoRegistrator
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered TileIdWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> registered ArgWritable
> 13/12/23 11:00:37 INFO Executor: Running task ID 2
> 13/12/23 11:00:37 INFO Executor: Running task ID 1
> 13/12/23 11:00:37 INFO Executor: Running task ID 3
> 13/12/23 11:00:37 INFO Executor: Running task ID 0
> 13/12/23 11:00:37 INFO Executor: Fetching
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
> timestamp 1387814434436
> 13/12/23 11:00:37 INFO Utils: Fetching
> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
> /tmp/fetchFileTemp2456419097284083628.tmp
> 13/12/23 11:00:37 INFO Executor: Adding
> file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
> to class loader
> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
> Thread[pool-7-thread-4,5,main]
> java.lang.NullPointerException
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
> at scala.Option.flatMap(Option.scala:170)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.l

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Ameet Kini
Thanks Imran.

I tried setting "spark.closure.serializer" to
"org.apache.spark.serializer.KryoSerializer" and now end up seeing a
NullPointerException when the executor starts up. Below is a snippet of the
executor's log. Notice how "registered TileIdWritable" and "registered
ArgWritable" are printed, so I can see that my KryoRegistrator is being
called. However, it's not clear why there's a follow-on NPE. My Spark log
level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm
not sure if there is some other way to get the executor to be more verbose
about the cause of the NPE.

When I take out the spark.closure.serializer setting (i.e., go back to the
default Java serialization), the executors start up fine and execute other
RDD actions, but of course not the lookup action (my original problem). With
spark.closure.serializer set to Kryo, it dies with an NPE during executor
startup.


13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp:/[redacted]:56483 ]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://[redacted]:56483 ]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9
MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
'/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at
/tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp:/[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching
http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching
http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
/tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding
file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at scala.Option.flatMap(Option.scala:170)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at
org

Re: debugging NotSerializableException while using Kryo

2013-12-20 Thread Imran Rashid
There is a separate setting for serializing closures,
"spark.closure.serializer" (listed here:
http://spark.incubator.apache.org/docs/latest/configuration.html).

It is used to serialize whatever is referenced by the functions you call on
an RDD, e.g. map, filter, and lookup. Those closures include referenced
variables, like your TileIdWritable.

So you need to either change that setting to use Kryo, or make your object
serializable to Java (a sketch of both options is below).
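
A sketch of the two options; the property names are the standard ones from
the configuration page linked above, and TileIdWritable's body is elided:

// Option 1: switch the closure serializer to Kryo as well.
System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")

// Option 2: leave that property alone and make the captured class
// java-serializable instead, e.g.
// class TileIdWritable extends java.io.Serializable { /* ... */ }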



On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini  wrote:

>
> I'm getting the below NotSerializableException despite using Kryo to
> serialize that class (TileIdWritable).
>
> The offending line: awtestRdd.lookup(TileIdWritable(200))
>
> Initially I thought Kryo is not being registered properly, so I tried
> running operations over awtestRDD which force a shuffle (e.g., groupByKey),
> and that seemed to work fine. So it seems to be specific to the
> "TileIdWritable(200)" argument to lookup().  Is there anything unique about
> companion objects and Kryo serialization? I even replaced
> "TileIdWritable(200)" by "new TileIdWritable" but still see that exception
>
>
> class TileIdWritable {
>  //
> }
>
> object TileIdWritable {
>  def apply(value: Long) = new TileIdWritable
> }
>
>
> My Kryo registrator:
> class KryoRegistrator extends SparkKryoRegistrator {
> override def registerClasses(kryo: Kryo) {
>   println("Called KryoRegistrator")  // I see this printed during
> shuffle operations
>   val r = kryo.register(classOf[TileIdWritable])
>   val s = kryo.register(classOf[ArgWritable])
> }
> }
>
> Then just before creating a Spark Context, I have these two lines:
> System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.kryo.registrator",
> "geotrellis.spark.KryoRegistrator")
>
>
>
>
> The exception itself:
> Exception in thread "main" org.apache.spark.SparkException: Job failed:
> Task not serializable: java.io.NotSerializableException:
> geotrellis.spark.formats.TileIdWritable
> - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
> name: "key$1", type: "class java.lang.Object")
> - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
> )
> - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
> name: "func$1", type: "interface scala.Function1")
> - root object (class
> "org.apache.spark.SparkContext$$anonfun$runJob$4", )
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
> Regards,
> Ameet
>
>
>
>
>


debugging NotSerializableException while using Kryo

2013-12-20 Thread Ameet Kini
I'm getting the below NotSerializableException despite using Kryo to
serialize that class (TileIdWritable).

The offending line: awtestRdd.lookup(TileIdWritable(200))

Initially I thought Kryo was not being registered properly, so I tried
running operations over awtestRdd that force a shuffle (e.g., groupByKey),
and those seemed to work fine. So it seems to be specific to the
"TileIdWritable(200)" argument to lookup(). Is there anything unique about
companion objects and Kryo serialization? I even replaced
"TileIdWritable(200)" with "new TileIdWritable" but still see that exception.


class TileIdWritable {
 //
}

object TileIdWritable {
 def apply(value: Long) = new TileIdWritable
}


My Kryo registrator:
class KryoRegistrator extends SparkKryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    println("Called KryoRegistrator")  // I see this printed during shuffle operations
    val r = kryo.register(classOf[TileIdWritable])
    val s = kryo.register(classOf[ArgWritable])
  }
}

Then just before creating a Spark Context, I have these two lines:
System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator",
"geotrellis.spark.KryoRegistrator")




The exception itself:
Exception in thread "main" org.apache.spark.SparkException: Job failed:
Task not serializable: java.io.NotSerializableException:
geotrellis.spark.formats.TileIdWritable
- field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
name: "key$1", type: "class java.lang.Object")
- object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
)
- field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name:
"func$1", type: "interface scala.Function1")
- root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Regards,
Ameet