I keep bumping into a problem with persisting RDDs. Consider this (silly) example:

    def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
      val count = input.count
      if (count % 2 == 0) {
        return input.filter(_ % 2 == 1)
      } else {
        return input.filter(_ % 2 == 0)
      }
    }

The situation is that we want to do two things with an RDD (a "count" and a "filter" in the example). The "input" RDD may represent a very expensive calculation. So it would make sense to add an "input.cache()" line at the beginning. But where do we put "input.unpersist()"?

    input.cache()
    val count = input.count
    val result = input.filter(...)
    input.unpersist()
    return result

"input.filter()" is lazy, so this does not work as expected. We only want to release "input" from the cache once nothing depends on it anymore. Maybe "result" was garbage collected. Maybe "result" itself has been cached. But there is no way to detect such conditions.

Our current approach is to just leave the RDD cached, and it will get dumped at some point anyway. Is there a better solution?

Thanks for any tips.