I get the same exception simply by doing a large broadcast of about 6GB. Note that I’m broadcasting a small number (~3m) of fat objects. There’s plenty of free RAM. This and related kryo exceptions seem to crop-up whenever an object graph of more than a couple of GB gets passed around.
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945) at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623) Caused by: java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) ... 23 more On 26 February 2015 at 03:49, soila <skavu...@gmail.com> wrote: > I have been running into NegativeArraySizeException's when doing joins on > data with very skewed key distributions in Spark 1.2.0. I found a previous > post that mentioned that this exception arises when the size of the blocks > spilled during the shuffle exceeds 2GB. The post recommended increasing the > number of partitions. I tried increasing the number of partitions, and > using > the RangePartitioner instead of the HashPartitioner but still encountered > the problem. > > Does Spark support skewed joins similar to Pig? > > > com.esotericsoftware.kryo.KryoException: > java.lang.NegativeArraySizeException > Serialization trace: > otherElements (org.apache.spark.util.collection.CompactBuffer) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) > at > > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) > at > > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > > org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) > at > > org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) > at > > org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176) > at > > org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63) > at > > org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87) > at > > org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63) > at > > org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127) > at > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) > at > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) > at > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at > org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at > > org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NegativeArraySizeException > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) > at > > com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) > at > > com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) > at > com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:499) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) > ... 46 more > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-when-doing-joins-on-skewed-data-tp21802.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >