PS: I would also recommend compressing the data when you cache the RDD.
There is some overhead in compression/decompression and
serialization/deserialization, but it helps a lot for iterative
algorithms because you can cache much more data.
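
For example, something along these lines (just a sketch, not your exact setup;
the input path is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch: compress serialized cached blocks. spark.rdd.compress only
// applies to serialized storage levels (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///path/to/training-data")  // placeholder path

// Serialized + compressed cache: more CPU per access, but many more rows
// fit in memory, which usually pays off for iterative algorithms.
data.persist(StorageLevel.MEMORY_AND_DISK_SER)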

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com


On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres
<gsala...@ime.usp.br> wrote:
> Yeah, I can call count before that and it works. I was also over-caching
> tables, but I removed those caches. Now there is no caching at all, and it
> gets really slow since it recomputes my table RDD many times.
> I also hacked the LBFGS code to pass in the number of examples, which I
> calculated beforehand with a Spark SQL query, but that just moved the problem
> somewhere else.
>
> The query I'm running looks like this:
>
> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
> tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>
> mappedFields contains the list of fields I'm interested in. The result of
> that query goes through some transformations (including sampling) before
> being fed to LBFGS.
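>
> Roughly, it looks like this (a simplified sketch, not my real code: the
> sample fraction, the column positions, and the "query" val holding the SQL
> string above are placeholders):
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.storage.StorageLevel
>
> val result = sqlContext.sql(query)                 // the join query above
> result.persist(StorageLevel.MEMORY_AND_DISK_SER)   // avoid recomputing the join
>
> // Sample, then map each Row to a LabeledPoint (placeholder columns).
> val training = result
>   .sample(withReplacement = false, fraction = 0.01, seed = 42L)
>   .map(row => LabeledPoint(row.getDouble(0),
>     Vectors.dense(row.getDouble(1), row.getDouble(2))))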
>
> My dataset is 180 GB just for feature selection, and I'm planning to use
> 450 GB to train the final model. I'm using 16 c3.2xlarge EC2 instances, which
> means I have 240 GB of RAM available.
>
> Any suggestions? I'm starting to dig into the algorithm because I don't
> understand why it needs to count the dataset.
>
> Thanks
>
> Gustavo
>
> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>>
>> Is that error actually occurring in LBFGS?  It looks like it might be
>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>> you're trying to do is making the dataset size explode a bit.)  Are you able
>> to call count() (or any RDD action) on the data before you pass it to LBFGS?
>>
>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres
>> <gsala...@ime.usp.br> wrote:
>>>
>>> Just did, with the same error.
>>> I think the problem is the "data.count()" call in LBFGS, which is too naive
>>> for huge datasets.
>>> I was thinking of writing my own version of LBFGS that, instead of calling
>>> data.count(), takes the number of examples as a parameter, which I would
>>> compute with a Spark SQL query.
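>>>
>>> For getting the count, I have something like this in mind (sketch only; the
>>> table name is a placeholder):
>>>
>>> // Count once via Spark SQL instead of letting LBFGS call data.count().
>>> val numExamples: Long = sqlContext
>>>   .sql("SELECT COUNT(*) FROM training_table")
>>>   .collect()(0)
>>>   .getLong(0)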
>>>
>>> I will let you know.
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>>
>>>> Can you try increasing your driver memory, reducing the number of
>>>> executors, and increasing the memory per executor?
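>>>>
>>>> For example, something like this instead of 128 x 1G executors (numbers
>>>> are only illustrative; you'd have to tune them for the ~15 GB of RAM on
>>>> each c3.2xlarge):
>>>>
>>>> --num-executors 32 --executor-memory 4G --driver-memory 8G
>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>> --conf spark.storage.memoryFraction=0.2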
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres
>>>> <gsala...@ime.usp.br> wrote:
>>>>>
>>>>> Hi there:
>>>>>
>>>>> I'm using the LBFGS optimizer to train a logistic regression model. The
>>>>> code I implemented follows the pattern shown in
>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, except that
>>>>> the training data comes from a Spark SQL RDD.
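>>>>>
>>>>> Essentially it is this pattern (simplified; "labeledPoints" is the RDD I
>>>>> build from the Spark SQL result, and the numbers are placeholders):
>>>>>
>>>>> import org.apache.spark.mllib.linalg.Vectors
>>>>> import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
>>>>> import org.apache.spark.mllib.util.MLUtils
>>>>>
>>>>> // (label, features-with-intercept) pairs, as in the docs example.
>>>>> val training = labeledPoints
>>>>>   .map(lp => (lp.label, MLUtils.appendBias(lp.features)))
>>>>>   .cache()
>>>>>
>>>>> val numFeatures = 100  // placeholder
>>>>> val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
>>>>>
>>>>> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>>>>>   training,
>>>>>   new LogisticGradient(),
>>>>>   new SquaredL2Updater(),
>>>>>   10,     // numCorrections
>>>>>   1e-4,   // convergence tolerance
>>>>>   20,     // max iterations
>>>>>   0.1,    // regParam
>>>>>   initialWeightsWithIntercept)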
>>>>> The problem I'm having is that LBFGS tries to count the elements in my
>>>>> RDD, and that results in an OOM exception since my dataset is huge.
>>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
>>>>> YARN. My dataset is about 150 GB, but I sample it (taking only 1% of the
>>>>> data) in order to make logistic regression scale.
>>>>> The exception I'm getting is this:
>>>>>
>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>         at
>>>>> com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>         at
>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>         at
>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>         at
>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>         at
>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>         at
>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>         at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>         at
>>>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>
>>>>> I'm using these parameters at runtime:
>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>
>>>>> I also tried persisting my dataset with MEMORY_AND_DISK_SER, but I get
>>>>> the same error.
>>>>> I would appreciate any help with this problem. I have been trying to
>>>>> solve it for days and I'm running out of time and hair.
>>>>>
>>>>> Thanks
>>>>> Gustavo
>>>>
>>>>
>>>
>>
>

