Hi everyone,

I have an RDD of the format (user: String, timestamp: Long, state: Boolean). My task involves converting the states, where on/off is represented as true/false, into intervals of 'on' of the format (beginTs: Long, endTs: Long). This means that, per user, I have to line up all of the on/off states in timestamp order to compute when the state is on, since the calculation is neither associative nor commutative.
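Since DFUtil.statesToIntervals isn't shown, here is a hypothetical stand-in that illustrates the kind of order-dependent fold involved. It assumes one user's (ts, state) pairs are already sorted by timestamp, treats a repeated state as no change, and drops a trailing 'on' that is never switched off. All three of these are assumptions, not necessarily what the real helper does.

```scala
object StateIntervals {
  // Hypothetical stand-in for DFUtil.statesToIntervals (the real helper isn't shown).
  // Input: one user's (ts, state) pairs, already sorted by ts.
  // Output: one (beginTs, endTs) pair per "on" period.
  def statesToIntervals(sorted: Seq[(Long, Boolean)]): IndexedSeq[(Long, Long)] = {
    // Fold state: (closed intervals so far, start of the currently open interval)
    val (closed, _) = sorted.foldLeft((Vector.empty[(Long, Long)], Option.empty[Long])) {
      case ((acc, None), (ts, true))     => (acc, Some(ts))          // off -> on: open an interval
      case ((acc, Some(b)), (ts, false)) => (acc :+ ((b, ts)), None) // on -> off: close it
      case (unchanged, _)                => unchanged                // repeated state: no change
    }
    closed // an "on" with no later "off" is dropped here (assumption)
  }
}
```

Each step depends on the open-interval start carried over from the previous record, so results computed on two halves of a user's history can't simply be merged. That is why the input has to arrive in timestamp order.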
So there are two main operations that I'm trying to accomplish together:

1. group by user
2. sort by time, keeping all of each user's states in timestamp order

The main code inside the method that does the grouping by user and sorting by time looks something like this:

    // The RDD starts off as (user, ts, state) records; keyedStatesRDD holds them
    // keyed by user as (user, (ts, state)), of type RDD[(String, (Long, Boolean))]
    val grouped = keyedStatesRDD.groupByKey()
    // after groupByKey, the format is (user, seq-of-(ts, state)),
    // of type RDD[(String, Iterable[(Long, Boolean)])]

    // take the sequence of (ts, state) per user, sort it, and get the intervals
    val groupedIntervals = grouped.mapValues { states =>
      val sortedStates = states.toSeq.sortBy(_._1)
      val intervals = DFUtil.statesToIntervals(sortedStates)
      // return the (startTime, endTime) pairs ordered by start time
      intervals.toIndexedSeq.sortBy(_._1)
    }
    // after mapValues, the format is (user, seq-of-(startTime, endTime)),
    // of type RDD[(String, IndexedSeq[(Long, Long)])]

When I run my Spark job with 1 day's worth of data, the job completes successfully. When I run it with 1 month's or 1 year's worth of data, it consistently crashes in this method with OutOfMemoryErrors. I need to run on the full year's worth of data.

My suspicion is that groupByKey is the problem: it pulls all of the values for a given user into a single executor's heap as a plain Scala Iterable. But the alternatives (sortByKey on the RDD before grouping, or sortByKey followed by fold/foldByKey or aggregate/aggregateByKey) don't quite apply in my scenario, because my operation is not associative (I can't combine per-partition results) and I still need to group by user before doing a foldLeft.
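One commonly suggested way to avoid materializing a whole user's history is a secondary sort: key each record by (user, ts), shuffle with a partitioner that looks only at the user, sort within partitions, and then walk each partition as a stream. The sketch below covers only the streaming step, in plain Scala with no Spark dependency. The object and method names are hypothetical, and it assumes the same semantics as a simple on/off walk (repeated states ignored, an unclosed trailing 'on' dropped).

```scala
object SortedPartitionIntervals {
  // Hypothetical helper: consumes ((user, ts), state) records that are already
  // sorted by (user, ts), with each user's records contiguous, and lazily
  // emits (user, (beginTs, endTs)). Only the current user and the start of
  // the open interval are held in memory.
  def intervals(records: Iterator[((String, Long), Boolean)]): Iterator[(String, (Long, Long))] =
    new Iterator[(String, (Long, Long))] {
      private var currentUser: String = null
      private var onSince: Option[Long] = None
      private var pending: Option[(String, (Long, Long))] = None

      // Read input until one interval has been closed or the input is exhausted.
      private def fill(): Unit =
        while (pending.isEmpty && records.hasNext) {
          val ((user, ts), state) = records.next()
          if (user != currentUser) { // new user: discard any unclosed interval (assumption)
            currentUser = user
            onSince = None
          }
          (onSince, state) match {
            case (None, true)     => onSince = Some(ts)                              // off -> on
            case (Some(b), false) => pending = Some((user, (b, ts))); onSince = None // on -> off
            case _                => ()                                              // no change
          }
        }

      override def hasNext: Boolean = { fill(); pending.isDefined }

      override def next(): (String, (Long, Long)) = {
        fill()
        val result = pending.getOrElse(throw new NoSuchElementException("no more intervals"))
        pending = None
        result
      }
    }
}
```

To wire this into Spark (untested here): map each record to ((user, ts), state), call repartitionAndSortWithinPartitions (available on pair RDDs since Spark 1.2) with a custom Partitioner whose getPartition hashes only the user component of the key, then pass this function to mapPartitions. The shuffle sort can spill to disk, and the iterator holds only one user and one open start timestamp at a time, so no single user's history has to fit in the executor heap.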
I've already looked into this and found threads from users with problems that are similar, but not exactly the same:

http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html

And this Jira seems relevant too: https://issues.apache.org/jira/browse/SPARK-3655

I'm using 2g of memory per executor, and I can't go higher than that because each executor runs in a YARN container on nodes with 16 GB of RAM, and 5 YARN containers are allowed per node.

So I'd like to know if there's an easy way to run this logic on my full dataset in Spark.

Thanks!
-- Elango