I have an RDD of the format (user: String, timestamp: Long, state:
Boolean).  My task involves converting the states, where on/off is
represented as true/false, into intervals of 'on' of the format (beginTs:
Long, endTs: Long).  So this task requires me, per user, to line up all of
the on/off states so that I can compute when it is on, since the
calculation is neither associative nor commutative.

So there are 2 main operations that I'm trying to accomplish together:
1. group by each user
2. sort by time -- keep all of the states in sorted order by time

The main code inside the method that does the grouping by user and sorting
by time looks roughly like this:

// RDD starts off in format (user, ts, state) of type
// RDD[(String, Long, Boolean)]
val grouped = keyedStatesRDD.groupByKey
// after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type
// RDD[(String, Iterable[(Long, Boolean)])]
// take the sequence of (ts, state) per user, sort, get intervals
val groupedIntervals = grouped.mapValues(
  states => {
    val sortedStates = states.toSeq.sortBy(_._1)
    val intervals = DFUtil.statesToIntervals(sortedStates)
    val intervalsList = bucketDurations.map{case(k,v) =>
// after .mapValues, new format for RDD is (user, seq-of-(startTime,
// endTime)) of type RDD[(String, IndexedSeq[(Long, Long)])]

When I run my Spark job with 1 day's worth of data, the job completes
successfully.  When I run with 1 month's or 1 year's worth of data, that
method is where my Spark job consistently crashes with
OutOfMemoryErrors.  I need to run on the full year's worth of data.

My suspicion is that the groupByKey is the problem (it's pulling all of the
matching data values into a single executor's heap as a plain Scala
Iterable).  But alternatives of doing sortByKey on the RDD first before
grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
quite apply in my scenario because my operation is not associative (can't
combine per-partition results) and I still need to group by users before
doing a foldLeft.
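
A possible direction, not tested on this job: if the records of a partition
arrive already sorted by (user, timestamp), with all of a user's records in
the same partition, the intervals can be emitted in a single pass that
holds only the current user and the currently open interval, never a whole
group.  Spark's repartitionAndSortWithinPartitions with a partitioner keyed
on the user alone is one way such an ordering might be produced.  The
sketch below shows only the per-partition pass, in plain Scala.  The name
intervalsFromSorted is hypothetical, and the on/off semantics (open at the
first 'on', close at the next 'off', drop an unclosed trailing 'on') are an
assumption.

```scala
object StreamingIntervals {
  // Hypothetical helper. Input must be sorted by (user, timestamp).
  // Emits (user, (beginTs, endTs)) lazily, one interval at a time.
  def intervalsFromSorted(
      sorted: Iterator[(String, Long, Boolean)]): Iterator[(String, (Long, Long))] = {
    var currentUser: Option[String] = None
    var openSince: Option[Long] = None // start of the currently open interval

    sorted.flatMap { case (user, ts, on) =>
      if (!currentUser.contains(user)) {
        // New user: discard any interval left open by the previous user.
        currentUser = Some(user)
        openSince = None
      }
      (openSince, on) match {
        case (None, true) =>
          openSince = Some(ts) // off -> on: open an interval
          Iterator.empty
        case (Some(begin), false) =>
          openSince = None // on -> off: close and emit
          Iterator.single((user, (begin, ts)))
        case _ =>
          Iterator.empty // repeated state: nothing to emit
      }
    }
  }
}
```

Because the function maps an Iterator to an Iterator, it has the shape that
mapPartitions expects, and memory use per partition stays constant
regardless of how many states a single user has.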

I've definitely thought about the issue before and come across users with
issues that are similar but not exactly the same:

And this Jira seems relevant too:

The amount of memory that I'm using is 2g per executor, and I can't go
higher than that because each executor gets a YARN container from nodes
with 16 GB of RAM and 5 YARN containers allowed per node.

So I'd like to know if there's an easy solution to executing my logic on my
full dataset in Spark.


