Thanks, Guillaume,

Below is the state of the task when the exception happens; nothing has
spilled to disk yet (both shuffle-spill columns read 0.0 B).

And there isn't a join; it's just a partitionBy followed by a groupBy.

Curiously, if numPartitions is small, the job succeeds, while if it's
large, it fails.
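
If the failure is the int-indexed buffer overflow you described, it can
be reproduced in isolation. A minimal sketch (plain Scala, not Spark or
Kryo code; the sizes are made up for illustration):

    object NegativeSizeDemo {
      def main(args: Array[String]): Unit = {
        // Java arrays are indexed by Int, so a serialization buffer can
        // hold at most ~2 GB. Doubling a 1 GiB buffer overflows Int...
        val capacity = 1 << 30
        val doubled  = capacity * 2   // wraps to -2147483648
        // ...and allocating with a negative size throws
        // java.lang.NegativeArraySizeException, as in the trace below.
        val buffer = new Array[Byte](doubled)
      }
    }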

Partitioning was done simply by:

    override def getPartition(key: Any): Int = {
        // Hash the key's string form into [0, numPartitions)
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
    }
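
For completeness, here is roughly how that partitioner is wired in (the
RDD name and key/value types are illustrative; only the
partitionBy/groupByKey shape matches the real code):

    import org.apache.spark.Partitioner

    class StringHashPartitioner(override val numPartitions: Int)
        extends Partitioner {
      override def getPartition(key: Any): Int =
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
    }

    // pairs: RDD[(String, Row)] -- illustrative
    val grouped = pairs
      .partitionBy(new StringHashPartitioner(numPartitions)) // shuffle into buckets
      .groupByKey()                                          // then group per key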

Failed task attempt (Spark UI Tasks table):
  Index: 99 | ID: 173 | Attempt: 0 | Status: FAILED | Locality Level: PROCESS_LOCAL
  Executor: gs-server-1000 | Launch Time: 2014/10/21 17:29:56 | Duration: 1.6 min
  GC Time: 30 s | Shuffle Read: 43.6 MB | Shuffle Spill (Memory): 0.0 B | Shuffle Spill (Disk): 0.0 B

com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
Serialization trace:
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
otherElements (org.apache.spark.util.collection.CompactBuffer)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
        com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
        org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203)
        org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
        org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Second failed attempt (Spark UI Tasks table):
  Index: 99 | ID: 175 | Attempt: 1 | Status: FAILED | Locality Level: PROCESS_LOCAL
  Executor: gs-server-1000 | Launch Time: 2014/10/21 17:31:29 | Duration: 2.6 min
  GC Time: 39 s | Shuffle Read: 42.7 MB | Shuffle Spill (Memory): 0.0 B | Shuffle Spill (Disk): 0.0 B

(The second attempt failed with a stack trace identical to the one above.)


2014-10-20 20:58 GMT+08:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

> Well, reading your logs, here is what happens:
>
> You do a combineByKey (so you probably have a join somewhere), which
> spills to disk because it's too big. To spill to disk it serializes the
> blocks, and the blocks are > 2 GB.
>
> From a 2 GB dataset, it's easy to expand to several TB.
>
> Increase the parallelism, make sure that your combineByKey has enough
> distinct keys, and see what happens.
>
> Guillaume
>
> Thank you, Guillaume, but my dataset is not that large; it's only ~2 GB
> in total.
>
> 2014-10-20 16:58 GMT+08:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
>
>>  Hi,
>>
>> It happened to me with blocks that take more than 1 or 2 GB once
>> serialized.
>>
>> I think the problem was that during serialization a byte array is
>> created, and arrays in Java are indexed by ints. When the serializer
>> needs to grow the buffer past what an int can address, the size
>> computation overflows, and writing into the array then fails.
>>
>> I don't know if your problem is the same, but it might be.
>>
>> In general, Java and Java libraries do not check for oversized arrays,
>> which is really bad when you work with big data.
>>
>> Guillaume
>>
>> The exception drives me crazy because it occurs randomly. I don't know
>> which line of my code causes it, and I don't even understand what
>> "KryoException: java.lang.NegativeArraySizeException" means or implies.
>>
>>
>> 14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2 in stage 0.0 (TID 181, gs-server-1000): com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
>> Serialization trace:
>> value (org.apache.spark.sql.catalyst.expressions.MutableAny)
>> values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
>> otherElements (org.apache.spark.util.collection.CompactBuffer)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>         com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>         com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>         com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>         com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>         com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>         com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>         com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>         com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
>>         com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
>>         com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
>>         org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>>         org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203)
>>         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
>>         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90)
>>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89)
>>         org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
>>         org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
>>         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> --
>>  *Guillaume PITEL, Président*
>> +33(0)626 222 431
>>
>> eXenSa S.A.S. <http://www.exensa.com/>
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
>
>
>
> --
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
