I think there is a shuffle stage involved, and the later count jobs will read 
the first job's shuffle stage output directly, as long as it is still 
available. Thus they will be much faster.
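
For illustration, a rough way to observe this in the shell (a sketch with a 
made-up timing helper; timings will vary, and the map stage is only skipped 
while the shuffle output is still around):

// Hypothetical timing helper, for illustration only.
def time[A](body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(s"took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

val r2 = sc.parallelize(0 to 500000).keyBy(x => x).groupByKey(10)

time { r2.count() }  // first job runs the shuffle map stage plus the result stage
time { r2.count() }  // later jobs can skip the map stage and reuse the existing shuffle output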
Best Regards,
Raymond Liu

From: tomsheep...@gmail.com
Sent: Friday, June 27, 2014 10:08 AM
To: user
Subject: Re: About StorageLevel

Thank you, Andrew, that's very helpful.
I still have some doubts about a simple trial: I opened a Spark shell in local 
mode and typed in

val r = sc.parallelize(0 to 500000)
val r2 = r.keyBy(x => x).groupByKey(10)

and then I invoked the count action several times on it,

r2.count
(multiple times)

The first job obviously takes more time than the later ones. Is there some 
magic underneath?

Regards,
Kang Liu

From: Andrew Or <and...@databricks.com>
Date: 2014-06-27 02:25
To: user <user@spark.apache.org>
Subject: Re: About StorageLevel
Hi Kang,

You raise a good point. Spark does not automatically cache all your RDDs. Why? 
Simply because the application may create many RDDs, and not all of them are to 
be reused. After all, there is only so much memory available to each executor, 
and caching an RDD adds some overhead, especially if we have to kick out old 
blocks with LRU. As an example, say you run the following chain:

sc.textFile(...).map(...).filter(...).flatMap(...).map(...).reduceByKey(...).count()

You might be interested in reusing only the final result, but each step of the 
chain actually creates an RDD. If we automatically cache all RDDs, then we'll 
end up doing extra work for the RDDs we don't care about. The effect can be 
much worse if our RDDs are big and there are many of them, in which case there 
may be a lot of churn in the cache as we constantly evict RDDs we still want to 
reuse. After all, users know best which RDDs they are most interested in, so it makes 
sense to give them control over caching behavior.
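
As an illustrative sketch (the input path and names here are hypothetical, not 
from this thread), explicitly caching only the result you intend to reuse looks 
roughly like this:

import org.apache.spark.storage.StorageLevel

val counts = sc.textFile("hdfs:///path/to/input")   // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.persist(StorageLevel.MEMORY_ONLY)   // same effect as counts.cache()

counts.count()       // first action computes the whole chain and fills the cache
counts.take(10)      // later actions read the cached blocks instead of recomputing

counts.unpersist()   // free the blocks when done, instead of waiting for LRU eviction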

Best,
Andrew


2014-06-26 5:36 GMT-07:00 tomsheep...@gmail.com:
Hi all,

I have a newbie question about the StorageLevel of Spark. I came across these 
sentences in the Spark documentation:


If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), 
leave them that way. This is the most CPU-efficient option, allowing operations 
on the RDDs to run as fast as possible.


And


Spark automatically monitors cache usage on each node and drops out old data 
partitions in a least-recently-used (LRU) fashion. If you would like to 
manually remove an RDD instead of waiting for it to fall out of the cache, use 
the RDD.unpersist() method.
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
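
For reference, persisting with an explicit level and removing it manually looks 
roughly like this (an illustrative RDD, not taken from the documentation):

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000).map(_ * 2)   // hypothetical RDD
data.persist(StorageLevel.MEMORY_AND_DISK)           // spill to disk rather than recompute on eviction

data.count()        // materializes the partitions and caches them
data.unpersist()    // drop the cached blocks manually instead of waiting for LRU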

But I found that the default storageLevel is NONE in the source code, and if I 
never call 'persist(someLevel)', that value will always be NONE. The 'iterator' 
method goes to

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
Is that to say that RDDs are cached in memory (or somewhere else) if and only 
if the 'persist' or 'cache' method is called explicitly, and otherwise they 
will be re-computed every time, even in an iterative situation?
This confused me because my first impression was that Spark is super fast 
because it prefers to store intermediate results in memory automatically.


Forgive me if I asked a stupid question.


Regards,
Kang Liu
