Re: NegativeArraySizeException when doing joins on skewed data

2015-03-12 Thread Soila Pertet Kavulya
Hi Tristan,

Did upgrading to Kryo3 help?

Thanks,

Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers tris...@blackfrog.org wrote:
 Yeah I implemented the same solution. It seems to kick in around the 4B
 mark, but looking at the log I suspect it’s probably a function of the
 number of unique objects more than anything. I definitely don’t have more
 than 2B unique objects.


 Will try the same test on Kryo3 and see if it goes away.

 T


 On 27 February 2015 at 06:21, Soila Pertet Kavulya skavu...@gmail.com
 wrote:

 Thanks Tristan,

 I ran into a similar issue with broadcast variables. I worked around
 it by estimating the size of the object I wanted to broadcast,
 splitting it up into chunks that were each less than 2GB, then doing
 multiple broadcasts. This approach worked pretty well for broadcast
 variables under 10GB on our system. However, for larger variables the
 spills to disk made progress painfully slow, so we fall back to
 regular joins for those.
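
 Roughly, the chunking looked like the sketch below (the Map payload and
 all names here are illustrative assumptions, not our actual code):

 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast

 // Split one oversized map into sub-maps, each comfortably under the
 // ~2GB serialization limit, and broadcast each piece separately.
 def broadcastInChunks[K, V](sc: SparkContext, big: Map[K, V],
                             chunkCount: Int): Seq[Broadcast[Map[K, V]]] = {
   // Ceiling division so every entry lands in some chunk.
   val chunkSize = math.max(1, (big.size + chunkCount - 1) / chunkCount)
   big.grouped(chunkSize).map(chunk => sc.broadcast(chunk)).toSeq
 }

 // A lookup then probes each broadcast chunk in turn:
 def lookup[K, V](chunks: Seq[Broadcast[Map[K, V]]], key: K): Option[V] =
   chunks.iterator.map(_.value.get(key)).collectFirst { case Some(v) => v }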

 Do you know if there are any efforts to get Kryo to support objects
 larger than a couple of GB?

 Soila

 On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers tris...@blackfrog.org
 wrote:
  I get the same exception simply by doing a large broadcast of about 6GB.
  Note that I’m broadcasting a small number (~3m) of fat objects. There’s
  plenty of free RAM. This and related kryo exceptions seem to crop up
  whenever an object graph of more than a couple of GB gets passed around.
 
  On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote:
 
  I have been running into NegativeArraySizeExceptions when doing joins
  on data with very skewed key distributions in Spark 1.2.0.

Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Imran Rashid
Hi Tristan,

at first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's
entirely related to kryo.  Would it be possible for you to try serializing
your object using kryo, without involving spark at all?  If you are
unfamiliar w/ kryo, it would also be OK to try out the utils in spark to do
it, something like:

import java.io.FileOutputStream

// writes to a throwaway local file, purely to exercise kryo serialization
val outputStream = new FileOutputStream(
  "/some/local/path/doesn't/really/matter/just/delete/me/afterwards")

val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)

kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail.  There is a little of spark's wrapping
code involved here too, but I suspect the error is out of our control.
From the error, it seems like whatever object you are trying to serialize
has more than 2B references:
Caused by: java.lang.NegativeArraySizeException
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me
with an object that is only 6 GB.
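
That said, it is easy to see how a 32-bit capacity wraps once a table
doubles past 2^30 slots.  Purely as an illustration of the failure mode
(this is not kryo's actual code):

val capacity = 1 << 30        // 1,073,741,824 table slots
val doubled  = capacity << 1  // wraps to Int.MinValue = -2147483648
// new Array[Int](doubled)    // would throw NegativeArraySizeException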

There are a handful of other size restrictions and tuning parameters that
come with kryo as well.  It would be good for us to write up some docs on
those limitations, as well as work with the kryo devs to see which ones can
be removed.  (E.g., another one that I just noticed from browsing the code
is that even when writing to a stream, kryo has an internal buffer of
limited size, which it periodically flushes.  Perhaps we can get kryo to
turn off that buffer, or we can at least get it to flush more often.)
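
In the meantime the buffer is at least tunable from spark.  A hedged
example with the property names as they stood around spark 1.2 (later
releases renamed them):

val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "8")       // initial buffer size
  .set("spark.kryoserializer.buffer.max.mb", "512") // ceiling before kryo fails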

thanks,
Imran


On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers tris...@blackfrog.org
wrote:

 I get the same exception simply by doing a large broadcast of about 6GB.
 Note that I’m broadcasting a small number (~3m) of fat objects. There’s
 plenty of free RAM. This and related kryo exceptions seem to crop up
 whenever an object graph of more than a couple of GB gets passed around.


Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Tristan Blakers
Hi Imran,

I can confirm this still happens when calling Kryo serialisation directly
(note I’m using Java). The output file is about 440MB at the time of the
crash.

Kryo is version 2.21. When I get a chance I’ll see if I can make a
shareable test case and try it on Kryo 3.0; I doubt they’d be interested
in a bug report on 2.21.
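
(For a standalone test, pulling Kryo 3 directly should just be the
following — the coordinates are my assumption, based on kryo moving to
the com.esotericsoftware group id for 3.x:)

// build.sbt — pull Kryo 3 directly for an isolated serialization test
libraryDependencies += "com.esotericsoftware" % "kryo" % "3.0.0"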

Cheers
Tristan




Re: NegativeArraySizeException when doing joins on skewed data

2015-02-25 Thread Tristan Blakers
I get the same exception simply by doing a large broadcast of about 6GB.
Note that I’m broadcasting a small number (~3m) of fat objects. There’s
plenty of free RAM. This and related kryo exceptions seem to crop up
whenever an object graph of more than a couple of GB gets passed around.

    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)

Caused by: java.lang.NegativeArraySizeException
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    ... 23 more



On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote:

 I have been running into NegativeArraySizeExceptions when doing joins on
 data with very skewed key distributions in Spark 1.2.0. I found a previous
 post that mentioned that this exception arises when the size of the blocks
 spilled during the shuffle exceeds 2GB. The post recommended increasing the
 number of partitions. I tried increasing the number of partitions, and
 using the RangePartitioner instead of the HashPartitioner, but still
 encountered the problem.

 Does Spark support skewed joins similar to Pig?
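
 (Spark has no built-in skewed-join operator like Pig's as of 1.2; the
 usual hand-rolled equivalent is to salt the hot keys. A minimal sketch,
 assuming two pair RDDs named left and right — all names here are
 illustrative:)

 import org.apache.spark.SparkContext._ // pair-RDD functions (pre-1.3)
 import org.apache.spark.rdd.RDD
 import scala.util.Random

 val salts = 32
 val left:  RDD[(String, String)] = ???  // large, skewed side
 val right: RDD[(String, String)] = ???  // smaller side

 // Scatter each left row across `salts` composite keys...
 val saltedLeft = left.map { case (k, v) => ((k, Random.nextInt(salts)), v) }
 // ...replicate each right row once per salt so every pair still meets...
 val saltedRight = right.flatMap { case (k, w) =>
   (0 until salts).map(s => ((k, s), w))
 }
 // ...join on (key, salt), then strip the salt.
 val joined = saltedLeft.join(saltedRight).map { case ((k, _), vw) => (k, vw) }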


 com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
 Serialization trace:
 otherElements (org.apache.spark.util.collection.CompactBuffer)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
     at