PS: I recommend compressing the data when you cache the RDD. Compression/decompression and serialization/deserialization add some overhead, but being able to cache more data helps a lot with iterative algorithms.
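
A minimal sketch of what this could look like, assuming the compression meant here is Spark's spark.rdd.compress setting (the message does not name a specific option, so that choice is an assumption). The setting is off by default and only affects RDD partitions stored in serialized form, so it would pair with a serialized storage level such as the MEMORY_AND_DISK_SER already mentioned in this thread. The flags below would be added to the existing runtime parameters; the Kryo line is taken from the original post:

```text
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.rdd.compress=true
```

With these set, an RDD persisted with MEMORY_ONLY_SER or MEMORY_AND_DISK_SER should be stored compressed, while deserialized levels such as MEMORY_ONLY are not affected. How much memory this saves depends on the data, so it is worth checking the cached size in the Spark UI's Storage tab before and after.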

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com

On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <gsala...@ime.usp.br> wrote:
> Yeah, I can call count before that and it works. Also I was over caching
> tables but I removed those. Now there is no caching but it gets really slow
> since it calculates my table RDD many times.
> Also hacked the LBFGS code to pass the number of examples which I calculated
> outside in a Spark SQL query but just moved the location of the problem.
>
> The query I'm running looks like this:
>
> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB ON
> tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>
> mappedFields contains a list of fields which I'm interested in. The result
> of that query goes through (including sampling) some transformations before
> being input to LBFGS.
>
> My dataset has 180GB just for feature selection, I'm planning to use 450GB
> to train the final model and I'm using 16 c3.2xlarge EC2 instances, that
> means I have 240GB of RAM available.
>
> Any suggestion? I'm starting to check the algorithm because I don't
> understand why it needs to count the dataset.
>
> Thanks
>
> Gustavo
>
> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>>
>> Is that error actually occurring in LBFGS? It looks like it might be
>> happening before the data even gets to LBFGS. (Perhaps the outer join
>> you're trying to do is making the dataset size explode a bit.) Are you able
>> to call count() (or any RDD action) on the data before you pass it to LBFGS?
>>
>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres
>> <gsala...@ime.usp.br> wrote:
>>>
>>> Just did with the same error.
>>> I think the problem is the "data.count()" call in LBFGS because for huge
>>> datasets that's naive to do.
>>> I was thinking to write my version of LBFGS but instead of doing
>>> data.count() I will pass that parameter which I will calculate from a Spark
>>> SQL query.
>>>
>>> I will let you know.
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>>
>>>> Can you try increasing your driver memory, reducing the executors and
>>>> increasing the executor memory?
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres
>>>> <gsala...@ime.usp.br> wrote:
>>>>>
>>>>> Hi there:
>>>>>
>>>>> I'm using LBFGS optimizer to train a logistic regression model. The
>>>>> code I implemented follows the pattern showed in
>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but training
>>>>> data is obtained from a Spark SQL RDD.
>>>>> The problem I'm having is that LBFGS tries to count the elements in my
>>>>> RDD and that results in a OOM exception since my dataset is huge.
>>>>> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
>>>>> YARN. My dataset is about 150 GB but I sample (I take only 1% of the data)
>>>>> it in order to scale logistic regression.
>>>>> The exception I'm getting is this:
>>>>>
>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>
>>>>> I'm using this parameters at runtime:
>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>
>>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>>> error.
>>>>> I will appreciate any help on this problem. I have been trying to solve
>>>>> it for days and I'm running out of time and hair.
>>>>>
>>>>> Thanks
>>>>> Gustavo