Hey Matei,

Not sure I understand that. These are two separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and reuses that?
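To check my understanding, here is a toy model of what I think is being described. It is plain Python, not Spark's actual code, and every name in it (ToyScheduler, map_stage, run_job, the shuffle id, the model names) is made up for illustration. The idea is that the map stage's output is kept under a shuffle id, so a second job over the same lineage skips the map stage and only runs the reduce.

```python
# Toy model only: plain Python, not Spark internals. All names are hypothetical.
class ToyScheduler:
    def __init__(self):
        self.shuffle_files = {}  # shuffle id -> saved map output ("on disk")
        self.stages_run = []     # log of the stages that actually executed

    def map_stage(self, shuffle_id, records):
        # Run the map stage only if its output is not already saved.
        if shuffle_id not in self.shuffle_files:
            self.stages_run.append("map")
            grouped = {}
            for key, value in records:
                grouped.setdefault(key, []).append(value)
            self.shuffle_files[shuffle_id] = grouped
        return self.shuffle_files[shuffle_id]

    def run_job(self, shuffle_id, records):
        # The reduce stage always runs; it reads the saved map output.
        grouped = self.map_stage(shuffle_id, records)
        self.stages_run.append("reduce")
        return {key: sum(values) for key, values in grouped.items()}


scheduler = ToyScheduler()
records = [("modelA", 1), ("modelB", 1), ("modelA", 1)]
first = scheduler.run_job(0, records)   # runs map, then reduce
second = scheduler.run_job(0, records)  # map output exists, so reduce only
print(scheduler.stages_run)
```

If that is roughly right, it would explain why the second take shows only one stage even though nothing was cached explicitly.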
On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Hi Diana,
>
> Apart from these reasons, in a multi-stage job, Spark saves the map output
> files from map stages to the filesystem, so it only needs to rerun the last
> reduce stage. This is why you only saw one stage executing. These files are
> saved for fault recovery but they speed up subsequent runs.
>
> Matei
>
> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>
> Ethan,
>
> What you said is actually not true; Spark won't cache RDDs unless you ask
> it to.
>
> The observation here - that running the same job can speed up
> substantially even without caching - is common. This is because other
> components in the stack are performing caching and optimizations. Two that
> can make a huge difference are:
>
> 1. The OS buffer cache, which will keep recently read disk blocks in
> memory.
> 2. The Java just-in-time compiler (JIT), which will use runtime profiling
> to significantly speed up execution.
>
> These can make a huge difference if you are running the same job
> over and over. And there are other things like the OS network stack
> increasing TCP windows and so forth. These will all improve response time
> as a Spark program executes.
>
>
> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjew...@gmail.com> wrote:
>
>> I believe Spark caches RDDs it has memory for regardless of whether you
>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>> off Spark that the RDD should have higher priority. At least, that is my
>> experience and it seems to correspond with your experience and with my
>> recollection of other discussions on this topic on the list. However, going
>> back and looking at the programming guide, this is not the way the
>> cache/persist behavior is described. Does the guide need to be updated?
>>
>>
>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>>> I'm just Posty McPostalot this week, sorry folks! :-)
>>>
>>> Anyway, another question today:
>>> I have a bit of code that is pretty time consuming (pasted at the end of
>>> the message):
>>> It reads in a bunch of XML files, parses them, extracts some data in a
>>> map, counts (using reduce), and then sorts. All stages are executed when
>>> I do a final operation (take). The first stage is the most expensive: on
>>> first run it takes 30s to a minute.
>>>
>>> I'm not caching anything.
>>>
>>> When I re-execute that take at the end, I expected it to re-execute all
>>> the same stages, and take approximately the same amount of time, but it
>>> didn't. The second "take" executes only a single stage which collectively
>>> run very fast: the whole operation takes less than 1 second (down from 5
>>> minutes!)
>>>
>>> While this is awesome (!) I don't understand it. If I'm not caching
>>> data, why would I see such a marked performance improvement on subsequent
>>> execution?
>>>
>>> (or is this related to the known 0.9.1 bug about sortByKey executing an
>>> action when it shouldn't?)
>>>
>>> Thanks,
>>> Diana
>>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>>>
>>> # load XML files containing device activation records.
>>> # Find the most common device models activated
>>> import xml.etree.ElementTree as ElementTree
>>>
>>> # Given a partition containing multi-line XML, parse the contents.
>>> # Return an iterator of activation Elements contained in the partition
>>> def getactivations(fileiterator):
>>>     s = ''
>>>     for i in fileiterator: s = s + str(i)
>>>     filetree = ElementTree.fromstring(s)
>>>     return filetree.getiterator('activation')
>>>
>>> # Get the model name from a device activation record
>>> def getmodel(activation):
>>>     return activation.find('model').text
>>>
>>> filename="hdfs://localhost/user/training/activations/*.xml"
>>>
>>> # parse each partition as a file into an activation XML record
>>> activations = sc.textFile(filename)
>>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>>
>>> # count and sort activations by model
>>> topmodels = models.map(lambda model: (model,1))\
>>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>>     .map(lambda (model,count): (count,model))\
>>>     .sortByKey(ascending=False)
>>>
>>> # display the top 10 models
>>> for (count,model) in topmodels.take(10):
>>>     print "Model %s (%s)" % (model,count)
>>>
>>> # repeat!
>>> for (count,model) in topmodels.take(10):
>>>     print "Model %s (%s)" % (model,count)
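A side note for anyone trying the quoted code on Python 3: it relies on a few Python 2 idioms (Element.getiterator, which was removed in Python 3.9, tuple-unpacking lambdas, and the print statement). Below is a minimal sketch of the two helpers and the swap step in Python 3. It assumes, as the original does, that each partition holds one complete XML document. The sample lines and model names are invented stand-ins for a partition of sc.textFile output, and swap is a hypothetical name for the tuple-unpacking lambda.

```python
import xml.etree.ElementTree as ElementTree

def getactivations(fileiterator):
    # Join the partition's lines back into a single XML document string.
    s = ''.join(str(line) for line in fileiterator)
    filetree = ElementTree.fromstring(s)
    # Element.iter replaces the removed Element.getiterator.
    return filetree.iter('activation')

def getmodel(activation):
    # Get the model name from a device activation record.
    return activation.find('model').text

def swap(pair):
    # Python 3 replacement for: lambda (model,count): (count,model)
    return (pair[1], pair[0])

# Hypothetical sample standing in for one partition of text lines.
sample_lines = [
    '<activations>',
    '<activation><model>modelA</model></activation>',
    '<activation><model>modelB</model></activation>',
    '</activations>',
]
models = [getmodel(a) for a in getactivations(iter(sample_lines))]
swapped = swap(('modelA', 2))
print("Model %s (%s)" % (swapped[1], swapped[0]))
```

In the Spark pipeline, swap would be passed to map in place of the tuple-unpacking lambda; the rest of the chain should not need to change.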