Hi there:

I'm using the LBFGS optimizer to train a logistic regression model. The code
I implemented follows the pattern shown in
https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the
training data comes from a Spark SQL RDD.
The problem I'm having is that LBFGS tries to count the elements in my RDD,
and that results in an OOM exception since my dataset is huge.
I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
YARN. My dataset is about 150 GB, but I sample it (I take only 1% of the
data) so that logistic regression can scale.
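For reference, the training code is roughly like this. This is a simplified
sketch: the query, table, and column names are placeholders (the real query
involves a join and more features), and sc is the existing SparkContext.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

val sqlContext = new SQLContext(sc)

// Placeholder query and columns; the real one joins several tables.
val rows = sqlContext.sql("SELECT label, f1, f2, f3 FROM training_events")

// Turn each Row into a LabeledPoint and keep only a 1% sample.
val training = rows.map { r =>
  LabeledPoint(r.getDouble(0),
    Vectors.dense(r.getDouble(1), r.getDouble(2), r.getDouble(3)))
}.sample(withReplacement = false, fraction = 0.01)

training.persist(StorageLevel.MEMORY_AND_DISK_SER)

// run() is where LBFGS ends up calling count() on the RDD.
val model = new LogisticRegressionWithLBFGS().run(training)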
The exception I'm getting is this:

15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

I'm using these parameters at runtime:
--num-executors 128 --executor-memory 1G --driver-memory 4G
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.storage.memoryFraction=0.2
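
For completeness, the full launch is along these lines (the class and jar
names here are just placeholders):

spark-submit --class com.example.TrainLogisticRegression \
  --master yarn-cluster \
  --num-executors 128 --executor-memory 1G --driver-memory 4G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.storage.memoryFraction=0.2 \
  train-lr-assembly.jar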

I also persist my dataset using MEMORY_AND_DISK_SER but get the same error.
I would appreciate any help with this problem. I have been trying to solve
it for days and I'm running out of time and hair.

Thanks
Gustavo
