Thanks, Guillaume. Below is where the exception happens; nothing has spilled to disk yet.
And there isn't a join, but a partitionBy and groupBy action. Actually if numPartitions is small, it succeeds, while if it's large, it fails. Partition was simply done by override def getPartition(key: Any): Int = { (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions } IndexIDAttemptStatus ▾Locality LevelExecutorLaunch TimeDurationGC Time AccumulatorsShuffle ReadShuffle Spill (Memory)Shuffle Spill (Disk)Errors99 1730FAILEDPROCESS_LOCALgs-server-10002014/10/21 17:29:561.6 min30 s43.6 MB0.0 B0.0 B com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) otherElements (org.apache.spark.util.collection.CompactBuffer) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90) org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 991751FAILEDPROCESS_LOCALgs-server-10002014/10/21 17:31:292.6 min39 s42.7 MB0.0 B0.0 B com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) otherElements (org.apache.spark.util.collection.CompactBuffer) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90) org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 2014-10-20 20:58 GMT+08:00 Guillaume Pitel <guillaume.pi...@exensa.com>: > Well, reading your logs, here is what happens : > > You do a combineByKey (so you have a join probably somewhere), which > spills on disk because it's too big. To spill on disk it serializes, and > the blocks are > 2GB. > > From a 2GB dataset, it's easy to expand to several TB > > Increase parallelism, make sure that your combineByKey has enough > different keys, and see what happens. > > Guillaume > > Thank you, Guillaume, my dataset is not that large, it's totally ~2GB > > 2014-10-20 16:58 GMT+08:00 Guillaume Pitel <guillaume.pi...@exensa.com>: > >> Hi, >> >> It happened to me with blocks which take more than 1 or 2 GB once >> serialized >> >> I think the problem was that during serialization, a Byte Array is >> created, and arrays in java are indexed by ints. When the serializer needs >> to increase the buffer size, it does so somehow, but then writing in the >> array leads to an error. >> >> Don't know if your problem is the same, but maybe. >> >> In general Java or Java libraries do not check for oversized arrays, >> which is really bad when you play with big data. >> >> Guillaume >> >> The exception drives me crazy, because it occurs randomly. >> I didn't know which line of my code causes this exception. >> I didn't even understand what "KryoException: >> java.lang.NegativeArraySizeException" means, or even implies? 
>> >> >> 14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2 in >> stage 0.0 (TID 181, gs-server-1000): >> com.esotericsoftware.kryo.KryoException: >> java.lang.NegativeArraySizeException >> Serialization trace: >> value (org.apache.spark.sql.catalyst.expressions.MutableAny) >> values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) >> otherElements (org.apache.spark.util.collection.CompactBuffer) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> >> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) >> >> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) >> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> >> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) >> >> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) >> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) >> >> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> >> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38) >> >> 
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34) >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> >> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) >> >> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) >> >> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203) >> >> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150) >> >> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) >> >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90) >> >> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89) >> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) >> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625) >> >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) >> org.apache.spark.rdd.RDD.iterator(RDD.scala:229) >> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) >> org.apache.spark.rdd.RDD.iterator(RDD.scala:229) >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) >> org.apache.spark.scheduler.Task.run(Task.scala:54) >> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) >> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> java.lang.Thread.run(Thread.java:745) >> >> >> >> -- >> [image: eXenSa] >> *Guillaume PITEL, Président* >> +33(0)626 222 431 >> >> eXenSa S.A.S. 
<http://www.exensa.com/> >> 41, rue Périer - 92120 Montrouge - FRANCE >> Tel +33(0)184 163 677 / Fax +33(0)972 283 705 >> > > > > -- > [image: eXenSa] > *Guillaume PITEL, Président* > +33(0)626 222 431 > > eXenSa S.A.S. <http://www.exensa.com/> > 41, rue Périer - 92120 Montrouge - FRANCE > Tel +33(0)184 163 677 / Fax +33(0)972 283 705 >