Hi Tristan,

Did upgrading to Kryo3 help?
Thanks,
Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers <tris...@blackfrog.org> wrote:
> Yeah, I implemented the same solution. It seems to kick in around the 4B
> mark, but looking at the log I suspect it's probably a function of the
> number of unique objects more than anything. I definitely don't have more
> than 2B unique objects.
>
> Will try the same test on Kryo3 and see if it goes away.
>
> T
>
> On 27 February 2015 at 06:21, Soila Pertet Kavulya <skavu...@gmail.com>
> wrote:
>>
>> Thanks Tristan,
>>
>> I ran into a similar issue with broadcast variables. I worked around
>> it by estimating the size of the object I want to broadcast, splitting
>> it up into chunks that were less than 2GB each, and then doing multiple
>> broadcasts. This approach worked pretty well for broadcast variables
>> under 10GB on our system. For larger variables, however, the spills to
>> disk made progress painfully slow, so we had to fall back to regular joins.
>>
>> Do you know if there are any efforts to get Kryo to support objects
>> larger than a couple of GB?
>>
>> Soila
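A minimal sketch of the chunked-broadcast workaround Soila describes above (the chunk count, key/value types, and lookup pattern are illustrative assumptions, not code from this thread):

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    // Split a large lookup map into pieces well under the ~2GB block limit
    // mentioned in this thread, then broadcast each piece separately.
    // numChunks is illustrative; in practice it would come from an estimate
    // of the serialized size of the object.
    def broadcastInChunks(sc: SparkContext,
                          bigMap: Map[String, Array[Byte]],
                          numChunks: Int): Seq[Broadcast[Map[String, Array[Byte]]]] = {
      val chunks: Seq[Map[String, Array[Byte]]] =
        bigMap.toSeq
          .zipWithIndex
          .groupBy { case (_, i) => i % numChunks }   // spread entries round-robin
          .values
          .map(group => group.map(_._1).toMap)
          .toSeq
      chunks.map(chunk => sc.broadcast(chunk))        // several small broadcasts
    }

    // Hypothetical usage on the driver:
    //   val parts = broadcastInChunks(sc, hugeLookupTable, numChunks = 8)
    // and on the executors, consult each chunk in turn:
    //   rdd.map(r => parts.flatMap(_.value.get(r.key)).headOption)

The point is simply that each individual broadcast stays comfortably below the 2GB threshold that the thread identifies as the point where serialization breaks down.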
>>
>> On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers <tris...@blackfrog.org>
>> wrote:
>> > I get the same exception simply by doing a large broadcast of about 6GB.
>> > Note that I'm broadcasting a small number (~3m) of fat objects. There's
>> > plenty of free RAM. This and related Kryo exceptions seem to crop up
>> > whenever an object graph of more than a couple of GB gets passed around.
>> >
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
>> >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>> >     at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
>> >     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
>> >     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>> >     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> >     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>> >     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>> >     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
>> >     at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
>> > Caused by: java.lang.NegativeArraySizeException
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >     at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>> >     at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>> >     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>> >     ... 23 more
>> >
>> > On 26 February 2015 at 03:49, soila <skavu...@gmail.com> wrote:
>> >> I have been running into NegativeArraySizeExceptions when doing joins on
>> >> data with very skewed key distributions in Spark 1.2.0. I found a previous
>> >> post that mentioned that this exception arises when the size of the blocks
>> >> spilled during the shuffle exceeds 2GB. The post recommended increasing the
>> >> number of partitions. I tried increasing the number of partitions, and
>> >> using the RangePartitioner instead of the HashPartitioner, but still
>> >> encountered the problem.
>> >>
>> >> Does Spark support skewed joins similar to Pig?
>> >>
>> >> com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
>> >> Serialization trace:
>> >> otherElements (org.apache.spark.util.collection.CompactBuffer)
>> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> >>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> >>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> >>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> >>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>     at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>> >>     at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>> >>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>> >>     at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>> >>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176)
>> >>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
>> >>     at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87)
>> >>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
>> >>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
>> >>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>> >>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>> >>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> >>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> >>     at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>     at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>     at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> >>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> >>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> >>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>     at java.lang.Thread.run(Thread.java:745)
>> >> Caused by: java.lang.NegativeArraySizeException
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >>     at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>> >>     at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>> >>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:499)
>> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >>     ... 46 more
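On the skewed-join question in the original post: Spark 1.2 has no dedicated skewed-join operator comparable to Pig's skewed join, and a common workaround is to salt the keys so a single hot key is spread over many reduce partitions instead of one oversized spill block. A rough sketch under that assumption (the key/value types, salt factor, and RDD names are illustrative, not from this thread):

    import scala.util.Random
    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark 1.2)
    import org.apache.spark.rdd.RDD

    // "events" is the large, skewed side; "lookup" is the smaller side.
    def saltedJoin(events: RDD[(String, Long)],
                   lookup: RDD[(String, String)],
                   saltFactor: Int,
                   numPartitions: Int): RDD[(String, (Long, String))] = {
      // Tag each row on the skewed side with a random salt so one hot key
      // is split across saltFactor distinct composite keys.
      val salted = events.map { case (k, v) => ((k, Random.nextInt(saltFactor)), v) }

      // Replicate every lookup row once per salt value so every salted key
      // still finds its match.
      val replicated = lookup.flatMap { case (k, w) =>
        (0 until saltFactor).map(salt => ((k, salt), w))
      }

      salted
        .join(replicated, new HashPartitioner(numPartitions))
        .map { case ((k, _), vw) => (k, vw) }   // drop the salt after joining
    }

Replicating the smaller side saltFactor times is only reasonable when that side really is small; in practice one would often salt only the keys identified as hot and join the remaining keys normally.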