Yes, this happens as long as you use the same RDD. For example say you do the 
following:

data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written 
out to shuffle files. Next time you do a count, on either this RDD or a child 
(e.g. after the filter), we notice that output files were already generated for 
this shuffle so we don’t rerun the map stage. Note that the output does get 
read again over the network, which is kind of wasteful (if you really wanted to 
reuse this as quickly as possible you’d use cache()).

Matei

On May 3, 2014, at 8:44 PM, Koert Kuipers <ko...@tresata.com> wrote:

> Hey Matei,
> Not sure i understand that. These are 2 separate jobs. So the second job 
> takes advantage of the fact that there is map output left somewhere on disk 
> from the first job, and re-uses that?
> 
> 
> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> Hi Diana,
> 
> Apart from these reasons, in a multi-stage job, Spark saves the map output 
> files from map stages to the filesystem, so it only needs to rerun the last 
> reduce stage. This is why you only saw one stage executing. These files are 
> saved for fault recovery but they speed up subsequent runs.
> 
> Matei
> 
> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> 
>> Ethan,
>> 
>> What you said is actually not true, Spark won't cache RDD's unless you ask 
>> it to.
>> 
>> The observation here - that running the same job can speed up substantially 
>> even without caching - is common. This is because other components in the 
>> stack are performing caching and optimizations. Two that can make a huge 
>> difference are:
>> 
>> 1. The OS buffer cache. Which will keep recently read disk blocks in memory.
>> 2. The Java just-in-time compiler (JIT) which will use runtime profiling to 
>> significantly speed up execution speed.
>> 
>> These can make a huge difference if you are running the same job 
>> over-and-over. And there are other things like the OS network stack 
>> increasing TCP windows and so fourth. These will all improve response time 
>> as a spark program executes.
>> 
>> 
>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjew...@gmail.com> wrote:
>> I believe Spark caches RDDs it has memory for regardless of whether you 
>> actually call the 'cache' method on the RDD. The 'cache' method just tips 
>> off Spark that the RDD should have higher priority. At least, that is my 
>> experience and it seems to correspond with your experience and with my 
>> recollection of other discussions on this topic on the list. However, going 
>> back and looking at the programming guide, this is not the way the 
>> cache/persist behavior is described. Does the guide need to be updated?
>> 
>> 
>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> I'm just Posty McPostalot this week, sorry folks! :-)
>> 
>> Anyway, another question today:
>> I have a bit of code that is pretty time consuming (pasted at the end of the 
>> message):
>> It reads in a bunch of XML files, parses them, extracts some data in a map, 
>> counts (using reduce), and then sorts.   All stages are executed when I do a 
>> final operation (take).  The first stage is the most expensive: on first run 
>> it takes 30s to a minute.
>> 
>> I'm not caching anything.
>> 
>> When I re-execute that take at the end, I expected it to re-execute all the 
>> same stages, and take approximately the same amount of time, but it didn't.  
>> The second "take" executes only a single stage which collectively run very 
>> fast: the whole operation takes less than 1 second (down from 5 minutes!)
>> 
>> While this is awesome (!) I don't understand it.  If I'm not caching data, 
>> why would I see such a marked performance improvement on subsequent 
>> execution?
>> 
>> (or is this related to the known .9.1 bug about sortByKey executing an 
>> action when it shouldn't?)
>> 
>> Thanks,
>> Diana
>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>> 
>> # load XML files containing device activation records.
>> # Find the most common device models activated
>> import xml.etree.ElementTree as ElementTree
>> 
>> # Given a partition containing multi-line XML, parse the contents. 
>> # Return an iterator of activation Elements contained in the partition
>> def getactivations(fileiterator):
>>     s = ''
>>     for i in fileiterator: s = s + str(i)
>>     filetree = ElementTree.fromstring(s)
>>     return filetree.getiterator('activation')
>> 
>> # Get the model name from a device activation record
>> def getmodel(activation):
>>     return activation.find('model').text 
>> 
>> filename="hdfs://localhost/user/training/activations/*.xml"
>>     
>> # parse each partition as a file into an activation XML record
>> activations = sc.textFile(filename)
>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>> models = activationTrees.map(lambda activation: getmodel(activation))
>> 
>> # count and sort activations by model
>> topmodels = models.map(lambda model: (model,1))\
>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>     .map(lambda (model,count): (count,model))\
>>     .sortByKey(ascending=False)
>> 
>> # display the top 10 models
>> for (count,model) in topmodels.take(10):
>>     print "Model %s (%s)" % (model,count)
>> 
>> # repeat!
>> for (count,model) in topmodels.take(10):
>>     print "Model %s (%s)" % (model,count)
>> 
>> 
>> 
> 
> 

Reply via email to