You can also try reducing the number of YARN containers per node so that each remaining container (executor) gets more memory.
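For example, on 16 GB nodes you might ask YARN for two or three larger executors per node instead of five 2 GB ones. A rough, untested sketch of the relevant settings (the numbers below are assumptions and are bounded by your cluster's YARN limits, e.g. yarn.scheduler.maximum-allocation-mb):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: trade container count for container size so each executor's
    // heap is large enough for the per-user grouping. Numbers are illustrative.
    val conf = new SparkConf()
      .setAppName("on-off-intervals")
      .set("spark.executor.instances", "20")             // fewer, bigger containers overall
      .set("spark.executor.memory", "6g")                // ~2 executors per 16 GB node instead of 5 x 2g
      .set("spark.yarn.executor.memoryOverhead", "1024") // headroom YARN adds on top of the heap
    val sc = new SparkContext(conf)

The same settings can also be passed on the command line via --num-executors and --executor-memory with spark-submit.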
2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:

> You can try to increase the number of partitions to get rid of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.
>
> Thanks
> Best Regards
>
> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
> wrote:
>
>> Hi everyone,
>> I have an RDD of the format (user: String, timestamp: Long, state:
>> Boolean). My task involves converting the states, where on/off is
>> represented as true/false, into intervals of 'on' of the format (beginTs:
>> Long, endTs: Long). So this task requires me, per user, to line up all of
>> the on/off states so that I can compute when it is on, since the
>> calculation is neither associative nor commutative.
>>
>> So there are 2 main operations that I'm trying to accomplish together:
>> 1. group by each user
>> 2. sort by time -- keep all of the states in sorted order by time
>>
>> The main code inside the method that does grouping by user and sorting
>> by time looks sort of like this:
>>
>> // RDD starts off in format (user, ts, state) of type
>> // RDD[(String, Long, Boolean)]
>> val grouped = keyedStatesRDD.groupByKey
>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>> // type RDD[(String, Iterable(Long, Boolean))]
>> // take the sequence of (ts, state) per user, sort, get intervals
>> val groupedIntervals = grouped.mapValues(
>>   states => {
>>     val sortedStates = states.toSeq.sortBy(_._1)
>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>     val intervalsList = intervals.map{ case (k, v) =>
>>       (k, v) }(collection.breakOut).sortBy(_._1)
>>     intervalsList
>>   }
>> )
>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>> // endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>>
>> When I run my Spark job with 1 day's worth of data, the job completes
>> successfully. When I run with 1 month's or 1 year's worth of data, that
>> method is where my Spark job consistently crashes with OutOfMemoryErrors.
>> I need to run on the full year's worth of data.
>>
>> My suspicion is that the groupByKey is the problem (it's pulling all of
>> the matching data values into a single executor's heap as a plain Scala
>> Iterable). But alternatives of doing sortByKey on the RDD first before
>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey],
>> don't quite apply in my scenario because my operation is not associative
>> (can't combine per-partition results) and I still need to group by users
>> before doing a foldLeft.
>>
>> I've definitely thought about the issue before and come across users
>> with issues that are similar but not exactly the same:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>
>> And this Jira seems relevant too:
>> https://issues.apache.org/jira/browse/SPARK-3655
>>
>> The amount of memory that I'm using is 2g per executor, and I can't go
>> higher than that because each executor gets a YARN container from nodes
>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>
>> So I'd like to know if there's an easy solution to executing my logic on
>> my full dataset in Spark.
>>
>> Thanks!
>>
>> -- Elango
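Regarding the groupByKey-plus-sort pattern in the quoted code: one alternative that avoids materializing each user's full Iterable on an executor's heap is a secondary sort via repartitionAndSortWithinPartitions (the idea behind SPARK-3655), followed by a streaming pass in mapPartitions. Below is a rough, untested sketch; UserPartitioner, userIntervals and statesToIntervals are hypothetical names, and statesToIntervals only stands in for the existing DFUtil.statesToIntervals logic.

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ArrayBuffer

    // Route records by user only, while the composite (user, timestamp) key
    // keeps each partition sorted by user and then by time.
    class UserPartitioner(override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = key match {
        case (user: String, _) =>
          ((user.hashCode % numPartitions) + numPartitions) % numPartitions
      }
    }

    // Stand-in for DFUtil.statesToIntervals: pair each 'on' timestamp with the
    // next 'off' timestamp; repeated on/on or off/off states are ignored.
    def statesToIntervals(states: Seq[(Long, Boolean)]): Seq[(Long, Long)] = {
      val out = ArrayBuffer.empty[(Long, Long)]
      var onSince: Option[Long] = None
      for ((ts, state) <- states) (onSince, state) match {
        case (None, true)         => onSince = Some(ts)
        case (Some(start), false) => out += ((start, ts)); onSince = None
        case _                    => // no state transition
      }
      out
    }

    def userIntervals(records: RDD[(String, Long, Boolean)],
                      numPartitions: Int): RDD[(String, Seq[(Long, Long)])] = {
      // Key by (user, timestamp) so the shuffle can sort by time within each user.
      val keyed = records.map { case (user, ts, state) => ((user, ts), state) }

      // One shuffle: partition by user, sort each partition by (user, timestamp).
      val sorted = keyed.repartitionAndSortWithinPartitions(new UserPartitioner(numPartitions))

      // Walk each partition as a stream, holding only one user's states at a time.
      sorted.mapPartitions { iter =>
        val buffered = iter.buffered
        new Iterator[(String, Seq[(Long, Long)])] {
          def hasNext: Boolean = buffered.hasNext
          def next(): (String, Seq[(Long, Long)]) = {
            val user = buffered.head._1._1
            val states = ArrayBuffer.empty[(Long, Boolean)]
            while (buffered.hasNext && buffered.head._1._1 == user) {
              val ((_, ts), state) = buffered.next()
              states += ((ts, state))
            }
            (user, statesToIntervals(states))
          }
        }
      }
    }

This still holds one user's records in memory at a time (the interval computation needs them in order), but it should avoid the groupByKey step and the extra toSeq/sortBy copies, so a task's memory pressure comes mainly from the single largest user rather than from buffering and sorting grouped values for many keys at once.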