Re: java.lang.NegativeArraySizeException in pyspark

2014-09-26 Thread Brad Miller
   The object (self) contains pointers to the spark_context (which seems to
   generate errors during serialization) so I strip off the extra state using
   the outer lambda function and just pass the value self.all_models into the
   map.  all_models is a list of length 9 where each element contains 3 numbers
   (ints or floats, can't remember) and then one LinearSVC object.  The
   classifier was trained over ~2.5M features, so the object isn't small, but
   probably shouldn't be 150M either.  Additionally, the call ran OK when I use
   either 2x the first 5 objects or 2x the last 5 objects (another reason why
   it seems unlikely the bug was size related).
  
   def _predict_all_models(all_models, sample):
       scores = []
       for _, (_, _, classifier) in all_models:
           score = classifier.decision_function(sample[VALUE][RECORD])
           scores.append(float(score))
       return (sample[VALUE][LABEL], scores)

   # fails
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
   # works
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
   #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
  
   I've since written a work-around into my code, but if I get a chance I'll
   switch to broadcast variables and see whether that works.
  
   later,
   -brad
  
   On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
   wrote:
  
   The traceback says that the serialized closure could not be parsed (base64
   decoded) correctly by py4j.

   A string in Java cannot be longer than 2G characters, so the serialized
   closure cannot be larger than roughly 1.5G (there is overhead in the base64
   encoding). Is it possible that the data used in your map function is that
   big? If it is, you should use a broadcast variable for it.

   In the master branch of Spark, we will use broadcast automatically if the
   closure is too big (but using broadcast explicitly is always better).
  
   On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
   bmill...@eecs.berkeley.edu wrote:
Hi All,

I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I
have.  I've pasted the full traceback at the end of this email.

I have isolated the line of code in my script which causes the exception to
occur.  Although the exception seems to occur deterministically, it is very
unclear why the different variants of the line would cause the exception to
occur.  Unfortunately, I am only able to reproduce the bug in the context of a
large data processing job, and the line of code which must change to reproduce
the bug has little meaning out of context.  The bug occurs when I call map on
an RDD with a function that references some state outside of the RDD (which is
presumably bundled up and distributed with the function).  The output of the
function is a tuple where the first element is an int and the second element is
a list of floats (same positive length every time, as verified by an 'assert'
statement).

Given that:
-It's unclear why changes in the line would cause an exception
-The exception comes from within pyspark code
-The exception has to do with negative array sizes (and I couldn't have
 created a negative sized array anywhere in my python code)
I suspect this is a bug in pyspark.

Has anybody else observed or reported this bug?

best,
-Brad
   
Traceback (most recent call last):
  File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
    main()
  File "/home/bmiller1/pipeline/driver.py", line 203, in main
    bl.write_results(iteration_out_dir)
  File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
    fig, accuracy = _get_results(self.prediction_rdd)
  File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
    predictions = np.array(prediction_rdd.collect())
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
    broadcast_vars, self.ctx._javaAccumulator)
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.spark.api.python.PythonRDD. Trace:
java.lang.NegativeArraySizeException
        at py4j.Base64.decode(Base64.java:292)
        at py4j.Protocol.getBytes(Protocol.java:167)
        at py4j.Protocol.getObject

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Brad Miller
Hi Davies,

Thanks for your help.

I ultimately rewrote the code to use broadcast variables, and then received an
error when trying to broadcast self.all_models saying that the size did not fit
in an int (recall that broadcasts use 32-bit ints to store their size),
suggesting that it was in fact over 2G.  I don't know why the previous tests
(described above), which duplicated portions of self.all_models, worked (it
could have been an error in either my debugging or my notes), but splitting
self.all_models into a separate broadcast variable for each element worked.  I
avoided broadcast variables for a while since there was no way to unpersist
them in pyspark, but now that there is, you're completely right that using
broadcast is the correct way to code this.
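
For concreteness, a rough sketch of that per-element broadcast approach (a
simplified illustration rather than the exact code; sc, testing_feature_rdd,
VALUE, RECORD and LABEL are carried over from the earlier snippets in this
thread):

    # Broadcast each element of all_models separately so that no single
    # broadcast (and no task closure) has to carry the full model state.
    model_bcasts = [sc.broadcast(m) for m in self.all_models]

    def _predict_all_broadcasts(bcasts, sample):
        scores = []
        for _, (_, _, classifier) in (b.value for b in bcasts):
            score = classifier.decision_function(sample[VALUE][RECORD])
            scores.append(float(score))
        return (sample[VALUE][LABEL], scores)

    # The lambda captures only the small Broadcast handles, not the models.
    prediction_rdd = testing_feature_rdd.map(
        lambda x: _predict_all_broadcasts(model_bcasts, x))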

best,
-Brad

On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 handling in py4j; could you
 dump the serialized bytes of the closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
  That's interesting to know.  Here's more details about my code.  The object
  (self) contains pointers to the spark_context (which seems to generate
  errors during serialization) so I strip off the extra state using the outer
  lambda function and just pass the value self.all_models into the map.
  all_models is a list of length 9 where each element contains 3 numbers (ints
  or floats, can't remember) and then one LinearSVC object.  The classifier
  was trained over ~2.5M features, so the object isn't small, but probably
  shouldn't be 150M either.  Additionally, the call ran OK when I use either
  2x the first 5 objects or 2x the last 5 objects (another reason why it seems
  unlikely the bug was size related).
 
  def _predict_all_models(all_models, sample):
      scores = []
      for _, (_, _, classifier) in all_models:
          score = classifier.decision_function(sample[VALUE][RECORD])
          scores.append(float(score))
      return (sample[VALUE][LABEL], scores)

  # fails
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
  # works
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
  I've since written a work-around into my code, but if I get a chance I'll
  switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
 wrote:
 
  The traceback says that the serialized closure could not be parsed (base64
  decoded) correctly by py4j.

  A string in Java cannot be longer than 2G characters, so the serialized
  closure cannot be larger than roughly 1.5G (there is overhead in the base64
  encoding). Is it possible that the data used in your map function is that
  big? If it is, you should use a broadcast variable for it.

  In the master branch of Spark, we will use broadcast automatically if the
  closure is too big (but using broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script
   I have.  I've pasted the full traceback at the end of this email.

   I have isolated the line of code in my script which causes the exception to
   occur. Although the exception seems to occur deterministically, it is very
   unclear why the different variants of the line would cause the exception to
   occur. Unfortunately, I am only able to reproduce the bug in the context of
   a large data processing job, and the line of code which must change to
   reproduce the bug has little meaning out of context.  The bug occurs when I
   call map on an RDD with a function that references some state outside of
   the RDD (which is presumably bundled up and distributed with the function).
   The output of the function is a tuple where the first element is an int and
   the second element is a list of floats (same positive length every time, as
   verified by an 'assert' statement).

   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't have
    created a negative sized array anywhere in my python code)
   I suspect this is a bug in pyspark.
  
   Has anybody else observed or reported this bug?
  
   best,
   -Brad
  
   Traceback (most recent call last):
  File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
   main()
 File /home

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Davies Liu
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for your help.

 I ultimately rewrote the code to use broadcast variables, and then received
 an error when trying to broadcast self.all_models saying that the size did
 not fit in an int (recall that broadcasts use 32-bit ints to store their size),

What is the error? Could you file a JIRA for it?

 suggesting that it was in fact over 2G.  I don't know why the previous tests
 (described above), which duplicated portions of self.all_models, worked (it
 could have been an error in either my debugging or my notes), but splitting
 self.all_models into a separate broadcast variable for each element worked.
 I avoided broadcast variables for a while since there was no way to
 unpersist them in pyspark, but now that there is, you're completely right
 that using broadcast is the correct way to code this.

In 1.1, you could use broadcast.unpersist() to release it; also, the
performance of Python Broadcast was much improved in 1.1.
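
For example, the basic lifecycle looks roughly like this (a minimal sketch with
made-up names, against the 1.1 PySpark API):

    bc = sc.broadcast(big_model)                     # ship the value to executors once
    preds = rdd.map(lambda x: bc.value.predict(x))   # workers read bc.value
    preds.collect()
    bc.unpersist()                                   # release the broadcast when done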


 best,
 -Brad

 On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 handling in py4j; could you
 dump the serialized bytes of the closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
   That's interesting to know.  Here's more details about my code.  The object
   (self) contains pointers to the spark_context (which seems to generate
   errors during serialization) so I strip off the extra state using the outer
   lambda function and just pass the value self.all_models into the map.
   all_models is a list of length 9 where each element contains 3 numbers (ints
   or floats, can't remember) and then one LinearSVC object.  The classifier
   was trained over ~2.5M features, so the object isn't small, but probably
   shouldn't be 150M either.  Additionally, the call ran OK when I use either
   2x the first 5 objects or 2x the last 5 objects (another reason why it seems
   unlikely the bug was size related).
 
  def _predict_all_models(all_models, sample):
      scores = []
      for _, (_, _, classifier) in all_models:
          score = classifier.decision_function(sample[VALUE][RECORD])
          scores.append(float(score))
      return (sample[VALUE][LABEL], scores)

  # fails
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
  # works
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
  #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
   I've since written a work-around into my code, but if I get a chance I'll
   switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
  wrote:
 
  The traceback says that the serialized closure could not be parsed (base64
  decoded) correctly by py4j.

  A string in Java cannot be longer than 2G characters, so the serialized
  closure cannot be larger than roughly 1.5G (there is overhead in the base64
  encoding). Is it possible that the data used in your map function is that
  big? If it is, you should use a broadcast variable for it.

  In the master branch of Spark, we will use broadcast automatically if the
  closure is too big (but using broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script
   I have.  I've pasted the full traceback at the end of this email.

   I have isolated the line of code in my script which causes the exception to
   occur. Although the exception seems to occur deterministically, it is very
   unclear why the different variants of the line would cause the exception to
   occur. Unfortunately, I am only able to reproduce the bug in the context of
   a large data processing job, and the line of code which must change to
   reproduce the bug has little meaning out of context.  The bug occurs when I
   call map on an RDD with a function that references some state outside of
   the RDD (which is presumably bundled up and distributed with the function).
   The output of the function is a tuple where the first element is an int and
   the second element is a list of floats (same positive length every time, as
   verified by an 'assert' statement).

   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't have
    created a negative

Re: java.lang.NegativeArraySizeException in pyspark

2014-09-23 Thread Davies Liu
Or maybe there is a bug related to the base64 handling in py4j; could you
dump the serialized bytes of the closure to verify this?

You could add a line in spark/python/pyspark/rdd.py:

ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
+  print len(pickled_command), repr(pickled_command)
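
As a quick sanity check outside of Spark, you could also pickle the captured
object directly and compare it against the limit. A rough sketch (all_models
is the list from the earlier messages, and the 3-bytes-to-4-characters base64
expansion is where the ~1.5G figure comes from):

    from pyspark.serializers import CloudPickleSerializer

    ser = CloudPickleSerializer()
    pickled = ser.dumps(all_models)          # whatever the map closure captures
    b64_chars = (len(pickled) + 2) / 3 * 4   # base64 turns 3 bytes into 4 chars
    print len(pickled), b64_chars, b64_chars < 2**31 - 1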


On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 That's interesting to know.  Here's more details about my code.  The object
 (self) contains pointers to the spark_context (which seems to generate
 errors during serialization) so I strip off the extra state using the outer
 lambda function and just pass the value self.all_models into the map.
 all_models is a list of length 9 where each element contains 3 numbers (ints
 or floats, can't remember) and then one LinearSVC object.  The classifier
 was trained over ~2.5M features, so the object isn't small, but probably
 shouldn't be 150M either.  Additionally, the call ran OK when I use either
 2x the first 5 objects or 2x the last 5 objects (another reason why it seems
 unlikely the bug was size related).

 def _predict_all_models(all_models, sample):
     scores = []
     for _, (_, _, classifier) in all_models:
         score = classifier.decision_function(sample[VALUE][RECORD])
         scores.append(float(score))
     return (sample[VALUE][LABEL], scores)

 # fails
 #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
 # works
 #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
 #return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])

 I've since written a work-around into my code, but if I get a chance I'll
 switch to broadcast variables and see whether that works.

 later,
 -brad

 On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote:

 The traceback says that the serialized closure could not be parsed (base64
 decoded) correctly by py4j.

 A string in Java cannot be longer than 2G characters, so the serialized
 closure cannot be larger than roughly 1.5G (there is overhead in the base64
 encoding). Is it possible that the data used in your map function is that
 big? If it is, you should use a broadcast variable for it.

 In the master branch of Spark, we will use broadcast automatically if the
 closure is too big (but using broadcast explicitly is always better).

 On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi All,
 
  I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script
  I have.  I've pasted the full traceback at the end of this email.

  I have isolated the line of code in my script which causes the exception to
  occur. Although the exception seems to occur deterministically, it is very
  unclear why the different variants of the line would cause the exception to
  occur. Unfortunately, I am only able to reproduce the bug in the context of
  a large data processing job, and the line of code which must change to
  reproduce the bug has little meaning out of context.  The bug occurs when I
  call map on an RDD with a function that references some state outside of
  the RDD (which is presumably bundled up and distributed with the function).
  The output of the function is a tuple where the first element is an int and
  the second element is a list of floats (same positive length every time, as
  verified by an 'assert' statement).
 
  Given that:
  -It's unclear why changes in the line would cause an exception
  -The exception comes from within pyspark code
  -The exception has to do with negative array sizes (and I couldn't have
  created a negative sized array anywhere in my python code)
  I suspect this is a bug in pyspark.
 
  Has anybody else observed or reported this bug?
 
  best,
  -Brad
 
  Traceback (most recent call last):
  File "/home/bmiller1/pipeline/driver.py", line 214, in <module>
    main()
  File "/home/bmiller1/pipeline/driver.py", line 203, in main
    bl.write_results(iteration_out_dir)
  File "/home/bmiller1/pipeline/layer/svm_layer.py", line 137, in write_results
    fig, accuracy = _get_results(self.prediction_rdd)
  File "/home/bmiller1/pipeline/layer/svm_layer.py", line 56, in _get_results
    predictions = np.array(prediction_rdd.collect())
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py", line 2026, in _jrdd
    broadcast_vars, self.ctx._javaAccumulator)
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 701, in __call__
  File "/home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
  py4j.protocol.Py4JError: An error occurred