Or maybe there is a bug related to the base64 handling in py4j. Could you
dump the serialized bytes of the closure to verify this?

You could add a line in spark/python/pyspark/rdd.py:

        ser = CloudPickleSerializer()
        pickled_command = ser.dumps(command)
+      print len(pickled_command), repr(pickled_command)


On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
<bmill...@eecs.berkeley.edu> wrote:
> Hi Davies,
>
> That's interesting to know.  Here are more details about my code.  The
> object ("self") contains pointers to the spark_context (which seems to
> generate errors during serialization), so I strip off the extra state using
> the outer lambda function and just pass the value self.all_models into the
> map.  all_models is a list of length 9 where each element contains 3
> numbers (ints or floats, I can't remember) and then one LinearSVC object.
> The classifier was trained over ~2.5M features, so the object isn't small,
> but it probably shouldn't be 150M either.  Additionally, the call ran OK
> when I used either 2x the first 5 objects or 2x the last 5 objects (another
> reason why it seems unlikely the bug is size-related).
>
> def _predict_all_models(all_models, sample):
>     scores = []
>     for _, (_, _, classifier) in all_models:
>         score = classifier.decision_function(sample[VALUE][RECORD])
>         scores.append(float(score))
>     return (sample[VALUE][LABEL], scores)
>
>     # fails
> #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x)))(self.all_models)
>     # works
> #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x)))(self.all_models[:5] + self.all_models[:5])
> #    return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x)))(self.all_models[4:] + self.all_models[4:])
>
> I've since written a work-around into my code, but if I get a chance I'll
> switch to broadcast variables and see whether that works.
>
> later,
> -brad
>
> On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> The traceback says that the serialized closure could not be parsed
>> (base64-decoded) correctly by py4j.
>>
>> A String in Java cannot be longer than 2G characters, so the serialized
>> closure cannot be longer than roughly 1.5G bytes (base64 adds about 1/3
>> overhead). Is it possible that the data used in your map function is that
>> big? If it is, you should use a broadcast variable for it.
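>>
>> A rough check of that figure (illustrative Python, not Spark code):
>>
>>     java_max_chars = 2**31 - 1              # Java array/String length limit
>>     max_raw_bytes = java_max_chars * 3 / 4  # base64 emits 4 chars per 3 bytes
>>     print max_raw_bytes / float(2**30)      # ~1.5 (GiB of raw closure bytes)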
>>
>> In the master branch of Spark, we broadcast the closure automatically if
>> it is too big (but using broadcast explicitly is always better).
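>>
>> A minimal sketch of the explicit approach (assuming `sc` is your
>> SparkContext; the other names are illustrative):
>>
>>     bc_models = sc.broadcast(all_models)
>>     predictions = rdd.map(lambda x: _predict_all_models(bc_models.value, x))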
>>
>> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
>> <bmill...@eecs.berkeley.edu> wrote:
>> > Hi All,
>> >
>> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
>> > script I have.  I've pasted the full traceback at the end of this email.
>> >
>> > I have isolated the line of code in my script which "causes" the
>> > exception to occur.  Although the exception seems to occur
>> > deterministically, it is very unclear why the different variants of the
>> > line would cause the exception to occur.  Unfortunately, I am only able
>> > to reproduce the bug in the context of a large data processing job, and
>> > the line of code which must change to reproduce the bug has little
>> > meaning out of context.  The bug occurs when I call "map" on an RDD with
>> > a function that references some state outside of the RDD (which is
>> > presumably bundled up and distributed with the function).  The output of
>> > the function is a tuple whose first element is an int and whose second
>> > element is a list of floats (the same positive length every time, as
>> > verified by an 'assert' statement).
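>> >
>> > Roughly, the failing pattern has this shape (illustrative names, not the
>> > actual code):
>> >
>> >     state = build_state()                        # large object outside the RDD
>> >     out = rdd.map(lambda x: predict(state, x))   # 'state' is pickled into the closure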
>> >
>> > Given that:
>> > - it's unclear why changes in the line would cause an exception
>> > - the exception comes from within pyspark code
>> > - the exception has to do with negative array sizes (and I couldn't have
>> >   created a negative-sized array anywhere in my Python code)
>> > I suspect this is a bug in pyspark.
>> >
>> > Has anybody else observed or reported this bug?
>> >
>> > best,
>> > -Brad
>> >
>> > Traceback (most recent call last):
>> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
>> >     main()
>> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
>> >     bl.write_results(iteration_out_dir)
>> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
>> >     fig, accuracy = _get_results(self.prediction_rdd)
>> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
>> >     predictions = np.array(prediction_rdd.collect())
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
>> >     bytesInJava = self._jrdd.collect().iterator()
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
>> >     broadcast_vars, self.ctx._javaAccumulator)
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
>> > py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonRDD. Trace:
>> > java.lang.NegativeArraySizeException
>> > at py4j.Base64.decode(Base64.java:292)
>> > at py4j.Protocol.getBytes(Protocol.java:167)
>> > at py4j.Protocol.getObject(Protocol.java:276)
>> > at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
>> > at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
>> > at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> > at java.lang.Thread.run(Thread.java:701)
>> >
>
>
