Hi Davies,

Thanks for your help.

I ultimately rewrote the code to use broadcast variables, and then got an
error when trying to broadcast self.all_models saying that the size did
not fit in an int (recall that broadcasts use 32-bit ints to store their
size), suggesting that it was in fact over 2G.  I don't know why the
earlier tests (described in my message below) with duplicated portions of
self.all_models worked (it could have been an error in either my debugging
or my notes), but splitting self.all_models into a separate broadcast
variable for each element did work.  I had avoided broadcast variables for
a while since there was no way to unpersist them in pyspark, but now that
there is, you're completely right that broadcast is the correct way to
code this.
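
Roughly, the version that works now looks like the sketch below (I'm
writing this from memory, so the names may not match my code exactly; sc
is the SparkContext, and VALUE/RECORD/LABEL are the same constants as in
the snippet further down):

    # one broadcast per element, so no single payload approaches the 2G limit
    model_bcasts = [sc.broadcast(m) for m in self.all_models]

    def _predict_broadcast(sample):
        scores = []
        for b in model_bcasts:
            # each broadcast holds one element of all_models
            _, (_, _, classifier) = b.value
            score = classifier.decision_function(sample[VALUE][RECORD])
            scores.append(float(score))
        return (sample[VALUE][LABEL], scores)

    prediction_rdd = testing_feature_rdd.map(_predict_broadcast)

    # now that unpersist is available in pyspark, the broadcasts can be
    # released once the results have been collected
    for b in model_bcasts:
        b.unpersist()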

best,
-Brad

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:

> Or maybe there is a bug related to the base64 encoding in py4j. Could
> you dump the serialized bytes of the closure to verify this?
>
> You could add a line in spark/python/pyspark/rdd.py:
>
>         ser = CloudPickleSerializer()
>         pickled_command = ser.dumps(command)
> +      print len(pickled_command), repr(pickled_command)
>
>
> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
> <bmill...@eecs.berkeley.edu> wrote:
> > Hi Davies,
> >
> > That's interesting to know.  Here are more details about my code.  The
> > object ("self") contains pointers to the spark_context (which seems to
> > generate errors during serialization), so I strip off that extra state
> > using the outer lambda function and just pass the value self.all_models
> > into the map.  all_models is a list of length 9 where each element
> > contains 3 numbers (ints or floats, I can't remember) and one LinearSVC
> > object.  The classifier was trained over ~2.5M features, so the object
> > isn't small, but it probably shouldn't be 150M either.  Additionally,
> > the call ran OK when I used either 2x the first 5 objects or 2x the
> > last 5 objects (another reason why it seems unlikely the bug was
> > size-related).
> >
> > def _predict_all_models(all_models, sample):
> >     scores = []
> >     for _, (_, _, classifier) in all_models:
> >         score = classifier.decision_function(sample[VALUE][RECORD])
> >         scores.append(float(score))
> >     return (sample[VALUE][LABEL], scores)
> >
> >     # fails
> > #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
> >     # works
> > #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
> > #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
> >
> > I've since written a work-around into my code, but if I get a chance I'll
> > switch to broadcast variables and see whether that works.
> >
> > later,
> > -brad
> >
> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com>
> wrote:
> >>
> >> The traceback says that the serialized closure cannot be parsed
> >> (base64-decoded) correctly by py4j.
> >>
> >> A string in Java cannot be longer than 2G, so the serialized closure
> >> cannot be longer than roughly 1.5G (there is overhead in base64).  Is
> >> it possible that the data used in your map function is that big?  If
> >> it is, you should use a broadcast variable for it.
> >>
> >> In master of Spark, we will use broadcast automatically if the closure
> >> is too big (but using broadcast explicitly is always better).
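> >>
> >> A minimal sketch of the explicit approach (the names here are just
> >> illustrative, not from your code):
> >>
> >>     big_data_b = sc.broadcast(big_data)   # shipped to each worker once
> >>     result = rdd.map(lambda x: f(big_data_b.value, x))
> >>
> >> Only the small broadcast handle is captured in the closure, so the
> >> serialized closure itself stays tiny.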
> >>
> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
> >> <bmill...@eecs.berkeley.edu> wrote:
> >> > Hi All,
> >> >
> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
> >> > script I have.  I've pasted the full traceback at the end of this
> >> > email.
> >> >
> >> > I have isolated the line of code in my script which "causes" the
> >> > exception to occur.  Although the exception seems to occur
> >> > deterministically, it is very unclear why the different variants of
> >> > the line would trigger it.  Unfortunately, I am only able to
> >> > reproduce the bug in the context of a large data processing job, and
> >> > the line of code which must change to reproduce the bug has little
> >> > meaning out of context.  The bug occurs when I call "map" on an RDD
> >> > with a function that references some state outside of the RDD (which
> >> > is presumably bundled up and distributed with the function).  The
> >> > output of the function is a tuple where the first element is an int
> >> > and the second element is a list of floats (same positive length
> >> > every time, as verified by an 'assert' statement).
> >> >
> >> > Given that:
> >> > -It's unclear why changes in the line would cause an exception
> >> > -The exception comes from within pyspark code
> >> > -The exception has to do with negative array sizes (and I couldn't
> >> >  have created a negative sized array anywhere in my python code)
> >> > I suspect this is a bug in pyspark.
> >> >
> >> > Has anybody else observed or reported this bug?
> >> >
> >> > best,
> >> > -Brad
> >> >
> >> > Traceback (most recent call last):
> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
> >> >     main()
> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
> >> >     bl.write_results(iteration_out_dir)
> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in
> >> > write_results
> >> >     fig, accuracy = _get_results(self.prediction_rdd)
> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in
> >> > _get_results
> >> >     predictions = np.array(prediction_rdd.collect())
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
> line
> >> > 723, in collect
> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
> line
> >> > 2026, in _jrdd
> >> >     broadcast_vars, self.ctx._javaAccumulator)
> >> >   File
> >> >
> >> >
> "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> >> > line 701, in __call__
> >> >   File
> >> >
> >> >
> "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> >> > line 304, in get_return_value
> >> > py4j.protocol.Py4JError: An error occurred while calling
> >> > None.org.apache.spark.api.python.PythonRDD. Trace:
> >> > java.lang.NegativeArraySizeException
> >> > at py4j.Base64.decode(Base64.java:292)
> >> > at py4j.Protocol.getBytes(Protocol.java:167)
> >> > at py4j.Protocol.getObject(Protocol.java:276)
> >> > at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
> >> > at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
> >> > at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >> > at java.lang.Thread.run(Thread.java:701)
> >> >
> >
> >
>
