I've run into this issue. The goal of cache / persist seems to be to
avoid recomputing an RDD when its data will be needed multiple times.
However, once the downstream RDDs have been computed, the cache is no
longer needed. The current design provides no obvious way to detect when
that point has been reached so the cache can be discarded.
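
To make that lifecycle concrete, here is a toy model in plain Java. It is
not Spark code, and the class and method names are made up for illustration.
It only mimics the behaviour described in this thread: cache() marks,
the first action computes, later actions reuse the stored copy, and
unpersist() forces a recompute on the next use.

```java
import java.util.function.Supplier;

// Toy model only, NOT Spark: mimics "mark, compute lazily, reuse, discard".
public class LazyCachedValue<T> {
    private final Supplier<T> compute; // stands in for an RDD's lineage
    private boolean marked;            // set by cache(), cleared by unpersist()
    private boolean stored;            // true once a computed copy is kept
    private T value;
    private int computeCount;          // how many times the lineage was run

    public LazyCachedValue(Supplier<T> compute) {
        this.compute = compute;
    }

    // Like cache(): only marks, computes nothing.
    public LazyCachedValue<T> cache() {
        marked = true;
        return this;
    }

    // Like unpersist(): removes the mark and drops any stored copy.
    public void unpersist() {
        marked = false;
        stored = false;
        value = null;
    }

    // Like an action: recomputes unless a stored copy exists.
    public T get() {
        if (stored) {
            return value;
        }
        computeCount++;
        T result = compute.get();
        if (marked) {
            value = result;
            stored = true;
        }
        return result;
    }

    public int computeCount() {
        return computeCount;
    }

    public static void main(String[] args) {
        LazyCachedValue<Integer> data = new LazyCachedValue<Integer>(() -> 6 * 7);
        data.cache();
        System.out.println("after cache():     " + data.computeCount());
        data.get();
        data.get();
        System.out.println("after two uses:    " + data.computeCount());
        data.unpersist();
        data.get();
        System.out.println("after unpersist(): " + data.computeCount());
    }
}
```

The counter only goes up when the stored copy is missing, which is the cost
I would like to avoid paying again once nothing downstream needs the data.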

When caching in memory, this may be handled by partitions being
dropped (in LRU order) when memory fills up. I need to do some more
experimentation to see whether this really works well, or whether letting
memory fill up causes performance issues or possibly OOM errors if data
isn't freed correctly.
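
For anyone unfamiliar with LRU eviction, here is a minimal sketch of the idea
in plain Java. It is a toy model, not Spark's memory store: the class name,
the partition ids, and the capacity counted in partitions rather than bytes
are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only, NOT Spark's memory store: drops the least recently used block when full.
public class LruPartitionCache {
    private final int maxPartitions;                        // hypothetical capacity, in partitions
    private final List<String> evicted = new ArrayList<>(); // ids dropped so far, in order
    private final LinkedHashMap<String, byte[]> blocks;

    public LruPartitionCache(int maxPartitions) {
        this.maxPartitions = maxPartitions;
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.blocks = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                boolean overCapacity = size() > LruPartitionCache.this.maxPartitions;
                if (overCapacity) {
                    evicted.add(eldest.getKey());
                }
                return overCapacity;
            }
        };
    }

    // Store a partition; may evict the least recently used one.
    public void put(String partitionId, byte[] data) {
        blocks.put(partitionId, data);
    }

    // Read a partition, which also marks it as recently used.
    public boolean touch(String partitionId) {
        return blocks.get(partitionId) != null;
    }

    public boolean contains(String partitionId) {
        return blocks.containsKey(partitionId);
    }

    public List<String> evicted() {
        return evicted;
    }

    public static void main(String[] args) {
        LruPartitionCache cache = new LruPartitionCache(2);
        cache.put("rdd_0_part_0", new byte[8]);
        cache.put("rdd_0_part_1", new byte[8]);
        cache.touch("rdd_0_part_0");            // part_0 is now the most recently used
        cache.put("rdd_1_part_0", new byte[8]); // over capacity: part_1 is dropped
        System.out.println("evicted: " + cache.evicted());
    }
}
```

The point of the sketch is that nothing is freed until the cache is already
full, which is why I want to test how the real thing behaves under pressure.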

In the case of persisting to disk, I'm not sure if there's a way to limit
the disk space used for caching. Does anyone know if there is such a
configuration option? This is a pressing issue for me - I have had jobs
fail because nodes ran out of disk space.


On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> If you want to force materialization, use .count().
>
> Also, if you can, simply don't unpersist anything unless you really need to
> free the memory.
>
>
> On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim <
> taeyun....@innowireless.co.kr> wrote:
>
>> BTW, it is possible that rdd.first() does not compute all of the
>> partitions.
>> So, first() cannot be used for the situation below.
>>
>> -----Original Message-----
>> From: innowireless TaeYun Kim [mailto:taeyun....@innowireless.co.kr]
>> Sent: Wednesday, June 11, 2014 11:40 AM
>> To: user@spark.apache.org
>> Subject: Question about RDD cache, unpersist, materialization
>>
>> Hi,
>>
>> What I (seem to) know about the RDD persistence API is as follows:
>> - cache() and persist() are not actions. They only mark the RDD.
>> - unpersist() is also not an action. It only removes the mark. But if
>> the RDD is already in memory, it is unloaded.
>>
>> And there seems to be no API to forcefully materialize an RDD without
>> requesting data through an action method, for example first().
>>
>> So, I am faced with the following scenario.
>>
>> {
>>     // Create an empty RDD for merging.
>>     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());
>>     for (int i = 0; i < 10; i++)
>>     {
>>         JavaRDD<String> rdd = sc.textFile(inputFileNames[i]);
>>         rdd.cache(); // Since it will be used twice, cache.
>>         // Transform and save; rdd materializes here.
>>         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);
>>         // Do another transform to T and merge by union.
>>         rddUnion = rddUnion.union(rdd.map(...).filter(...));
>>         rdd.unpersist(); // Now it seems not needed. (But it actually is.)
>>     }
>>     // Here, rddUnion actually materializes, and needs all 10 rdds that
>>     // were already unpersisted.
>>     // So, all 10 rdds will be rebuilt.
>>     rddUnion.saveAsTextFile(mergedFileName);
>> }
>>
>> If rddUnion can be materialized before the rdd.unpersist() line and
>> cache()d, the rdds in the loop will not be needed on
>> rddUnion.saveAsTextFile().
>>
>> Now what is the best strategy?
>> - Do not unpersist all 10 rdds in the loop.
>> - Materialize rddUnion in the loop by calling a 'light' action, like
>> first().
>> - Give up and just rebuild/reload all 10 rdds when saving rddUnion.
>>
>> Is there some misunderstanding?
>>
>> Thanks.
>>
>>
>>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
