Daniel,
Is SPARK-1103 https://issues.apache.org/jira/browse/SPARK-1103 related to
your example? Automatic unpersist()-ing of unreferenced RDDs would be nice.
Nick
On Tue, May 27, 2014 at 12:28 PM, Daniel Darabos
daniel.dara...@lynxanalytics.com wrote:
I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:
def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
    return input.filter(_ % 2 == 1)
  } else {
    return input.filter(_ % 2 == 0)
  }
}
The situation is that we want to do two things with an RDD (a count and
a filter in the example). The input RDD may represent a very expensive
calculation. So it would make sense to add an input.cache() line at the
beginning. But where do we put input.unpersist()?
input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result
input.filter() is lazy, so this does not work as expected. We only want
to release input from the cache once nothing depends on it anymore. Maybe
result was garbage collected. Maybe result itself has been cached. But
there is no way to detect such conditions.
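One option (a sketch only, and it assumes eagerly materializing the result is acceptable, which defeats some of the laziness) is to force `result` to compute while `input` is still cached, and only then unpersist:

```scala
import org.apache.spark.rdd.RDD

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  input.cache()
  val count = input.count
  val result =
    if (count % 2 == 0) input.filter(_ % 2 == 1)
    else input.filter(_ % 2 == 0)
  // Materialize `result` while `input` is still cached, so the filter
  // does not recompute `input` later. Caching `result` first avoids
  // throwing away the work done by this count.
  result.cache()
  result.count
  input.unpersist()
  result
}
```

This trades memory (both RDDs are briefly cached at once) and eagerness for a deterministic point at which `input` can be released.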
Our current approach is to just leave the RDD cached, and it will get
dumped at some point anyway. Is there a better solution? Thanks for any
tips.