On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
<bmill...@eecs.berkeley.edu> wrote:
> Hi Davies,
>
> Thanks for your help.
>
> I ultimately re-wrote the code to use broadcast variables, and then received
> an error when trying to broadcast self.all_models that the size did not fit
> in an int (recall that broadcasts use 32 bit ints to store size),

What is the error? Could you file a JIRA for it?

> that it was in fact over 2G.  I don't know why the previous tests (described
> above) where duplicated portions of self.all_models worked (it could have
> been an error in either my debugging or notes), but splitting the
> self.all_models into a separate broadcast variable for each element worked.
> I avoided broadcast variables for a while since there was no way to
> unpersist them in pyspark, but now that there is you're completely right
> that using broadcast is the correct way to code this.

In 1.1, you could use broadcast.unpersist() to release it, also the performance
of Python Broadcast was much improved in 1.1.


> best,
> -Brad
>
> On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> Or maybe there is a bug related to the base64 in py4j, could you
>> dumps the serialized bytes of closure to verify this?
>>
>> You could add a line in spark/python/pyspark/rdd.py:
>>
>>         ser = CloudPickleSerializer()
>>         pickled_command = ser.dumps(command)
>> +      print len(pickled_command), repr(pickled_command)
>>
>>
>> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
>> <bmill...@eecs.berkeley.edu> wrote:
>> > Hi Davies,
>> >
>> > That's interesting to know.  Here's more details about my code.  The
>> > object
>> > ("self") contains pointers to the spark_context (which seems to generate
>> > errors during serialization) so I strip off the extra state using the
>> > outer
>> > lambda function and just pass the value self.all_models into the map.
>> > all_models is a list of length 9 where each element contains 3 numbers
>> > (ints
>> > or floats, can't remember) and then one LinearSVC object.  The
>> > classifier
>> > was trained over ~2.5M features, so the object isn't small, but probably
>> > shouldn't be 150M either.  Additionally, the call ran OK when I use
>> > either
>> > 2x the first 5 objects or 2x the last 5 objects (another reason why it
>> > seems
>> > unlikely the bug was size related).
>> >
>> > def _predict_all_models(all_models, sample):
>> >     scores = []
>> >     for _, (_, _, classifier) in all_models:
>> >         score = classifier.decision_function(sample[VALUE][RECORD])
>> >         scores.append(float(score))
>> >     return (sample[VALUE][LABEL], scores)
>> >
>> >     # fails
>> > #    return (lambda am: testing_feature_rdd.map(lambda x:
>> > _predict_all_models(am, x))) (self.all_models)
>> >     # works
>> > #    return (lambda am: testing_feature_rdd.map(lambda x:
>> > _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
>> > #    return (lambda am: testing_feature_rdd.map(lambda x:
>> > _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
>> >
>> > I've since written a work-around into my code, but if I get a chance
>> > I'll
>> > switch to broadcast variables and see whether that works.
>> >
>> > later,
>> > -brad
>> >
>> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com>
>> > wrote:
>> >>
>> >> The traceback said that the serialized closure cannot be parsed
>> >> (base64)
>> >> correctly by py4j.
>> >>
>> >> The string in Java cannot be longer than 2G, so the serialized closure
>> >> cannot longer than 1.5G (there are overhead in base64), is it possible
>> >> that your data used in the map function is so big? If it's, you should
>> >> use broadcast for it.
>> >>
>> >> In master of Spark, we will use broadcast automatically if the closure
>> >> is too big. (but use broadcast explicitly is always better).
>> >>
>> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
>> >> <bmill...@eecs.berkeley.edu> wrote:
>> >> > Hi All,
>> >> >
>> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
>> >> > script
>> >> > I have.  I've pasted the full traceback at the end of this email.
>> >> >
>> >> > I have isolated the line of code in my script which "causes" the
>> >> > exception
>> >> > to occur. Although the exception seems to occur deterministically, it
>> >> > is
>> >> > very unclear why the different variants of the line would cause the
>> >> > exception to occur. Unfortunately, I am only able to reproduce the
>> >> > bug
>> >> > in
>> >> > the context of a large data processing job, and the line of code
>> >> > which
>> >> > must
>> >> > change to reproduce the bug has little meaning out of context.  The
>> >> > bug
>> >> > occurs when I call "map" on an RDD with a function that references
>> >> > some
>> >> > state outside of the RDD (which is presumably bundled up and
>> >> > distributed
>> >> > with the function).  The output of the function is a tuple where the
>> >> > first
>> >> > element is an int and the second element is a list of floats (same
>> >> > positive
>> >> > length every time, as verified by an 'assert' statement).
>> >> >
>> >> > Given that:
>> >> > -It's unclear why changes in the line would cause an exception
>> >> > -The exception comes from within pyspark code
>> >> > -The exception has to do with negative array sizes (and I couldn't
>> >> > have
>> >> > created a negative sized array anywhere in my python code)
>> >> > I suspect this is a bug in pyspark.
>> >> >
>> >> > Has anybody else observed or reported this bug?
>> >> >
>> >> > best,
>> >> > -Brad
>> >> >
>> >> > Traceback (most recent call last):
>> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
>> >> >     main()
>> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
>> >> >     bl.write_results(iteration_out_dir)
>> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in
>> >> > write_results
>> >> >     fig, accuracy = _get_results(self.prediction_rdd)
>> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in
>> >> > _get_results
>> >> >     predictions = np.array(prediction_rdd.collect())
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
>> >> > line
>> >> > 723, in collect
>> >> >     bytesInJava = self._jrdd.collect().iterator()
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py",
>> >> > line
>> >> > 2026, in _jrdd
>> >> >     broadcast_vars, self.ctx._javaAccumulator)
>> >> >   File
>> >> >
>> >> >
>> >> > "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> >> > line 701, in __call__
>> >> >   File
>> >> >
>> >> >
>> >> > "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> >> > line 304, in get_return_value
>> >> > py4j.protocol.Py4JError: An error occurred while calling
>> >> > None.org.apache.spark.api.python.PythonRDD. Trace:
>> >> > java.lang.NegativeArraySizeException
>> >> > at py4j.Base64.decode(Base64.java:292)
>> >> > at py4j.Protocol.getBytes(Protocol.java:167)
>> >> > at py4j.Protocol.getObject(Protocol.java:276)
>> >> > at
>> >> > py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
>> >> > at
>> >> > py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
>> >> > at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> >> > at java.lang.Thread.run(Thread.java:701)
>> >> >
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to