Is that error actually occurring in LBFGS?  It looks like it might be
happening before the data even gets to LBFGS.  (Perhaps the outer join
you're trying to do is making the dataset size explode a bit.)  Are you
able to call count() (or any RDD action) on the data before you pass it to
LBFGS?
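
A rough sketch of that check, in case it helps. It assumes you are on PySpark (the same count() action exists on Scala RDDs), that `data` stands for whatever RDD you build from the Spark SQL query, and that `force_action` is just a name made up for this example, not anything in Spark or MLlib:

```python
import time


def force_action(rdd, label="training data"):
    """Run count() on an RDD-like object and report how long it took.

    `rdd` only needs a count() method, so this works on a real RDD.
    If the OutOfMemoryError already shows up here, the failure is in the
    stages that build the data (for example the join), not in LBFGS.
    """
    start = time.time()
    n = rdd.count()  # an action: forces the whole lineage to be computed
    elapsed = time.time() - start
    print("%s: %d rows in %.1f s" % (label, n, elapsed))
    return n
```

Calling force_action(data) right before the optimizer call, and again on the inputs to the join, should show which step is the first one to fail.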

On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
gsala...@ime.usp.br> wrote:

> Just did, with the same error.
> I think the problem is the "data.count()" call in LBFGS, because counting a
> huge dataset that way is naive.
> I was thinking of writing my own version of LBFGS that, instead of calling
> data.count(), takes the count as a parameter, which I will calculate from a
> Spark SQL query.
>
> I will let you know.
>
> Thanks
>
>
> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Can you try increasing your driver memory, reducing the number of
>> executors and increasing the executor memory?
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>> gsala...@ime.usp.br> wrote:
>>
>>> Hi there:
>>>
>>> I'm using the LBFGS optimizer to train a logistic regression model. The
>>> code I implemented follows the pattern shown in
>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but the
>>> training data is obtained from a Spark SQL RDD.
>>> The problem I'm having is that LBFGS tries to count the elements in my
>>> RDD and that results in an OOM exception since my dataset is huge.
>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
>>> YARN. My dataset is about 150 GB but I sample it (taking only 1% of the
>>> data) in order to scale logistic regression.
>>> The exception I'm getting is this:
>>>
>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>> java.lang.OutOfMemoryError: Java heap space
>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>         at java.lang.String.<init>(String.java:203)
>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>         at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>         at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>         at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>         at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>         at
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>         at
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>         at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>         at
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>         at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at
>>> org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>         at
>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>         at
>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>         at
>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>
>>> I'm using these parameters at runtime:
>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>> --conf spark.storage.memoryFraction=0.2
>>>
>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>> error.
>>> I would appreciate any help with this problem. I have been trying to solve
>>> it for days and I'm running out of time and hair.
>>>
>>> Thanks
>>> Gustavo
>>>
>>
>>
>
