It’s not a new API; it happens underneath the current one as long as 
spark.shuffle.spill is set to true (which it is by default). Take a look at the 
config settings that mention “spill” in 
http://spark.incubator.apache.org/docs/latest/configuration.html.
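
For illustration, setting it explicitly in code looks roughly like this 
(spark.shuffle.memoryFraction is the related spill knob; the 0.3 below is just 
an example value, not a recommendation):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.shuffle.spill is already on by default; set here only to show the knob.
    val conf = new SparkConf()
      .setMaster("local[2]")          // local master just for the example
      .setAppName("spill-settings-example")
      .set("spark.shuffle.spill", "true")
      // Share of the heap that shuffle data may use before spilling to disk;
      // 0.3 is only an illustrative value.
      .set("spark.shuffle.memoryFraction", "0.3")
    val sc = new SparkContext(conf)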

Matei

On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman <suren.hira...@velos.io> 
wrote:

> Matei,
> 
> Where is the functionality in 0.9 to spill data within a task (separately 
> from persist)? My apologies if this is something obvious, but I don't see it 
> in the API docs.
> 
> -Suren
> 
> 
> 
> On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia <matei.zaha...@gmail.com> 
> wrote:
> To add onto the discussion about memory working space, 0.9 introduced the 
> ability to spill data within a task to disk, and in 1.0 we’re also changing 
> the interface to allow spilling data within the same *group* to disk (e.g. 
> when you do groupBy and get a key with lots of values). The main reason these 
> weren’t there was that for a lot of workloads (everything except the same key 
> having lots of values), simply launching more reduce tasks was also a good 
> solution, because it results in an external sort across the cluster similar 
> to what would happen within a task.
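> 
> (A rough sketch of what the "more reduce tasks" approach looks like in user 
> code; the data is a tiny stand-in and the partition count of 2000 is an 
> arbitrary example, not a recommendation:)
> 
>   import org.apache.spark.SparkContext
>   import org.apache.spark.SparkContext._   // brings in pair-RDD functions
> 
>   val sc = new SparkContext("local[2]", "more-reduce-tasks")
>   // Tiny stand-in data set; imagine a large, not-too-skewed input instead.
>   val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
> 
>   // Ask for more reduce partitions explicitly so each reduce task's
>   // working set stays small.
>   val grouped = pairs.groupByKey(2000)
>   val counts  = pairs.reduceByKey(_ + _, 2000)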
> 
> Overall, expect to see more work both to explain how things execute 
> (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, 
> the monitoring UI is another) and to make things require no configuration 
> out of the box. We’re doing a lot of this based on user feedback, so that’s 
> definitely appreciated.
> 
> Matei
> 
> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
> 
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>> The biggest issue I've come across is that the cluster is somewhat unstable 
>> when under memory pressure, meaning that if you attempt to persist an RDD 
>> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
>> OOMs.  I had to carefully adjust some of the space-tuning parameters and GC 
>> settings to get some jobs to even finish.
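>>
>> (For what it's worth, a rough sketch of the knobs involved -- the storage 
>> level is the one mentioned above, while the fraction value is only an 
>> illustrative example, not a recommendation:)
>>
>>   import org.apache.spark.{SparkConf, SparkContext}
>>   import org.apache.spark.storage.StorageLevel
>>
>>   val conf = new SparkConf()
>>     .setMaster("local[2]")                   // local master just for the example
>>     .setAppName("memory-pressure-example")
>>     // Shrink the cache's share of the heap to leave more working space
>>     // for tasks; 0.5 is only an example value.
>>     .set("spark.storage.memoryFraction", "0.5")
>>   val sc = new SparkContext(conf)
>>
>>   // Persist with a level that is allowed to fall back to disk.
>>   val data = sc.parallelize(1 to 1000000)    // stand-in for a big input
>>     .persist(StorageLevel.MEMORY_AND_DISK)
>>   data.count()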
>> 
>> The other issue I've observed is if you group on a key that is highly 
>> skewed, with a few massively-common keys and a long tail of rare keys, the 
>> one massive key can be too big for a single machine and again cause OOMs.
>> 
>> My take on it -- Spark doesn't believe in sorting and spilling to enable 
>> super long groups, and IMO for a good reason. Here are my thoughts:
>> 
>> (1) In my work I don't need "sort" in 99% of the cases; I only need "group", 
>> which absolutely doesn't need the spill that slows things down to a crawl. 
>> (2) If it's an aggregate (such as a group count), use combineByKey(), not 
>> groupByKey() -- this will do tons of good for memory use (see the first 
>> sketch after this list).
>> (3) If you really need groups that don't fit into memory, that is always 
>> because you want to do something other than aggregation with them, e.g. 
>> build an index of that grouped data. We actually had a case just like that. 
>> In this case your friend is really not groupBy, but rather partitionBy. 
>> I.e. you build a quick count sketch, perhaps on downsampled data, to figure 
>> out which keys have a sufficiently "big" count -- and then you build a 
>> partitioner that redirects the large groups to a dedicated map(). Assuming 
>> this map doesn't try to load things into memory but rather does something 
>> like a streaming B-tree build, that should be fine. In certain situations 
>> such processing may require splitting a super-large group into even smaller 
>> subgroups (e.g. a partitioned B-tree structure), at which point you should 
>> be fine even from a uniform-load point of view (a rough partitioner sketch 
>> follows this list). It takes a little jiu-jitsu to do it all, but that is 
>> not Spark's fault; it did not promise to do all of this for you in the 
>> groupBy contract.
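>>
>> (To make (2) concrete, a minimal sketch -- the data is a tiny stand-in, and 
>> reduceByKey is just the simplest combiner-based aggregation built on 
>> combineByKey:)
>>
>>   import org.apache.spark.SparkContext
>>   import org.apache.spark.SparkContext._      // pair-RDD functions
>>
>>   val sc = new SparkContext("local[2]", "skew-examples")
>>
>>   // Group count without materializing each group in memory:
>>   val events = sc.parallelize(Seq("a", "b", "a", "c", "a"))
>>   val counts = events.map(k => (k, 1L)).reduceByKey(_ + _)   // combines map-side
>>   // versus groupByKey, which buffers every value of a key before counting:
>>   // events.map(k => (k, 1L)).groupByKey().mapValues(_.size)
>>
>> (And a bare-bones version of the partitioner idea in (3); in real life the 
>> hot-key set would come from a count sketch over sampled data, here it is 
>> hard-coded purely for illustration:)
>>
>>   import org.apache.spark.Partitioner
>>
>>   // Route known "hot" keys to dedicated partitions; hash everything else.
>>   class SkewAwarePartitioner(hotKeys: Seq[String], normalParts: Int) extends Partitioner {
>>     private val hotIndex = hotKeys.zipWithIndex.toMap
>>     def numPartitions: Int = normalParts + hotKeys.size
>>     def getPartition(key: Any): Int = hotIndex.get(key.toString) match {
>>       case Some(i) => normalParts + i                // dedicated partition
>>       case None    => ((key.hashCode % normalParts) + normalParts) % normalParts
>>     }
>>   }
>>
>>   val pairs = sc.parallelize(Seq(("hot", 1), ("cold", 2), ("hot", 3)))
>>   val repartitioned = pairs.partitionBy(new SkewAwarePartitioner(Seq("hot"), 16))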
>> 
>>  
>> 
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>> 
>> Just my personal experience, but I've observed significant improvements in 
>> stability since even the 0.7.x days, so I'm confident that things will 
>> continue to get better as long as people report what they're seeing so it 
>> can get fixed.
>> 
>> Andrew
>> 
>> 
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> 
>> wrote:
>> I'll provide answers from our own experience at Bizo.  We've been using 
>> Spark for 1+ year now and have found it generally better than previous 
>> approaches (Hadoop + Hive mostly).
>> 
>> 
>> 
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
>> <andras.nem...@lynxanalytics.com> wrote:
>> I. Is it too much magic? Lots of things "just work right" in Spark, and it's 
>> extremely convenient and efficient when that is indeed the case. But should 
>> we be worried that customization is hard if the built-in behavior is not 
>> quite right for us? Are we to expect hard-to-track-down issues originating 
>> from the black box behind the magic?
>> 
>> I think it goes back to understanding Spark's architecture, its design 
>> constraints and the problems it explicitly set out to address.   If the 
>> solution to your problems can be easily formulated in terms of the 
>> map/reduce model, then it's a good choice.  You'll want your 
>> "customizations" to go with (not against) the grain of the architecture.
>>  
>> II. Is it mature enough? E.g. we've created a pull request which fixes a 
>> problem that we were very surprised no one had stumbled upon before. That's 
>> why I'm asking: is Spark already being used in professional settings? Can 
>> one already trust it to be reasonably bug-free and reliable?
>> 
>> There are lots of ways to use Spark, and not all of the features are 
>> necessarily at the same level of maturity.   For instance, we put all the 
>> jars on the main classpath so we've never run into the issue your pull 
>> request addresses.
>> 
>> We definitely use and rely on Spark on a professional basis.  We have 5+ 
>> Spark jobs running nightly on Amazon's EMR, slicing through GBs of data.   
>> Once we got them working with the proper configuration settings, they have 
>> been running reliably since.
>> 
>> I would characterize our use of Spark as a "better Hadoop", in the sense 
>> that we use it for batch processing only, no streaming yet.   We're happy it 
>> performs better than Hadoop but we don't require/rely on its memory caching 
>> features.  In fact, for most of our jobs it would simplify our lives if 
>> Spark didn't cache so many things in memory, since that would make 
>> configuration/tuning a lot simpler and jobs would run successfully on the 
>> first try instead of requiring us to tweak things (# of partitions and such).
>> 
>> So, to the concrete issues. Sorry for the long mail, and let me know if I 
>> should break this out into more threads or if there is some other way to 
>> have this discussion...
>> 
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take 
>> RDD-caching-related memory management more into our own hands, as LRU 
>> eviction is nice most of the time but can be very suboptimal in some of our 
>> use cases.
>> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one 
>> really wants to keep. I'm fine with going down in flames if I mark too much 
>> data essential.
>> B. Memory "reflection": can you programmatically get the memory size of a 
>> cached RDD and the memory sizes available in total/per executor? If we could 
>> do this we could indirectly avoid automatic eviction of things we might 
>> really want to keep in memory (see the sketch after this list).
>> C. Evictions caused by RDD partitions on the driver. I had a setup with huge 
>> worker memory and smallish memory on the driver JVM. To my surprise, the 
>> system started to cache RDD partitions on the driver as well. As the driver 
>> ran out of memory I started to see evictions while there was still plenty 
>> of space on the workers. This resulted in lengthy recomputations. Can this be 
>> avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without 
>> waiting for the LRU eviction taking care of it? Can you tell the size of a 
>> broadcast programmatically?
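>>
>> (On B., the closest I can offer is a sketch against the SparkContext methods 
>> I'm aware of -- whether they are available in your exact version is an 
>> assumption worth double-checking:)
>>
>>   // `sc` is an existing SparkContext.
>>   // Per-executor block-manager memory as (max, remaining) in bytes.
>>   sc.getExecutorMemoryStatus.foreach { case (host, (max, remaining)) =>
>>     println(s"$host: max=$max remaining=$remaining")
>>   }
>>
>>   // Storage info for cached RDDs (name, cached partitions, memory/disk size).
>>   sc.getRDDStorageInfo.foreach { info =>
>>     println(s"${info.name}: partitions=${info.numCachedPartitions} " +
>>       s"mem=${info.memSize} disk=${info.diskSize}")
>>   }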
>> 
>> 
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions - 
>> mostly connection lost or similar. It seems to happen when an executor gets 
>> extremely busy with some CPU intensive work. Our hypothesis is that akka 
>> network threads get starved and the executor fails to respond within timeout 
>> limits. Is this plausible? If yes, what can we do with it?
>> 
>> We've seen these as well.  In our case, increasing the Akka timeouts and 
>> frame size helped a lot.
>> 
>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
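>>
>> (Roughly along these lines -- the values below are arbitrary examples, not 
>> tuned recommendations:)
>>
>>   import org.apache.spark.SparkConf
>>
>>   val conf = new SparkConf()
>>     .set("spark.akka.timeout", "300")        // seconds
>>     .set("spark.akka.askTimeout", "120")     // seconds
>>     .set("spark.akka.lookupTimeout", "120")  // seconds
>>     .set("spark.akka.frameSize", "100")      // MB; raise if tasks/results are large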
>>  
>> 
>> In general, these are scary errors in the sense that they come from the very 
>> core of the framework, and it's hard to link them to something we do in our 
>> own code, and thus hard to find a fix. So a question more for the community: 
>> how often do you end up scratching your head over cases where the Spark 
>> magic doesn't work perfectly?
>> 
>> For us, this happens most often for jobs processing TBs of data (instead of 
>> GBs)... which is frustrating of course because these jobs cost a lot more in 
>> $$$ + time to run/debug/diagnose than smaller jobs.
>> 
>> It means we have to comb the logs to understand what happened, interpret 
>> stack traces, dump memory / object allocations, read the Spark source to 
>> formulate hypotheses about what went wrong, and then use trial and error to 
>> get to a configuration that works.   Again, if Spark had better defaults and 
>> a more conservative execution model (relying less on in-memory caching of 
>> RDDs and associated metadata, keeping large communication buffers on the 
>> heap, etc.), it would definitely simplify our lives.
>> 
>> (Though I recognize that others might use Spark very differently and that 
>> these defaults and conservative behavior might not please everybody.)
>> 
>> Hopefully this is the kind of feedback you were looking for...
>> 
>> 
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk, cache 
>> them and then do some jobs on them, at the very least a count on each. After 
>> these jobs are done I see on the storage panel that 100% of these RDDs are 
>> cached in memory.
>> 
>> Then I create a third RDD C, built by multiple joins and maps from A and B, 
>> cache it as well, and start a job on C. When I do this I still see A and B 
>> completely cached and also see C slowly getting more and more cached. This 
>> is all fine and good, but in the meantime I see stages running in the UI 
>> that point to the code used to load A and B. How is this possible? Am I 
>> misunderstanding how cached RDDs should behave?
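>>
>> (In code, the scenario is roughly the following -- the data is a tiny 
>> stand-in for the real inputs, and an existing SparkContext `sc` plus 
>> `import org.apache.spark.SparkContext._` are assumed:)
>>
>>   val a = sc.parallelize(Seq((1, "a1"), (2, "a2"))).cache()   // "A" above
>>   val b = sc.parallelize(Seq((1, "b1"), (2, "b2"))).cache()   // "B" above
>>   a.count(); b.count()       // storage panel now shows A and B 100% cached
>>
>>   val c = a.join(b).mapValues { case (x, y) => x + y }.cache()
>>   c.count()                  // while this runs, stages pointing at the code
>>                              // that built A and B still show up in the UI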
>> 
>> And again the general question - how can one debug such issues?
>> 
>> 4. Shuffle on disk
>> Is it true - I couldn't find it in official docs, but did see this mentioned 
>> in various threads - that shuffle _always_ hits disk? (Disregarding OS 
>> caches.) Why is this the case? Are you planning to add a function to do 
>> shuffle in memory or are there some intrinsic reasons for this to be 
>> impossible?
>> 
>> 
>> Sorry again for the giant mail, and thanks for any insights!
>> 
>> Andras
>> 
>> 
>> 
>> 
>> 
> 
> 
> 
> 
> -- 
>                                                             
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
> 
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hira...@velos.io
> W: www.velos.io
> 
