I see. Thanks for clearing things up. Looking forward to the 0.8 release.

Grega
On Wed, Aug 28, 2013 at 12:58 AM, Matei Zaharia <[email protected]> wrote:

> Hi Grega,
>
> Thanks for the response, I hadn't noticed it until now.
>
> 1. Is there some performance penalty if we union many small RDDs?
>
> I will need to union 7 RDDs continuously in a loop, where 6 RDDs will be
> common from one iteration to the next. Something along the lines of:
>
>     val rdds = ... // IndexedSeq of RDDs
>
>     for (i <- 0 to n) {
>       val rddCachedSet1 = rdds.slice(i, i + 7).map(_.cache()).reduceLeft { _.union(_) }
>       // do some computations on rddCachedSet1
>     }
>
> Unioning 7 of them shouldn't be a problem, though hundreds might be.
>
> 2. Will calling cache() on an RDD multiple times cause some undesired
> effects? If so, I can probably check whether the RDD is already cached
> with rdd.getStorageLevel()?
>
> Calling cache() multiple times should work as long as you set the same
> storage level.
>
> 3. Is rdd.unpersist() planned for the next Spark release (0.8)?
>
> Yes, it's already in the code.
>
> 4. Regarding the LRU cache that you mentioned: were you referring to the
> garbage collector cleaning up unused objects? If so, does that mean that
> even cached RDDs will at some point get garbage-collected?
>
> I was mentioning that we remove objects from the cache (i.e. unreference
> them) in an LRU manner when the memory for it becomes full. The GC will
> then collect them some time afterward. So even if you cache arbitrarily
> many RDDs, only the least recently used ones will be in memory, and you
> won't get OutOfMemoryErrors. But we are the ones controlling when we
> unreference them, and the GC just picks up from there when it decides to
> clean stuff up.
>
> Matei
>
> Thanks,
> Grega
>
> On Wed, Aug 14, 2013 at 12:40 AM, Matei Zaharia <[email protected]> wrote:
>
>> Hi Grega,
>>
>> You'll need to create a new cached RDD for each batch, and then create
>> the union of those on every window. So for example, if you have rdd0,
>> rdd1, and rdd2, you might first take the union of 0 and 1, then of 1 and
>> 2. This will let you use just the subset of RDDs you care about instead
>> of trying to subtract stuff.
>>
>> In terms of uncaching, in the master branch you can use RDD.unpersist()
>> to uncache an old RDD. In Spark 0.7 there's no user control over
>> uncaching, so you just have to wait for it to fall out of the LRU cache.
>>
>> Matei
>>
>> On Aug 13, 2013, at 4:07 AM, Grega Kešpret <[email protected]> wrote:
>>
>> Hi all,
>> I need some tips on how to do a "rolling window" with Spark. We would
>> like to make something like this:
>>
>>     [----------- rolling window data -----------][ new ]
>>     [ old ][----------- rolling window data -----------][ new ]
>>
>> Effectively, the data in the rolling window will be much larger than the
>> additional data in each iteration, so we want to cache it in memory.
>>
>>     val rolling = sc.textFile(...).cache()
>>     rolling.count()
>>     // rolling window should be in cache
>>
>>     val newData = sc.textFile(...)
>>     val data = rolling.union(newData).cache()
>>     data.count()
>>     // rolling window + new data should be in cache
>>
>> We can add new data (and cache it) by unioning the rolling window RDD
>> with the new RDD. But how can we forget old data / remove it from the
>> cache? If it's of any help, we have the data segmented into small
>> intervals, so we know the file names beforehand.
>>
>> Thanks,
>> Grega
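For reference, here is a minimal sketch of the rolling-window pattern Matei describes: cache one RDD per batch, union the batches covered by the current window, and unpersist a batch once it falls out of the window. It assumes Spark 0.8+ (where RDD.unpersist() is available); the pathForBatch helper, windowSize, and numBatches are hypothetical placeholders, not names from the thread.

    import org.apache.spark.SparkContext

    object RollingWindowSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "rolling-window-sketch")

        val windowSize = 7
        val numBatches = 20
        // Hypothetical file layout: one input path per batch, known in advance.
        def pathForBatch(i: Int): String = "hdfs:///data/batch-" + i

        // Cache each batch RDD once; the same cached RDDs are reused across windows.
        val batches = (0 until numBatches).map(i => sc.textFile(pathForBatch(i)).cache())

        for (i <- 0 to numBatches - windowSize) {
          // The current window is the union of the cached batch RDDs it covers.
          val window = batches.slice(i, i + windowSize).reduceLeft(_ union _)
          println("window " + i + " count = " + window.count())

          // The batch that just fell out of the window is no longer needed;
          // drop it from the cache explicitly instead of waiting for LRU eviction.
          if (i > 0) batches(i - 1).unpersist()
        }

        sc.stop()
      }
    }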
