Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Ameet Kini
Hi Michael,

I re-ran this on another machine which is on Spark's master branch
(0.9.0-SNAPSHOT from Dec 14, right after the Scala 2.10 branch was merged
back into master) and reproduced the NPE, pasted towards the end of this
message. I can't tell from the relevant code what may have caused the
exception, because line 262 is part of the catch block of a pretty big
try/catch block.


Executor.scala Line 262:  val metrics = attemptedTask.flatMap(t => t.metrics)

The surrounding lines are:

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
Line 262 --   val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString,
        t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }




13/12/24 10:02:33 ERROR Executor: Uncaught exception in thread
Thread[Executor task launch worker-4,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
at scala.Option.flatMap(Option.scala:170)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:262)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Thanks,
Ameet



On Mon, Dec 23, 2013 at 4:58 PM, Michael (Bach) Bui free...@adatao.com wrote:

 What Spark version are you using? By looking at Executor.scala line 195, you
 will at least know what caused the NPE.
 We can start from there.




 On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing a
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how "registered TileIdWritable" and "registered
 ArgWritable" are printed, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My Spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure
 if there is some other way to get the executor to be more verbose as to the
 cause of the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to the
 default Java serialization), the executors start up fine and execute other
 RDD actions, but of course not the lookup action (my original problem). With
 spark.closure.serializer set to Kryo, it dies with an NPE during executor
 startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [
 akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
 323.9 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
 '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
 id = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
 

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Ameet Kini
If Java serialization is the only one that properly works for closures,
then I shouldn't be setting spark.closure.serializer to
org.apache.spark.serializer.KryoSerializer, and my only hope for getting
lookup (and other such methods that still use closure serializers) to work
is to either a) use only Java serialization for my objects, or b) have my
objects implement Serializable but enable Kryo as well for object
serialization. For option b, I'd be setting spark.serializer to
org.apache.spark.serializer.KryoSerializer but leaving
spark.closure.serializer at its default value (Java).

If the above is true, then it seems that, as things stand today, the best
practice is for objects that use Kryo to also implement either Serializable
or Externalizable so that closures work properly.
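To make option b concrete, here is a rough sketch of what I mean (untested,
and just illustrative: MyRegistrator and OptionBExample are made-up names,
and it uses the same System.setProperty style of configuration as elsewhere
in this thread):

import org.apache.spark.SparkContext
import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}
import com.esotericsoftware.kryo.Kryo

// Data class: registered with Kryo for object serialization, but also
// java.io.Serializable so that a reference captured in a closure (which
// still goes through the default Java closure serializer) can be shipped.
class TileIdWritable(var value: Long = 0L) extends Serializable

class MyRegistrator extends SparkKryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[TileIdWritable])
  }
}

object OptionBExample {
  def main(args: Array[String]) {
    // Kryo for data objects; spark.closure.serializer left at its Java default.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "MyRegistrator")

    val sc = new SparkContext("local", "kryo-option-b")
    val rdd = sc.parallelize(Seq((new TileIdWritable(200), "a"), (new TileIdWritable(300), "b")))

    // The captured 'key' rides inside the closure, so it is Java-serialized;
    // the RDD records themselves go through the Kryo object serializer.
    val key = new TileIdWritable(200)
    println(rdd.filter { case (tid, _) => tid.value == key.value }.collect().toList)
  }
}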

Thanks,
Ameet




On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 The problem really is that in certain cases task results -- and
 front-end-passed parameters -- are passed through closures. For closures,
 only the Java serializer is properly supported (afaik).

 There has been a limited number of fixes for data parameter communication
 between front end and backend for using other-than-Java serialization (e.g.
 for parallelize(), collect() -- these methods do not use closures to pass
 in/grab data objects anymore); however, a certain number of methods are
 still using closures to pass in a data object.

 Afaik the methods doing correct front/back end parameter serialization are:

 collect()
 take() (maybe)
 parallelize()
 reduce()

 Everything else (fold(), etc.) that communicates data between front end
 and backend still wraps data into closures. For a thing like fold(), in
 fact, you'd have to use a type that has both Java and Kryo support at the
 same time, because it will always use both the closure and object
 serializers while executing.

 This IMO is of course inconsistent with the assumption that the same data
 type should be supported uniformly regardless of where it serializes, but
 that's the state of things as it stands.



 On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how registered TileIdWritable and registered
 ArgWritable is called, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
 there s
 some other way to get the executor to be more verbose as to the cause of
 the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to
 the default Java serialization), the executors start up fine, and executes
 other RDD actions, but of course not the lookup action (my original
 problem). With the spark.closure.serializer setting to kryo, it dies with
 an NPE during executor startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
 323.9 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
 dirs '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
 id = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
 13/12/23 

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Eugen Cepoi
In Scala, case classes are serializable by default, so your TileIdWritable
could be a case class. I usually enable Kryo serialization for objects and
keep the default serializer for closures; this works pretty well.
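For illustration, a minimal sketch of that setup (the class and registrator
names are just placeholders matching this thread; configuration shown in the
System.setProperty style used elsewhere in these messages):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}

// A case class already extends Serializable, so the default Java closure
// serializer can ship it when it is captured in a closure...
case class TileIdWritable(value: Long)

// ...while Kryo handles the bulk data serialization once it is registered.
class MyRegistrator extends SparkKryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[TileIdWritable])
  }
}

// Kryo for data objects, default (Java) serializer for closures:
// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// System.setProperty("spark.kryo.registrator", "MyRegistrator")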

Eugen


2013/12/24 Ameet Kini ameetk...@gmail.com


 If Java serialization is the only one that properly works for closures,
 then I shouldn't be setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer, and my only hope for getting
 lookup (and other such methods that still use closure serializers) to work
 is to either a) use only Java serialization for my objects, or b) have my
 objects implement Serializable but enable Kryo as well for object
 serialization. For option b, I'd be setting spark.serializer to
 org.apache.spark.serializer.KryoSerializer but leave
 spark.closure.serializer to its default value (Java).

 If the above is true, then seems like as it stands today, the best
 practice is for objects that use Kryo to also either implement Serializable
 or Externalizable for closures to work properly.

 Thanks,
 Ameet




 On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 The problem really is that in certain cases task results -- and
 front-end-passed parameters -- are passed thru closures. For closures, only
 java serializer is properly supported (afaik).

 there has been a limited number of fixes for data parameter communication
 between front end and backend for using other-than-java serialization (e.g.
 for parallelize(), collect()-- these methods do not use closures to pass
 in/grab data objects anymore); however, a certain number of methods is
 still using closures to pass in a data object.

 afaik the methods doing correct front/back end parameter serialization
 are:

 collect()
 take() (maybe)
 parallelize()
 reduce()

 Everything else (fold(), etc.) that communicates data between front end
 and backend, still wraps data into closures. For a thing like fold() in
 fact you'd have to use type that has both Java and Kryo support at the same
 time, because it will always use both closure and object serializers while
 executing.

 This IMO is inconsistent of course with assumption that same data type
 should be supported uniformly regardless of where it serializes, but that's
 the state of things as it stands.



 On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how registered TileIdWritable and registered
 ArgWritable is called, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
 there s
 some other way to get the executor to be more verbose as to the cause of
 the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to
 the default Java serialization), the executors start up fine, and executes
 other RDD actions, but of course not the lookup action (my original
 problem). With the spark.closure.serializer setting to kryo, it dies with
 an NPE during executor startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully
 registered with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on
 addresses 
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
 323.9 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
 dirs '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617
 with id = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register
 BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO 

Re: debugging NotSerializableException while using Kryo

2013-12-24 Thread Dmitriy Lyubimov
On Tue, Dec 24, 2013 at 7:29 AM, Ameet Kini ameetk...@gmail.com wrote:


 If Java serialization is the only one that properly works for closures,
 then I shouldn't be setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer,


My understanding is that it's not that Kryo wouldn't work for closures; it's
just that closure serialization is used not only for user-defined closures
but also for a lot of closures internal to Spark. For those, of course,
there's no way to enable Kryo support.

Data objects (and their serializer), on the other hand, are not defined
anywhere but in the user's code. Therefore overriding the object serializer
is in general a safe assumption, while overriding the closure serializer is
not.


 and my only hope for getting lookup (and other such methods that still use
 closure serializers) to work is to either a) use only Java serialization
 for my objects, or b) have my objects implement Serializable but enable
 Kryo as well for object serialization.


or c) avoid using the parts of the Spark API that currently use closures to
communicate data objects between front end and backend for Kryo-only objects
(as I do). The most annoying of those is fold().

In fact, d) you can always wrap a Kryo object into a byte array on the front
end, pass the (Java-serializable) byte array through a closure, and unwrap it
on the backend. This technique is extremely ugly, though, with methods like
fold(), which force you to use the same object type in the front-end and
backend operations (and obviously it may not be the type you want for the
purposes of elegant folding). I had this problem extensively with third-party
types for which I have no desire to add Java serialization (mostly, Hadoop
Writables of various kinds), so the obvious desirable solution is to add Kryo
support for them without having to modify the original class. This mostly
works for me.
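As a rough sketch of what I mean by (d) -- the KryoBytes helper name is made
up, and it assumes the TileIdWritable type and the same Kryo registrations
discussed elsewhere in this thread:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

object KryoBytes {
  // Mirror the registrations done in the Spark KryoRegistrator.
  private def newKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.register(classOf[TileIdWritable])
    kryo
  }

  // Front end: wrap a Kryo-only object into a plain, Java-serializable byte array.
  def wrap(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    newKryo().writeClassAndObject(out, obj)
    out.close()
    bos.toByteArray
  }

  // Backend: unwrap inside the task, after the byte array traveled through the closure.
  def unwrap[T](bytes: Array[Byte]): T =
    newKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
}

// Usage: the closure captures only keyBytes (an Array[Byte]), never the Kryo-only object.
// val keyBytes = KryoBytes.wrap(TileIdWritable(200))
// rdd.filter { case (tid, _) => tid == KryoBytes.unwrap[TileIdWritable](keyBytes) }.collect()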


 For option b, I'd be setting spark.serializer to
 org.apache.spark.serializer.KryoSerializer but leave
 spark.closure.serializer to its default value (Java).

Yes. Like I said, changing the closure serializer is undesirable unless you
can guarantee proper serialization support for all Spark-internal closures as
well as your own closures.



 If the above is true, then seems like as it stands today, the best
 practice is for objects that use Kryo to also either implement Serializable
 or Externalizable for closures to work properly.


Again, this is only true for a small portion of the Spark API. Most of the
Spark API doesn't have this problem, so you may well get away with either not
using those methods or pre-serializing objects into byte arrays when using
the problematic ones.


 Thanks,
 Ameet




 On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 The problem really is that in certain cases task results -- and
 front-end-passed parameters -- are passed thru closures. For closures, only
 java serializer is properly supported (afaik).

 there has been a limited number of fixes for data parameter communication
 between front end and backend for using other-than-java serialization (e.g.
 for parallelize(), collect()-- these methods do not use closures to pass
 in/grab data objects anymore); however, a certain number of methods is
 still using closures to pass in a data object.

 afaik the methods doing correct front/back end parameter serialization
 are:

 collect()
 take() (maybe)
 parallelize()
 reduce()

 Everything else (fold(), etc.) that communicates data between front end
 and backend, still wraps data into closures. For a thing like fold() in
 fact you'd have to use type that has both Java and Kryo support at the same
 time, because it will always use both closure and object serializers while
 executing.

 This IMO is inconsistent of course with assumption that same data type
 should be supported uniformly regardless of where it serializes, but that's
 the state of things as it stands.



 On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how registered TileIdWritable and registered
 ArgWritable is called, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
 there s
 some other way to get the executor to be more verbose as to the cause of
 the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to
 the default Java serialization), the executors start up fine, and executes
 other RDD actions, but of course not the lookup action (my original
 problem). With the spark.closure.serializer setting to kryo, it dies with
 an NPE during executor startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Ameet Kini
Thanks Imran.

I tried setting spark.closure.serializer to
org.apache.spark.serializer.KryoSerializer and now end up seeing a
NullPointerException when the executor starts up. This is a snippet of the
executor's log. Notice how "registered TileIdWritable" and "registered
ArgWritable" are printed, so I see that my KryoRegistrator is being called.
However, it's not clear why there's a follow-on NPE. My Spark log level is
set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not sure
if there is some other way to get the executor to be more verbose as to the
cause of the NPE.

When I take out the spark.closure.serializer setting (i.e., go back to the
default Java serialization), the executors start up fine and execute other
RDD actions, but of course not the lookup action (my original problem). With
spark.closure.serializer set to Kryo, it dies with an NPE during executor
startup.


13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp:/[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9
MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
'/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at
/tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp:/[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching
http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching
http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
/tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding
file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at scala.Option.flatMap(Option.scala:170)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
at

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Jie Deng
Maybe try making your class implement Serializable...


2013/12/23 Ameet Kini ameetk...@gmail.com

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how registered TileIdWritable and registered
 ArgWritable is called, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
 there s
 some other way to get the executor to be more verbose as to the cause of
 the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to the
 default Java serialization), the executors start up fine, and executes
 other RDD actions, but of course not the lookup action (my original
 problem). With the spark.closure.serializer setting to kryo, it dies with
 an NPE during executor startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
 323.9 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs
 '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
 id = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 13/12/23 11:00:37 INFO Executor: Running task ID 2
 13/12/23 11:00:37 INFO Executor: Running task ID 1
 13/12/23 11:00:37 INFO Executor: Running task ID 3
 13/12/23 11:00:37 INFO Executor: Running task ID 0
 13/12/23 11:00:37 INFO Executor: Fetching
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
 timestamp 1387814434436
 13/12/23 11:00:37 INFO Utils: Fetching
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
 /tmp/fetchFileTemp2456419097284083628.tmp
 13/12/23 11:00:37 INFO Executor: Adding
 file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
 to class loader
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
 Thread[pool-7-thread-4,5,main]
 java.lang.NullPointerException
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at scala.Option.flatMap(Option.scala:170)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in 

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Ameet Kini
Using Java serialization would make the NPE go away, but it would be a less
desirable solution. My application is network-intensive, and serialization
cost is significant. In other words, these objects are ideal candidates for
Kryo.





On Mon, Dec 23, 2013 at 3:41 PM, Jie Deng deng113...@gmail.com wrote:

 maybe try to implement your class with serializable...


 2013/12/23 Ameet Kini ameetk...@gmail.com

 Thanks Imran.

 I tried setting spark.closure.serializer to
 org.apache.spark.serializer.KryoSerializer and now end up seeing
 NullPointerException when the executor starts up. This is a snippet of the
 executor's log. Notice how registered TileIdWritable and registered
 ArgWritable is called, so I see that my KryoRegistrator is being called.
 However, it's not clear why there's a follow-on NPE. My spark log level is
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if
 there s
 some other way to get the executor to be more verbose as to the cause of
 the NPE.

 When I take out the spark.closure.serializer setting (i.e., go back to
 the default Java serialization), the executors start up fine, and executes
 other RDD actions, but of course not the lookup action (my original
 problem). With the spark.closure.serializer setting to kryo, it dies with
 an NPE during executor startup.


 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
 323.9 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
 dirs '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
 id = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
 geotrellis.spark.KryoRegistrator
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 13/12/23 11:00:37 INFO Executor: Running task ID 2
 13/12/23 11:00:37 INFO Executor: Running task ID 1
 13/12/23 11:00:37 INFO Executor: Running task ID 3
 13/12/23 11:00:37 INFO Executor: Running task ID 0
 13/12/23 11:00:37 INFO Executor: Fetching
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
 timestamp 1387814434436
 13/12/23 11:00:37 INFO Utils: Fetching
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
 /tmp/fetchFileTemp2456419097284083628.tmp
 13/12/23 11:00:37 INFO Executor: Adding
 file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
 to class loader
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
 Thread[pool-7-thread-4,5,main]
 java.lang.NullPointerException
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at scala.Option.flatMap(Option.scala:170)
 at
 

Re: debugging NotSerializableException while using Kryo

2013-12-23 Thread Michael (Bach) Bui
What Spark version are you using? By looking at Executor.scala line 195, you
will at least know what caused the NPE.
We can start from there.



On Dec 23, 2013, at 10:21 AM, Ameet Kini ameetk...@gmail.com wrote:

 Thanks Imran. 
 
 I tried setting spark.closure.serializer to 
 org.apache.spark.serializer.KryoSerializer and now end up seeing 
 NullPointerException when the executor starts up. This is a snippet of the 
 executor's log. Notice how registered TileIdWritable and registered 
 ArgWritable is called, so I see that my KryoRegistrator is being called. 
 However, it's not clear why there's a follow-on NPE. My spark log level is 
 set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if 
 there s 
 some other way to get the executor to be more verbose as to the cause of the 
 NPE. 
 
 When I take out the spark.closure.serializer setting (i.e., go back to the 
 default Java serialization), the executors start up fine, and executes other 
 RDD actions, but of course not the lookup action (my original problem). With 
 the spark.closure.serializer setting to kryo, it dies with an NPE during 
 executor startup. 
 
 
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: 
 akka.tcp://[redacted]:48147/user/StandaloneScheduler
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered 
 with driver
 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
 13/12/23 11:00:36 INFO Remoting: Starting remoting
 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp:/[redacted]:56483]
 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://[redacted]:56483]
 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: 
 akka.tcp://[redacted]:48147/user/BlockManagerMaster
 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 
 MB.
 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs 
 '/tmp'
 13/12/23 11:00:36 INFO DiskStore: Created local directory at 
 /tmp/spark-local-20131223110036-4335
 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id 
 = ConnectionManagerId([redacted],41617)
 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: 
 akka.tcp:/[redacted]:48147/user/MapOutputTracker
 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is 
 /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: 
 geotrellis.spark.KryoRegistrator
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered TileIdWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 registered ArgWritable
 13/12/23 11:00:37 INFO Executor: Running task ID 2
 13/12/23 11:00:37 INFO Executor: Running task ID 1
 13/12/23 11:00:37 INFO Executor: Running task ID 3
 13/12/23 11:00:37 INFO Executor: Running task ID 0
 13/12/23 11:00:37 INFO Executor: Fetching 
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with 
 timestamp 1387814434436
 13/12/23 11:00:37 INFO Utils: Fetching 
 http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to 
 /tmp/fetchFileTemp2456419097284083628.tmp
 13/12/23 11:00:37 INFO Executor: Adding 
 file[redacted]/spark/work/app-20131223110034-/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
  to class loader
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread 
 Thread[pool-7-thread-4,5,main]
 java.lang.NullPointerException
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
 at scala.Option.flatMap(Option.scala:170)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread 
 Thread[pool-7-thread-2,5,main]
 java.lang.NullPointerException
 at 
 

Re: debugging NotSerializableException while using Kryo

2013-12-20 Thread Imran Rashid
There is a separate setting for serializing closures,
spark.closure.serializer (listed here:
http://spark.incubator.apache.org/docs/latest/configuration.html).

That is used to serialize whatever is used by all the functions on an RDD,
e.g., map, filter, and lookup. Those closures include referenced variables,
like your TileIdWritable.

So you need to either change that setting to use Kryo, or make your object
serializable to Java.
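Concretely, that means one of the following (an illustrative sketch only, in
the System.setProperty style used elsewhere in this thread; the class body is
elided):

object ClosureSerializerOptions {
  // Option 1: switch the closure serializer to Kryo as well,
  // in addition to spark.serializer / spark.kryo.registrator.
  def useKryoForClosures() {
    System.setProperty("spark.closure.serializer",
      "org.apache.spark.serializer.KryoSerializer")
  }
}

// Option 2: make the referenced type Java-serializable so the default
// closure serializer can ship it.
class TileIdWritable extends java.io.Serializable {
  // fields and methods as before
}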



On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini ameetk...@gmail.com wrote:


 I'm getting the below NotSerializableException despite using Kryo to
 serialize that class (TileIdWritable).

 The offending line: awtestRdd.lookup(TileIdWritable(200))

 Initially I thought Kryo was not being registered properly, so I tried
 running operations over awtestRdd which force a shuffle (e.g., groupByKey),
 and those seemed to work fine. So it seems to be specific to the
 TileIdWritable(200) argument to lookup().  Is there anything unique about
 companion objects and Kryo serialization? I even replaced
 TileIdWritable(200) with new TileIdWritable, but I still see the exception.


 class TileIdWritable {
  //
 }

 object TileIdWritable {
  def apply(value: Long) = new TileIdWritable
 }


 My Kryo registrator:
 class KryoRegistrator extends SparkKryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     println("Called KryoRegistrator")  // I see this printed during shuffle operations
     val r = kryo.register(classOf[TileIdWritable])
     val s = kryo.register(classOf[ArgWritable])
   }
 }

 Then just before creating a SparkContext, I have these two lines:
 System.setProperty("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 System.setProperty("spark.kryo.registrator",
   "geotrellis.spark.KryoRegistrator")




 The exception itself:
 Exception in thread "main" org.apache.spark.SparkException: Job failed:
 Task not serializable: java.io.NotSerializableException:
 geotrellis.spark.formats.TileIdWritable
 - field (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$4,
 name: key$1, type: class java.lang.Object)
 - object (class org.apache.spark.rdd.PairRDDFunctions$$anonfun$4,
 <function1>)
 - field (class org.apache.spark.SparkContext$$anonfun$runJob$4,
 name: func$1, type: interface scala.Function1)
 - root object (class
 org.apache.spark.SparkContext$$anonfun$runJob$4, <function2>)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
 at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
 at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

 Regards,
 Ameet