On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
> Hi Davies,
>
> Thanks for your help.
>
> I ultimately re-wrote the code to use broadcast variables, and then received
> an error when trying to broadcast self.all_models that the size did not fit
> in an int (recall that broadcasts use 32 bit ints to store size),
What is the error? Could you file a JIRA for it?

> that it was in fact over 2G. I don't know why the previous tests (described
> above) where duplicated portions of self.all_models worked (it could have
> been an error in either my debugging or notes), but splitting the
> self.all_models into a separate broadcast variable for each element worked.
> I avoided broadcast variables for a while since there was no way to
> unpersist them in pyspark, but now that there is, you're completely right
> that using broadcast is the correct way to code this.

In 1.1 you can use broadcast.unpersist() to release it. The performance of
Python broadcast was also much improved in 1.1.

> best,
> -Brad
>
> On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> Or maybe there is a bug related to the base64 in py4j. Could you
>> dump the serialized bytes of the closure to verify this?
>>
>> You could add a line in spark/python/pyspark/rdd.py:
>>
>>         ser = CloudPickleSerializer()
>>         pickled_command = ser.dumps(command)
>> +       print len(pickled_command), repr(pickled_command)
>>
>>
>> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
>> <bmill...@eecs.berkeley.edu> wrote:
>> > Hi Davies,
>> >
>> > That's interesting to know. Here are more details about my code. The
>> > object ("self") contains pointers to the spark_context (which seems to
>> > generate errors during serialization), so I strip off the extra state
>> > using the outer lambda function and just pass the value self.all_models
>> > into the map. all_models is a list of length 9 where each element
>> > contains 3 numbers (ints or floats, can't remember) and then one
>> > LinearSVC object. The classifier was trained over ~2.5M features, so the
>> > object isn't small, but probably shouldn't be 150M either. Additionally,
>> > the call ran OK when I used either 2x the first 5 objects or 2x the last
>> > 5 objects (another reason why it seems unlikely the bug was size
>> > related).
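The outer-lambda trick described above works because a Python closure keeps references only to the free variables its body uses, and those captured values are what gets serialized along with the function. Below is a minimal sketch for checking what a mapper captures and estimating how large it is, in the spirit of Davies's suggestion to print the pickled length. Assumptions: the standard-library `pickle` module stands in for PySpark's `CloudPickleSerializer` (so absolute sizes will differ), and `Layer`, `captured_sizes`, `element_sizes`, and the fake model tuples are hypothetical.

```python
import pickle


class Layer(object):
    """Hypothetical stand-in for the object that owns all_models."""

    def __init__(self, all_models):
        self.all_models = all_models
        # Placeholder for driver-only state (e.g. a SparkContext) that
        # should not travel with the mapped function.
        self.spark_context = object()

    def make_mapper(self):
        # The outer lambda binds `am`; the inner lambda closes over `am`
        # only, so neither `self` nor `self.spark_context` is captured.
        return (lambda am: (lambda x: (x, len(am))))(self.all_models)


def captured_sizes(fn):
    """Pickled size in bytes of each value captured in fn's closure."""
    cells = fn.__closure__ or ()
    return [len(pickle.dumps(cell.cell_contents, protocol=2)) for cell in cells]


def element_sizes(items):
    """Pickled size in bytes of each element, to spot an oversized one."""
    return [len(pickle.dumps(item, protocol=2)) for item in items]


# Fake models: (index, (a, b, weights)), with weights growing per element.
models = [(i, (0.5, 1.0, [0.0] * (100 * (i + 1)))) for i in range(9)]
mapper = Layer(models).make_mapper()

print(len(mapper.__closure__))  # 1: only `am` is captured
print(captured_sizes(mapper))   # size of the whole list as one payload
print(element_sizes(models))    # per-element sizes
```

Measuring each element separately mirrors Brad's eventual fix of one broadcast variable per element: no single payload has to carry the whole list.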
>> >
>> > def _predict_all_models(all_models, sample):
>> >     scores = []
>> >     for _, (_, _, classifier) in all_models:
>> >         score = classifier.decision_function(sample[VALUE][RECORD])
>> >         scores.append(float(score))
>> >     return (sample[VALUE][LABEL], scores)
>> >
>> > # fails
>> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
>> > # works
>> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
>> > # return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
>> >
>> > I've since written a workaround into my code, but if I get a chance I'll
>> > switch to broadcast variables and see whether that works.
>> >
>> > later,
>> > -brad
>> >
>> > On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com>
>> > wrote:
>> >>
>> >> The traceback said that the serialized closure cannot be parsed
>> >> (base64) correctly by py4j.
>> >>
>> >> A string in Java cannot be longer than 2G characters, so the serialized
>> >> closure cannot be longer than about 1.5G (base64 adds overhead). Is it
>> >> possible that the data used in your map function is that big? If so,
>> >> you should use broadcast for it.
>> >>
>> >> In Spark master, we will use broadcast automatically if the closure
>> >> is too big (but using broadcast explicitly is always better).
>> >>
>> >> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
>> >> <bmill...@eecs.berkeley.edu> wrote:
>> >> > Hi All,
>> >> >
>> >> > I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
>> >> > script I have. I've pasted the full traceback at the end of this email.
>> >> >
>> >> > I have isolated the line of code in my script which "causes" the
>> >> > exception to occur.
>> >> > Although the exception seems to occur deterministically, it is very
>> >> > unclear why the different variants of the line would cause the
>> >> > exception to occur. Unfortunately, I am only able to reproduce the bug
>> >> > in the context of a large data processing job, and the line of code
>> >> > which must change to reproduce the bug has little meaning out of
>> >> > context. The bug occurs when I call "map" on an RDD with a function
>> >> > that references some state outside of the RDD (which is presumably
>> >> > bundled up and distributed with the function). The output of the
>> >> > function is a tuple where the first element is an int and the second
>> >> > element is a list of floats (same positive length every time, as
>> >> > verified by an 'assert' statement).
>> >> >
>> >> > Given that:
>> >> > - It's unclear why changes in the line would cause an exception
>> >> > - The exception comes from within pyspark code
>> >> > - The exception has to do with negative array sizes (and I couldn't
>> >> >   have created a negative sized array anywhere in my python code)
>> >> > I suspect this is a bug in pyspark.
>> >> >
>> >> > Has anybody else observed or reported this bug?
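A `NegativeArraySizeException` raised in Java code that was never handed a negative number often points to 32-bit integer overflow. The sketch below illustrates one plausible mechanism under an explicit assumption: that the Base64 decoder computes its output length as `((encoded_len * 6) >> 3) - pad` in Java `int` arithmetic, a pattern used by some fast Base64 decoders. Whether py4j 0.8.2.1's `Base64.decode` does exactly this is not verified here; `java_int`, `base64_length`, and `decoded_length_int32` are hypothetical helpers that emulate Java's wraparound in Python.

```python
import base64


def java_int(x):
    """Wrap a Python int into Java's 32-bit signed int range."""
    return (x + 2**31) % 2**32 - 2**31


def base64_length(raw_len):
    """Characters produced by standard Base64: 4 per 3 input bytes, rounded up."""
    return 4 * ((raw_len + 2) // 3)


def decoded_length_int32(encoded_len, pad=0):
    """ASSUMED decoder arithmetic, ((encoded_len * 6) >> 3) - pad, in Java ints.

    Python's >> on negative ints is an arithmetic shift, like Java's >> on int.
    """
    product = java_int(encoded_len * 6)
    return java_int((product >> 3) - pad)


# Sanity check: base64_length agrees with the standard library.
assert all(base64_length(n) == len(base64.b64encode(bytes(n))) for n in range(50))

for raw_len in (3000, 268435455, 268435458):
    encoded = base64_length(raw_len)
    print(raw_len, encoded, decoded_length_int32(encoded))
# Prints:
# 3000 4000 3000
# 268435455 357913940 268435455
# 268435458 357913944 -268435454
```

Under this assumption the wraparound starts once the encoded string exceeds 357,913,941 characters, i.e. around 256 MiB of raw serialized bytes, well below the 2G Java string limit.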
>> >> >
>> >> > best,
>> >> > -Brad
>> >> >
>> >> > Traceback (most recent call last):
>> >> >   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
>> >> >     main()
>> >> >   File "/home/bmiller1/pipeline/driver.py", line 203, in main
>> >> >     bl.write_results(iteration_out_dir)
>> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
>> >> >     fig, accuracy = _get_results(self.prediction_rdd)
>> >> >   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
>> >> >     predictions = np.array(prediction_rdd.collect())
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
>> >> >     bytesInJava = self._jrdd.collect().iterator()
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
>> >> >     broadcast_vars, self.ctx._javaAccumulator)
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
>> >> >   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
>> >> > py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonRDD.
>> >> > Trace:
>> >> > java.lang.NegativeArraySizeException
>> >> >     at py4j.Base64.decode(Base64.java:292)
>> >> >     at py4j.Protocol.getBytes(Protocol.java:167)
>> >> >     at py4j.Protocol.getObject(Protocol.java:276)
>> >> >     at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
>> >> >     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:66)
>> >> >     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> >> >     at java.lang.Thread.run(Thread.java:701)

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org