Re: performance improvement on second operation...without caching?
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. Your answer explains the performance jump in my code, but shows I've missed something key in my understanding of Spark!

I was not aware until just now that map output was saved to disk (other than if explicitly told to do so using persist). It raises almost as many questions as it answers.

Where are the shuffle files saved? Locally on the mapper nodes? Is it the same location that disk-spilled cache is saved to?

Doesn't the necessity of saving to disk result in increased I/O that would slow the job down? I thought part of the goal of Spark was to do everything in memory unless the user specifically chose to persist...thereby making a choice to incur time/disk space expense up front in return for fast failure recovery?

Not that I'm complaining, mind you, but I do think this should be made clear to people: it affects not only performance but also, for instance, whether the data is fresh or out of date. I had assumed that if I did not set caching, each time I performed an operation on an RDD it would re-compute based on lineage, including re-reading the files...so I didn't have to worry about the possibility of my file content changing. But if it's auto-caching shuffle files, my base files won't get re-read even if the content has changed. (Or does it check timestamps?)

Thanks,
Diana

On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett wrote:
> Thanks Patrick and Matei for the clarification. I actually have to update
> some code now, as I was apparently relying on the fact that the output
> files are being re-used. Explains some edge-case behavior that I've seen.
>
> For me, at least, I read the guide, did some tests on fairly extensive RDD
> dependency graphs, saw that tasks earlier in the dependency graphs were not
> being regenerated and assumed (very much incorrectly I just found out!)
> that it was because the RDDs themselves were being cached. I wonder if
> there is a way to explain this distinction concisely in the programming
> guide. Or maybe I'm the only one that went down this incorrect learning
> path :-)
>
> Ethan
Re: performance improvement on second operation...without caching?
Thanks Patrick and Matei for the clarification. I actually have to update some code now, as I was apparently relying on the fact that the output files are being re-used. Explains some edge-case behavior that I've seen.

For me, at least, I read the guide, did some tests on fairly extensive RDD dependency graphs, saw that tasks earlier in the dependency graphs were not being regenerated and assumed (very much incorrectly, I just found out!) that it was because the RDDs themselves were being cached. I wonder if there is a way to explain this distinction concisely in the programming guide. Or maybe I'm the only one that went down this incorrect learning path :-)

Ethan

On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia wrote:
> Yes, this happens as long as you use the same RDD. For example say you do
> the following:
>
> data1 = sc.textFile(…).map(…).reduceByKey(…)
> data1.count()
> data1.filter(…).count()
>
> The first count() causes outputs of the map/reduce pair in there to be
> written out to shuffle files. Next time you do a count, on either this RDD
> or a child (e.g. after the filter), we notice that output files were
> already generated for this shuffle so we don’t rerun the map stage. Note
> that the output does get read again over the network, which is kind of
> wasteful (if you really wanted to reuse this as quickly as possible you’d
> use cache()).
>
> Matei
Re: performance improvement on second operation...without caching?
Yes, this happens as long as you use the same RDD. For example say you do the following:

data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written out to shuffle files. Next time you do a count, on either this RDD or a child (e.g. after the filter), we notice that output files were already generated for this shuffle so we don’t rerun the map stage. Note that the output does get read again over the network, which is kind of wasteful (if you really wanted to reuse this as quickly as possible you’d use cache()).

Matei

On May 3, 2014, at 8:44 PM, Koert Kuipers wrote:
> Hey Matei,
> Not sure i understand that. These are 2 separate jobs. So the second job
> takes advantage of the fact that there is map output left somewhere on disk
> from the first job, and re-uses that?
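The reuse Matei describes above can be pictured with a toy model in plain Python. This is only a sketch and not Spark code: the class `ToyShuffledRDD`, its `shuffle_files` attribute, and the `map_stage_runs` counter are all hypothetical names invented for illustration, and an in-memory dict stands in for the map output files. It shows the one idea from the message: the map stage runs on the first action, and later actions on the same object only rerun the reduce side.

```python
# Toy model of shuffle-output reuse. NOT Spark code; every name is hypothetical.
from collections import defaultdict


class ToyShuffledRDD:
    """Stands in for something like textFile(...).map(...).reduceByKey(...)."""

    def __init__(self, read_source, map_fn, reduce_fn):
        self.read_source = read_source  # callable returning the input records
        self.map_fn = map_fn            # record -> (key, value)
        self.reduce_fn = reduce_fn      # (value, value) -> value
        self.shuffle_files = None       # stand-in for map output kept on disk
        self.map_stage_runs = 0         # how many times the map stage really ran

    def _run_map_stage(self):
        # Read the source, apply the map function, group values by key.
        grouped = defaultdict(list)
        for record in self.read_source():
            key, value = self.map_fn(record)
            grouped[key].append(value)
        self.shuffle_files = dict(grouped)
        self.map_stage_runs += 1

    def _run_reduce_stage(self):
        # Always re-executed: merges the saved map output per key.
        result = {}
        for key, values in self.shuffle_files.items():
            acc = values[0]
            for value in values[1:]:
                acc = self.reduce_fn(acc, value)
            result[key] = acc
        return result

    def collect(self):
        # The map stage is skipped when its output already exists.
        if self.shuffle_files is None:
            self._run_map_stage()
        return self._run_reduce_stage()

    def count(self):
        return len(self.collect())


source = ["a", "b", "a"]
rdd = ToyShuffledRDD(lambda: list(source), lambda w: (w, 1), lambda x, y: x + y)

first = rdd.collect()    # map stage + reduce stage
source.append("c")       # the "input file" changes after the first action
second = rdd.collect()   # reduce stage only; the toy never re-reads the source

print(first, second, rdd.map_stage_runs)
```

In this sketch the second `collect()` does not see the appended record, because the saved map output is reused without looking at the source again. That is a property of the toy model only; it is not a statement about how Spark treats changed input files.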
Re: performance improvement on second operation...without caching?
Hey Matei,

Not sure I understand that. These are 2 separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and re-uses that?

On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia wrote:
> Hi Diana,
>
> Apart from these reasons, in a multi-stage job, Spark saves the map output
> files from map stages to the filesystem, so it only needs to rerun the last
> reduce stage. This is why you only saw one stage executing. These files are
> saved for fault recovery but they speed up subsequent runs.
>
> Matei
Re: performance improvement on second operation...without caching?
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery but they speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell wrote:
> Ethan,
>
> What you said is actually not true: Spark won't cache RDDs unless you ask it
> to.
>
> The observation here - that running the same job can speed up substantially
> even without caching - is common. This is because other components in the
> stack are performing caching and optimizations. Two that can make a huge
> difference are:
>
> 1. The OS buffer cache, which will keep recently read disk blocks in memory.
> 2. The Java just-in-time compiler (JIT), which will use runtime profiling to
> significantly speed up execution.
>
> These can make a huge difference if you are running the same job
> over-and-over. And there are other things like the OS network stack
> increasing TCP windows and so forth. These will all improve response time as
> a Spark program executes.
>
>
> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett wrote:
> I believe Spark caches RDDs it has memory for regardless of whether you
> actually call the 'cache' method on the RDD. The 'cache' method just tips off
> Spark that the RDD should have higher priority. At least, that is my
> experience and it seems to correspond with your experience and with my
> recollection of other discussions on this topic on the list. However, going
> back and looking at the programming guide, this is not the way the
> cache/persist behavior is described. Does the guide need to be updated?
>
>
> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll wrote:
> I'm just Posty McPostalot this week, sorry folks! :-)
>
> Anyway, another question today:
> I have a bit of code that is pretty time consuming (pasted at the end of the
> message):
> It reads in a bunch of XML files, parses them, extracts some data in a map,
> counts (using reduce), and then sorts. All stages are executed when I do a
> final operation (take). The first stage is the most expensive: on first run
> it takes 30s to a minute.
>
> I'm not caching anything.
>
> When I re-execute that take at the end, I expected it to re-execute all the
> same stages, and take approximately the same amount of time, but it didn't.
> The second "take" executes only a single stage, which runs very fast: the
> whole operation takes less than 1 second (down from 5 minutes!)
>
> While this is awesome (!) I don't understand it. If I'm not caching data,
> why would I see such a marked performance improvement on subsequent execution?
>
> (or is this related to the known 0.9.1 bug about sortByKey executing an action
> when it shouldn't?)
>
> Thanks,
> Diana
>
>
> # load XML files containing device activation records.
> # Find the most common device models activated
> import xml.etree.ElementTree as ElementTree
>
> # Given a partition containing multi-line XML, parse the contents.
> # Return an iterator of activation Elements contained in the partition
> def getactivations(fileiterator):
>     s = ''
>     for i in fileiterator: s = s + str(i)
>     filetree = ElementTree.fromstring(s)
>     return filetree.getiterator('activation')
>
> # Get the model name from a device activation record
> def getmodel(activation):
>     return activation.find('model').text
>
> filename="hdfs://localhost/user/training/activations/*.xml"
>
> # parse each partition as a file into an activation XML record
> activations = sc.textFile(filename)
> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
> models = activationTrees.map(lambda activation: getmodel(activation))
>
> # count and sort activations by model
> topmodels = models.map(lambda model: (model,1))\
>     .reduceByKey(lambda v1,v2: v1+v2)\
>     .map(lambda (model,count): (count,model))\
>     .sortByKey(ascending=False)
>
> # display the top 10 models
> for (count,model) in topmodels.take(10):
>     print "Model %s (%s)" % (model,count)
>
> # repeat!
> for (count,model) in topmodels.take(10):
>     print "Model %s (%s)" % (model,count)
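For readers trying the parsing helpers from Diana's quoted code on a current interpreter, here is a minimal Python 3 sketch of the same two functions, run locally without Spark. Several things are assumptions rather than part of the thread: the sample XML layout (a root `<activations>` element with `<model>` as a direct child of each `<activation>`), the model names, and the use of `collections.Counter` as a local stand-in for the map / reduceByKey / sortByKey / take(10) chain. The one substantive change to the helpers is `iter()` in place of `getiterator()`, which was removed in Python 3.9.

```python
import xml.etree.ElementTree as ElementTree
from collections import Counter


def getactivations(fileiterator):
    """Join the lines of one partition and return its <activation> elements."""
    s = ''.join(str(line) for line in fileiterator)
    filetree = ElementTree.fromstring(s)
    # Element.getiterator() no longer exists in Python 3.9+; iter() replaces it.
    return filetree.iter('activation')


def getmodel(activation):
    """Get the model name from a device activation record."""
    return activation.find('model').text


# Hypothetical sample partition; the real file layout is not shown in the thread.
sample_partition = [
    "<activations>",
    "<activation><model>A100</model></activation>",
    "<activation><model>B200</model></activation>",
    "<activation><model>A100</model></activation>",
    "</activations>",
]

models = [getmodel(a) for a in getactivations(sample_partition)]

# Local stand-in for the count-and-sort pipeline, not an RDD operation.
topmodels = Counter(models).most_common(10)

for model, count in topmodels:
    print("Model %s (%s)" % (model, count))
```

Two other Python 3 differences apply to the quoted script itself: `print` is a function, and tuple-unpacking lambdas such as `lambda (model,count): (count,model)` are no longer valid syntax, so that step would be written as `lambda kv: (kv[1], kv[0])`.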