Or maybe there is a bug related to base64 in py4j. Could you dump the serialized bytes of the closure to verify this?
You could add a line in spark/python/pyspark/rdd.py:

    ser = CloudPickleSerializer()
    pickled_command = ser.dumps(command)
+   print len(pickled_command), repr(pickled_command)

On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
> Hi Davies,
>
> That's interesting to know. Here are more details about my code. The object
> ("self") contains pointers to the spark_context (which seems to generate
> errors during serialization), so I strip off the extra state using the outer
> lambda function and just pass the value self.all_models into the map.
> all_models is a list of length 9 where each element contains 3 numbers (ints
> or floats, can't remember) and then one LinearSVC object. The classifier
> was trained over ~2.5M features, so the object isn't small, but it probably
> shouldn't be 150M either. Additionally, the call ran OK when I used either
> 2x the first 5 objects or 2x the last 5 objects (another reason why it seems
> unlikely the bug was size related).
>
> def _predict_all_models(all_models, sample):
>     scores = []
>     for _, (_, _, classifier) in all_models:
>         score = classifier.decision_function(sample[VALUE][RECORD])
>         scores.append(float(score))
>     return (sample[VALUE][LABEL], scores)
>
> # fails
> # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
> # works
> # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
> # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
>
> I've since written a work-around into my code, but if I get a chance I'll
> switch to broadcast variables and see whether that works.
>
> later,
> -brad
>
> On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> The traceback said that the serialized closure cannot be parsed (base64)
>> correctly by py4j.
>>
>> The string in Java cannot be longer than 2G, so the serialized closure
>> cannot be longer than 1.5G (there is overhead in base64). Is it possible
>> that the data used in your map function is that big? If it is, you should
>> use broadcast for it.
>>
>> In master of Spark, we will use broadcast automatically if the closure
>> is too big (but using broadcast explicitly is always better).
>>
>> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
>> <bmill...@eecs.berkeley.edu> wrote:
>> > Hi All,
>> >
>> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
>> > script I have. I've pasted the full traceback at the end of this email.
>> >
>> > I have isolated the line of code in my script which "causes" the
>> > exception to occur. Although the exception seems to occur
>> > deterministically, it is very unclear why the different variants of the
>> > line would cause the exception to occur. Unfortunately, I am only able
>> > to reproduce the bug in the context of a large data processing job, and
>> > the line of code which must change to reproduce the bug has little
>> > meaning out of context. The bug occurs when I call "map" on an RDD with
>> > a function that references some state outside of the RDD (which is
>> > presumably bundled up and distributed with the function). The output of
>> > the function is a tuple where the first element is an int and the second
>> > element is a list of floats (same positive length every time, as
>> > verified by an 'assert' statement).
>> >
>> > Given that:
>> > -It's unclear why changes in the line would cause an exception
>> > -The exception comes from within pyspark code
>> > -The exception has to do with negative array sizes (and I couldn't have
>> > created a negative sized array anywhere in my python code)
>> > I suspect this is a bug in pyspark.
>> >
>> > Has anybody else observed or reported this bug?
>> >
>> > best,
>> > -Brad
>> >
>> > Traceback (most recent call last):
>> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
>> >     main()
>> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
>> >     bl.write_results(iteration_out_dir)
>> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
>> >     fig, accuracy = _get_results(self.prediction_rdd)
>> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
>> >     predictions = np.array(prediction_rdd.collect())
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
>> >     bytesInJava = self._jrdd.collect().iterator()
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
>> >     broadcast_vars, self.ctx._javaAccumulator)
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
>> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
>> > py4j.protocol.Py4JError: An error occurred while calling
>> > None.org.apache.spark.api.python.PythonRDD. Trace:
>> > java.lang.NegativeArraySizeException
>> >     at py4j.Base64.decode(Base64.java:292)
>> >     at py4j.Protocol.getBytes(Protocol.java:167)
>> >     at py4j.Protocol.getObject(Protocol.java:276)
>> >     at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
>> >     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
>> >     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> >     at java.lang.Thread.run(Thread.java:701)
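
As a rough check of the 2G / 1.5G figures quoted in the thread, here is a minimal sketch in plain Python (no Spark needed). It assumes standard padded base64 without line breaks and a Java string limit of 2**31 - 1 characters; the helper names base64_len and max_raw_bytes are made up for illustration, and py4j's actual encoding may differ in detail.

```python
import base64
import pickle

# Assumption: a Java String holds at most Integer.MAX_VALUE characters.
JAVA_MAX_STRING_LEN = 2**31 - 1


def base64_len(n_bytes):
    """Length of the padded base64 encoding of n_bytes raw bytes."""
    # Every group of 3 raw bytes (rounded up) becomes 4 output characters.
    return 4 * ((n_bytes + 2) // 3)


def max_raw_bytes(limit=JAVA_MAX_STRING_LEN):
    """Largest raw payload whose base64 form fits in `limit` characters."""
    return (limit // 4) * 3


# Stand-in payload; in the thread this would be the pickled closure.
payload = pickle.dumps(list(range(1000)))
encoded = base64.b64encode(payload)

print(len(payload), len(encoded), base64_len(len(payload)))
print(max_raw_bytes())  # upper bound on raw closure bytes, about 1.5G
```

If the len(pickled_command) printed from rdd.py turns out to be far below that bound, closure size alone would probably not explain the exception.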