Re: java.lang.NegativeArraySizeException in pyspark
Hi Davies,

That's interesting to know. Here are more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization), so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, I can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but it probably shouldn't be 150M either. Additionally, the call ran OK when I used either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug is size related).

    def _predict_all_models(all_models, sample):
        scores = []
        for _, (_, _, classifier) in all_models:
            score = classifier.decision_function(sample[VALUE][RECORD])
            scores.append(float(score))
        return (sample[VALUE][LABEL], scores)

    # fails
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
    # works
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
    #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])

I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works.

later,
-brad

On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu <dav...@databricks.com> wrote:
> The traceback says that the serialized closure cannot be parsed (base64) correctly by py4j. A string in Java cannot be longer than 2G, so the serialized closure cannot be longer than about 1.5G (there is overhead in base64). Is it possible that the data used in your map function is that big? If it is, you should use a broadcast variable for it. In master of Spark, we will use broadcast automatically if the closure is too big (but using broadcast explicitly is always better).
>
> On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>> Hi All,
>>
>> I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email.
>>
>> I have isolated the line of code in my script which causes the exception to occur. Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context.
>>
>> The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement).
>>
>> Given that:
>> - It's unclear why changes in the line would cause an exception
>> - The exception comes from within pyspark code
>> - The exception has to do with negative array sizes (and I couldn't have created a negative-sized array anywhere in my Python code)
>> I suspect this is a bug in pyspark. Has anybody else observed or reported this bug?
>>
>> best,
>> -Brad
>>
>> Traceback (most recent call last):
>>   File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
>>     main()
>>   File "/home/bmiller1/pipeline/driver.py", line 203, in main
>>     bl.write_results(iteration_out_dir)
>>   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
>>     fig, accuracy = _get_results(self.prediction_rdd)
>>   File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
>>     predictions = np.array(prediction_rdd.collect())
>>   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
>>     bytesInJava = self._jrdd.collect().iterator()
>>   File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
>>     broadcast_vars, self.ctx._javaAccumulator)
>>   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
>>   File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.python.PythonRDD. Trace:
>> java.lang.NegativeArraySizeException
>>     at py4j.Base64.decode(Base64.java:292)
>>     at py4j.Protocol.getBytes(Protocol.java:167)
>>     at py4j.Protocol.getObject
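For concreteness, here is a minimal sketch (not code from the original messages) of the broadcast approach Davies suggests: broadcast all_models once and capture only the broadcast handle inside the map, so the pickled closure stays small. It assumes the sc SparkContext plus the self.all_models, testing_feature_rdd and _predict_all_models names from Brad's code above; all_models_bc and _predict_with_broadcast are illustrative names.

    # Minimal sketch of the broadcast-variable approach (illustrative names).
    all_models_bc = sc.broadcast(self.all_models)

    def _predict_with_broadcast(sample):
        # Only the small broadcast handle is captured in the closure;
        # workers fetch the actual list of models via .value.
        return _predict_all_models(all_models_bc.value, sample)

    prediction_rdd = testing_feature_rdd.map(_predict_with_broadcast)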
Re: java.lang.NegativeArraySizeException in pyspark
Hi Davies,

Thanks for your help.

I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models saying that the size did not fit in an int (recall that broadcasts use 32-bit ints to store size), suggesting that it was in fact over 2G. I don't know why the previous tests (described above), in which I duplicated portions of self.all_models, worked (it could have been an error in either my debugging or my notes), but splitting self.all_models into a separate broadcast variable for each element worked.

I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is, you're completely right that using broadcast is the correct way to code this.

best,
-Brad

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu <dav...@databricks.com> wrote:
> Or maybe there is a bug related to the base64 in py4j. Could you dump the serialized bytes of the closure to verify this? You could add a line in spark/python/pyspark/rdd.py:
>
>          ser = CloudPickleSerializer()
>          pickled_command = ser.dumps(command)
>     +    print len(pickled_command), repr(pickled_command)
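A minimal sketch (not code from the original messages) of the per-element split Brad describes, under the same assumptions as before (sc, self.all_models, testing_feature_rdd, _predict_all_models); model_broadcasts and _predict_per_element are illustrative names.

    # Each element of self.all_models becomes its own broadcast, so no single
    # broadcast payload has to hold the whole >2G list.
    model_broadcasts = [sc.broadcast(model) for model in self.all_models]

    def _predict_per_element(sample):
        # Reassemble the list on the worker from the individual broadcasts.
        models = [bc.value for bc in model_broadcasts]
        return _predict_all_models(models, sample)

    prediction_rdd = testing_feature_rdd.map(_predict_per_element)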
Re: java.lang.NegativeArraySizeException in pyspark
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
> Hi Davies,
>
> Thanks for your help.
>
> I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models saying that the size did not fit in an int (recall that broadcasts use 32-bit ints to store size),

What is the error? Could you file a JIRA for it?

> suggesting that it was in fact over 2G. I don't know why the previous tests (described above), in which I duplicated portions of self.all_models, worked (it could have been an error in either my debugging or my notes), but splitting self.all_models into a separate broadcast variable for each element worked.
>
> I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is, you're completely right that using broadcast is the correct way to code this.

In 1.1, you can use broadcast.unpersist() to release it; also, the performance of Python broadcasts was much improved in 1.1.

> best,
> -Brad
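A minimal sketch (not code from the original messages) of the release pattern Davies mentions, assuming Spark 1.1+ and the same sc, self.all_models, testing_feature_rdd and _predict_all_models names as above.

    # Broadcast, use, then drop the executors' cached copies when done.
    all_models_bc = sc.broadcast(self.all_models)
    try:
        predictions = testing_feature_rdd.map(
            lambda x: _predict_all_models(all_models_bc.value, x)).collect()
    finally:
        all_models_bc.unpersist()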
Re: java.lang.NegativeArraySizeException in pyspark
Or maybe there is a bug related to the base64 in py4j. Could you dump the serialized bytes of the closure to verify this? You could add a line in spark/python/pyspark/rdd.py:

         ser = CloudPickleSerializer()
         pickled_command = ser.dumps(command)
    +    print len(pickled_command), repr(pickled_command)
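As a rough way to check the size hypothesis outside Spark (not code from the original messages), one can measure the pickled and base64-encoded sizes of the captured object directly. This sketch uses the standard pickle and base64 modules rather than Spark's CloudPickleSerializer, so the numbers are only approximate; report_pickled_size is an illustrative helper.

    import base64
    import pickle

    def report_pickled_size(obj):
        # base64 adds roughly 1/3 overhead, so ~1.5G of pickled closure is
        # already past Java's 2G string limit once encoded.
        payload = pickle.dumps(obj, protocol=2)
        encoded = base64.b64encode(payload)
        print("pickled bytes: %d, base64 chars: %d" % (len(payload), len(encoded)))

    # e.g. report_pickled_size(self.all_models) from inside the layer code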