Hi Tristan,

At first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's
entirely a kryo issue.  Would it be possible for you to try serializing
your object using kryo, without involving spark at all?  If you are
unfamiliar w/ kryo, it would also be OK to use the utils in spark to do it,
something like:

import java.io.FileOutputStream

// Write the object through spark's kryo wrapper to a throwaway local file.
val outputStream = new FileOutputStream(
  "/some/local/path/doesn't/really/matter/just/delete/me/afterwards")

val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)

// writeObject returns the stream itself, so it can be closed in one expression.
kryoStreamSer.writeObject(yourBigObject).close()
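
If you want to take spark entirely out of the picture, a minimal sketch
using raw kryo would be something like this (the path and yourBigObject
are placeholders):

import java.io.FileOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

// Serialize with kryo directly, no spark involved.  If this fails the same
// way, the limit is in kryo itself rather than in spark's wrapping code.
val kryo = new Kryo()
val output = new Output(new FileOutputStream("/tmp/kryo-test.bin"))
kryo.writeClassAndObject(output, yourBigObject)
output.close()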

My guess is that this will fail.  There is a little of spark's wrapping
code involved here too, but I suspect the error is out of our control.
From the error, it seems like whatever object you are trying to serialize
has more than 2B references:
Caused by: java.lang.NegativeArraySizeException
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me
with an object that is only 6 GB.
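
For what it's worth, my read of the kryo source (a guess, I haven't
stepped through it) is that the reference-tracking map doubles its backing
array when it fills up, and the doubled capacity wraps negative once it
crosses Int.MaxValue:

// Hypothetical illustration of the overflow inside IdentityObjectIntMap.resize.
val capacity = 1 << 30       // current table size, ~1.07 billion slots
val doubled  = capacity << 1 // Int overflow: this is Int.MinValue, i.e. negative
// new Array[Int](doubled)   // => java.lang.NegativeArraySizeException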

There are a handful of other size restrictions and tuning parameters that
come with kryo as well.  It would be good for us to write up some docs on
those limitations, as well as work with the kryo devs to see which ones can
be removed.  (E.g., another one I just noticed from browsing the code is
that even when writing to a stream, kryo keeps an internal buffer of
limited size, which it periodically flushes.  Perhaps we can get kryo to
turn off that buffer, or at least get it to flush more often.)
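
In the meantime, the buffer sizes spark hands to kryo are tunable.  A
sketch, assuming the Spark 1.x config names (double-check the docs for
your version):

import org.apache.spark.SparkConf

// Hypothetical tuning: bump kryo's per-task buffer and its hard cap (in MB).
val conf = new SparkConf()
  .set("spark.kryoserializer.buffer.mb", "8")        // initial kryo buffer
  .set("spark.kryoserializer.buffer.max.mb", "512")  // max size before kryo fails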

thanks,
Imran


On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers <tris...@blackfrog.org>
wrote:

> I get the same exception simply by doing a large broadcast of about 6GB.
> Note that I’m broadcasting a small number (~3m) of fat objects. There’s
> plenty of free RAM. This and related kryo exceptions seem to crop up
> whenever an object graph of more than a couple of GB gets passed around.
>
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>         at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
>         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
>         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
>         at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
> Caused by: java.lang.NegativeArraySizeException
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>         ... 23 more
>
>
>
> On 26 February 2015 at 03:49, soila <skavu...@gmail.com> wrote:
>
>> I have been running into NegativeArraySizeExceptions when doing joins on
>> data with very skewed key distributions in Spark 1.2.0. I found a previous
>> post that mentioned that this exception arises when the size of the blocks
>> spilled during the shuffle exceeds 2GB. The post recommended increasing the
>> number of partitions. I tried increasing the number of partitions, and using
>> the RangePartitioner instead of the HashPartitioner, but still encountered
>> the problem.
>>
>> Does Spark support skewed joins similar to Pig?
>>
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
>> Serialization trace:
>> otherElements (org.apache.spark.util.collection.CompactBuffer)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>>         at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>>         at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176)
>>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
>>         at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87)
>>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
>>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
>>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>         at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>         at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>         at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NegativeArraySizeException
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>>         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:499)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>         ... 46 more