Hi again,

I narrowed down the issue a bit more -- it seems to be related to the Kryo
serializer. With Kryo enabled, the following snippet results in a
NullPointerException:

rdd = sc.parallelize(range(10))
d = {}
from random import random
for i in range(100000):
    d[i] = random()

rdd.map(lambda x: d[x]).collect()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-97-7cd5df24206c> in <module>()
----> 1 rdd.map(lambda x: d[x]).collect()

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in collect(self)
    674         """
    675         with SCCallSiteSync(self.context) as css:
--> 676             bytesInJava = self._jrdd.collect().iterator()
    677         return list(self._collect_iterator_through_file(bytesInJava))
    678

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o768.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 87, e1305.hpc-lca.ethz.ch): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
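
For reference, my original report (quoted below) used an explicit broadcast
variable rather than capturing the dict in the closure; that variant looks
roughly like the sketch below (reconstructed from memory, not my exact code)
and hits the same NullPointerException for me:

bd = sc.broadcast(d)                       # explicit broadcast of the same dict
rdd.map(lambda x: bd.value[x]).collect()   # sketch only, not the exact code I ran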

With a smaller dictionary (10,000 entries instead of 100,000), it works fine:

In [98]:
rdd = sc.parallelize(range(10))
d = {}

from random import random
for i in range(10000):
    d[i] = random()

In [99]:
rdd.map(lambda x: d[x]).collect()

Out[99]:
[0.39210713836346933,
 0.8636333432012482,
 0.28744831569153617,
 0.663815926356163,
 0.38274814840717364,
 0.6606453820150496,
 0.8610156719813942,
 0.6971353266345091,
 0.9896836700210551,
 0.05789392881996358]

Is there a size limit for objects serialized with Kryo, or an option that
controls it? The same code works fine with the Java serializer.
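
In case it matters, this is roughly how I've been switching serializers and
bumping the Kryo buffer when building the context. It's only a sketch: the app
name is a placeholder, and the buffer option name is what I found in the 1.2
configuration docs, so please correct me if that isn't the right knob.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("kryo-npe-repro")  # placeholder name
        # Kryo on; dropping this line falls back to the default Java serializer,
        # which is the configuration that works for me.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        # Assumption on my part: raise the max Kryo buffer (in MB) in case the
        # serialized closure containing the large dict overflows the default buffer.
        .set("spark.kryoserializer.buffer.max.mb", "512"))
sc = SparkContext(conf=conf)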

On Wed, Feb 11, 2015 at 8:04 PM, Rok Roskar <rokros...@gmail.com> wrote:

> I think the problem was related to the broadcasts being too large -- I've
> now split it up into many smaller operations but it's still not quite there
> -- see
> http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-td21606.html
>
> Thanks,
>
> Rok
>
> On Wed, Feb 11, 2015, 19:59 Davies Liu <dav...@databricks.com> wrote:
>
>> Could you share a short script to reproduce this problem?
>>
>> On Tue, Feb 10, 2015 at 8:55 PM, Rok Roskar <rokros...@gmail.com> wrote:
>> > I didn't notice other errors -- I also thought such a large broadcast is
>> > a bad idea, but I tried something similar with a much smaller dictionary
>> > and encountered the same problem. I'm not familiar enough with Spark
>> > internals to know whether the trace indicates an issue with the broadcast
>> > variables or something else entirely.
>> >
>> > The driver and executors have 50 GB of RAM, so memory should be fine.
>> >
>> > Thanks,
>> >
>> > Rok
>> >
>> > On Feb 11, 2015 12:19 AM, "Davies Liu" <dav...@databricks.com> wrote:
>> >>
>> >> It's brave to broadcast 8 GB of pickled data -- it will take more than
>> >> 15 GB of memory in each Python worker. How much memory do you have on
>> >> the executors and the driver?
>> >> Do you see any other exceptions in the driver or executors? Something
>> >> related to serialization in the JVM, perhaps.
>> >>
>> >> > On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar <rokros...@gmail.com> wrote:
>> >> > I get this in the driver log:
>> >>
>> >> I think this should happen on the executor -- or did you call first() or
>> >> take() on the RDD?
>> >>
>> >> > java.lang.NullPointerException
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
>> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:
>> 727)
>> >> >         at
>> >> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         at
>> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> >         at scala.collection.AbstractIterable.foreach(
>> Iterable.scala:54)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:229)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
>> nRDD.scala:203)
>> >> >
>> >> > and on one of the executor's stderr:
>> >> >
>> >> > 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
>> >> > (crashed)
>> >> > org.apache.spark.api.python.PythonException: Traceback (most recent
>> call
>> >> > last):
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/worker.py",
>> >> > line 57, in main
>> >> >     split_index = read_int(infile)
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/serializers.py",
>> >> > line 511, in read_int
>> >> >     raise EOFError
>> >> > EOFError
>> >> >
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD
>> .scala:137)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonR
>> DD.scala:174)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> >> >         at
>> >> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >> >         at
>> >> > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>> >> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:242)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
>> nRDD.scala:203)
>> >> > Caused by: java.lang.NullPointerException
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
>> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:
>> 727)
>> >> >         at
>> >> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         at
>> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> >         at scala.collection.AbstractIterable.foreach(
>> Iterable.scala:54)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:229)
>> >> >         ... 4 more
>> >> > 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
>> >> > (crashed)
>> >> > org.apache.spark.api.python.PythonException: Traceback (most recent
>> call
>> >> > last):
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/worker.py",
>> >> > line 57, in main
>> >> >     split_index = read_int(infile)
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/serializers.py",
>> >> > line 511, in read_int
>> >> >     raise EOFError
>> >> > EOFError
>> >> >
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD
>> .scala:137)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonR
>> DD.scala:174)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>> >> >         at
>> >> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >> >         at
>> >> > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>> >> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:242)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> >         at
>> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
>> nRDD.scala:203)
>> >> > Caused by: java.lang.NullPointerException
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
>> >> >         at scala.collection.Iterator$class.foreach(Iterator.scala:
>> 727)
>> >> >         at
>> >> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >         at
>> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> >         at scala.collection.AbstractIterable.foreach(
>> Iterable.scala:54)
>> >> >         at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:229)
>> >> >         ... 4 more
>> >> >
>> >> >
>> >> > What I find odd is that when I create the broadcast object, the logs
>> >> > don't show any significant amount of memory being allocated in any of
>> >> > the block managers -- yet the dictionary is large, about 8 GB pickled
>> >> > on disk.
>> >> >
>> >> >
>> >> > On Feb 10, 2015, at 10:01 PM, Davies Liu <dav...@databricks.com>
>> wrote:
>> >> >
>> >> >> Could you paste the NPE stack trace here? It would be better to
>> >> >> create a JIRA for it, thanks!
>> >> >>
>> >> >> On Tue, Feb 10, 2015 at 10:42 AM, rok <rokros...@gmail.com> wrote:
>> >> >>> I'm trying to use a broadcast dictionary inside a map function and
>> >> >>> am consistently getting Java null pointer exceptions. This is inside
>> >> >>> an IPython session connected to a standalone Spark cluster. I seem
>> >> >>> to recall being able to do this before, but at the moment I'm at a
>> >> >>> loss as to what to try next. Is there a limit to the size of
>> >> >>> broadcast variables? This one is rather large (a dict of a few GB).
>> >> >>> Thanks!
>> >> >>>
>> >> >>> Rok
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >
>>
>
