If Java serialization is the only one that works properly for closures,
then I shouldn't be setting "spark.closure.serializer" to
"org.apache.spark.serializer.KryoSerializer", and my only hope for getting
lookup (and other such methods that still use closure serializers) to work
is to either a) use only Java serialization for my objects, or b) have my
objects implement Serializable but enable Kryo as well for object
serialization. For option b, I'd set "spark.serializer" to
"org.apache.spark.serializer.KryoSerializer" but leave
"spark.closure.serializer" at its default value (Java).

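A minimal sketch of option b, assuming Spark 0.8.x-style configuration through system properties (as in the original post further down the thread). The value field on TileIdWritable, the OptionB object, the local master, and placing everything in the geotrellis.spark package are assumptions made for illustration. The point is only that "spark.closure.serializer" is never set, while the class is both Serializable and registered with Kryo.

```scala
package geotrellis.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkContext
import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}

// Assumption: a simplified TileIdWritable. Extending Serializable lets the
// default (Java) closure serializer ship it inside the closure built by lookup().
class TileIdWritable(val value: Long) extends Serializable

// Still registered with Kryo, which is used for object data such as shuffles.
class KryoRegistrator extends SparkKryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[TileIdWritable])
  }
}

// Hypothetical driver object, for illustration only.
object OptionB {
  def main(args: Array[String]) {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
    // "spark.closure.serializer" is deliberately left unset (default: Java).

    val sc = new SparkContext("local", "option-b-sketch")
    // ... build the pair RDD and call lookup(new TileIdWritable(200L)) here ...
    sc.stop()
  }
}
```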
If the above is true, then it seems that, as things stand today, the best
practice is for objects that use Kryo to also implement either Serializable
or Externalizable so that closures work properly.
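
To check whether a given type would survive Java closure serialization without starting a cluster, something like the following sketch may help. It uses only java.io; PlainKey, SerializableKey, ClosureCheck, and its helper methods are hypothetical names, and matcher() only imitates the way lookup() captures its key argument in a closure.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for a key type, without and with Java serialization support.
class PlainKey(val value: Long)
class SerializableKey(val value: Long) extends Serializable

object ClosureCheck {
  // Returns true if Java serialization accepts the object graph rooted at obj.
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // Builds a closure that captures key, similar in spirit to lookup(key).
  def matcher(key: AnyRef): AnyRef => Boolean = (other: AnyRef) => other == key

  def main(args: Array[String]): Unit = {
    println(javaSerializable(matcher(new PlainKey(200L))))
    println(javaSerializable(matcher(new SerializableKey(200L))))
  }
}
```

Compiled as a standalone file, the first check should report false (the captured PlainKey is rejected with a NotSerializableException) and the second true.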

Thanks,
Ameet




On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> The real problem is that in certain cases task results -- and
> front-end-passed parameters -- are passed through closures. For closures,
> only the Java serializer is properly supported (afaik).
>
> There have been a limited number of fixes so that data parameters passed
> between the front end and backend can use other-than-Java serialization
> (e.g. for parallelize() and collect() -- these methods no longer use
> closures to pass in or grab data objects); however, a certain number of
> methods still use closures to pass in a data object.
>
> afaik the methods doing correct front/back end parameter serialization are:
>
> collect()
> take() (maybe)
> parallelize()
> reduce()
>
> Everything else ("fold()", etc.) that communicates data between front end
> and backend still wraps data into closures. For something like fold(), you'd
> in fact have to use a type that supports both Java and Kryo serialization,
> because it will always use both the closure and object serializers while
> executing.
>
> IMO this is of course inconsistent with the assumption that the same data
> type should be supported uniformly regardless of where it is serialized,
> but that's the state of things as it stands.
>
>
>
> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini <ameetk...@gmail.com> wrote:
>
>> Thanks Imran.
>>
>> I tried setting "spark.closure.serializer" to
>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
>> NullPointerException when the executor starts up. This is a snippet of the
>> executor's log. Notice that "registered TileIdWritable" and "registered
>> ArgWritable" are printed, so I can see that my KryoRegistrator is being
>> called. However, it's not clear why there's a follow-on NPE. My Spark log
>> level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so
>> I'm not sure if there's some other way to get the executor to be more
>> verbose about the cause of the NPE.
>>
>> When I take out the spark.closure.serializer setting (i.e., go back to
>> the default Java serialization), the executors start up fine and execute
>> other RDD actions, but of course not the lookup action (my original
>> problem). With spark.closure.serializer set to Kryo, the executor dies
>> with an NPE during startup.
>>
>>
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered
>> with driver
>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://[redacted]:56483]
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>> dirs '/tmp'
>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131223110036-4335
>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with
>> id = ConnectionManagerId([redacted],41617)
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>> akka.tcp://[redacted]:48147/user/MapOutputTracker
>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>> geotrellis.spark.KryoRegistrator
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered TileIdWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> registered ArgWritable
>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>> 13/12/23 11:00:37 INFO Executor: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
>> timestamp 1387814434436
>> 13/12/23 11:00:37 INFO Utils: Fetching
>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
>> /tmp/fetchFileTemp2456419097284083628.tmp
>> 13/12/23 11:00:37 INFO Executor: Adding
>> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>> to class loader
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-4,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-2,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-1,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>> Thread[pool-7-thread-3,5,main]
>> java.lang.NullPointerException
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>         at scala.Option.flatMap(Option.scala:170)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:724)
>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>
>> Thanks,
>> Ameet
>>
>>
>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <im...@quantifind.com> wrote:
>>
>>> There is a separate setting for serializing closures,
>>> "spark.closure.serializer" (listed here:
>>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>
>>> It is used to serialize whatever is used by all the functions on an
>>> RDD, e.g., map, filter, and lookup. Those closures include referenced
>>> variables, like your TileIdWritable.
>>>
>>> So you need to either change that setting to use Kryo, or make your
>>> object serializable with Java serialization.
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <ameetk...@gmail.com> wrote:
>>>
>>>>
>>>> I'm getting the below NotSerializableException despite using Kryo to
>>>> serialize that class (TileIdWritable).
>>>>
>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>
>>>> Initially I thought Kryo was not being registered properly, so I tried
>>>> running operations over awtestRdd that force a shuffle (e.g., groupByKey),
>>>> and those seemed to work fine. So the problem seems to be specific to the
>>>> "TileIdWritable(200)" argument to lookup(). Is there anything unique about
>>>> companion objects and Kryo serialization? I even replaced
>>>> "TileIdWritable(200)" with "new TileIdWritable" but still see that
>>>> exception.
>>>>
>>>>
>>>> class TileIdWritable {
>>>>  //
>>>> }
>>>>
>>>> object TileIdWritable {
>>>>  def apply(value: Long) = new TileIdWritable
>>>> }
>>>>
>>>>
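>>>> My Kryo registrator:
>>>> // Imports assumed; the post does not show them.
>>>> import com.esotericsoftware.kryo.Kryo
>>>> import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}
>>>>
>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>     override def registerClasses(kryo: Kryo) {
>>>>       // I see this printed during shuffle operations
>>>>       println("Called KryoRegistrator")
>>>>       val r = kryo.register(classOf[TileIdWritable])
>>>>       val s = kryo.register(classOf[ArgWritable])
>>>>     }
>>>> }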
>>>>
>>>> Then just before creating a Spark Context, I have these two lines:
>>>>     System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>     System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")
>>>>
>>>>
>>>>
>>>>
>>>> The exception itself:
>>>> Exception in thread "main" org.apache.spark.SparkException: Job failed:
>>>> Task not serializable: java.io.NotSerializableException:
>>>> geotrellis.spark.formats.TileIdWritable
>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>> name: "key$1", type: "class java.lang.Object")
>>>>     - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>> <function1>)
>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>>> name: "func$1", type: "interface scala.Function1")
>>>>     - root object (class
>>>> "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>     at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at
>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>     at
>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>
>>>> Regards,
>>>> Ameet
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
