I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:

import org.apache.spark.rdd.RDD

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
    input.filter(_ % 2 == 1)
  } else {
    input.filter(_ % 2 == 0)
  }
}


The situation is that we want to do two things with an RDD (a "count" and a
"filter" in the example). The "input" RDD may represent a very expensive
calculation. So it would make sense to add an "input.cache()" line at the
beginning. But where do we put "input.unpersist()"?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result


"input.filter()" is lazy, so this does not work as expected. We only want
to release "input" from the cache once nothing depends on it anymore. Maybe
"result" was garbage collected. Maybe "result" itself has been cached. But
there is no way to detect such conditions.
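
For completeness, here is one pattern we could imagine (just a sketch, and it assumes that caching "result" too and paying for one extra action is acceptable): force the downstream RDD to materialize before unpersisting "input".

import org.apache.spark.rdd.RDD

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  input.cache()
  val count = input.count
  val result =
    if (count % 2 == 0) input.filter(_ % 2 == 1)
    else input.filter(_ % 2 == 0)
  result.cache()
  result.count()     // action: computes and caches "result" now
  input.unpersist()  // "result" is materialized, so "input" can be dropped
  result
}

The downsides are the extra "count" action, the fact that the caller is now responsible for unpersisting "result", and that if the cached "result" partitions are ever evicted they will be recomputed from the full lineage without any help from the "input" cache.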

Our current approach is to simply leave the RDD cached and rely on Spark evicting it eventually. Is there a better solution? Thanks for any
tips.
