It's not a new API; it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention "spill" in http://spark.incubator.apache.org/docs/latest/configuration.html.
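For example, roughly (a minimal sketch; the property names are from that page, and the fraction value below is purely illustrative, not a recommendation):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.shuffle.spill", "true")           // spill shuffle data within a task to disk (on by default)
    .set("spark.shuffle.spill.compress", "true")  // compress data spilled during shuffles
    .set("spark.shuffle.memoryFraction", "0.3")   // in-memory buffer fraction before spilling kicks in
  // ...then pass conf to new SparkContext(conf) as usual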
Matei

On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman <suren.hira...@velos.io> wrote:

> Matei,
>
> Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious, but I don't see it in the API docs.
>
> -Suren
>
>
> On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> To add onto the discussion about memory working space: 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we're also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren't there earlier is that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task.
>
> Overall, expect to see more work both to explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and to make things require no configuration out of the box. We're doing a lot of this based on user feedback, so that's definitely appreciated.
>
> Matei
>
> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>> The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is that if you group on a key that is highly skewed, with a few massively common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.
>>
>> My take on it -- Spark doesn't believe in sort-and-spill to enable super long groups, and IMO for a good reason. Here are my thoughts:
>>
>> (1) In my work I don't need "sort" in 99% of the cases; I only need "group", which absolutely doesn't need the spill that makes things slow down to a crawl.
>> (2) If it's an aggregate (such as a group count), use combine(), not groupByKey -- this will do tons of good on memory use (see the sketch below).
>> (3) If you really need groups that don't fit into memory, that is always because you want to do something other than aggregation with them, e.g. build an index of that grouped data. We actually had a case just like that. In this case your friend is really not groupBy but rather partitionBy. I.e. what happens there is you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently "big" count -- and then you build a partitioner that redirects the large groups to a dedicated map(). Assuming this map doesn't try to load things into memory but rather does something like a streaming BTree build, that should be fine. In certain situations such processing may require splitting a super large group into even smaller subgroups (e.g. a partitioned BTree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract.
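>> For illustration, a minimal sketch of point (2) -- a per-key count done with a combiner instead of groupByKey. It assumes a spark-shell session (sc predefined) and a made-up input path/format:
>>
>>   import org.apache.spark.SparkContext._  // pair-RDD functions (needed explicitly before Spark 1.3)
>>
>>   // Hypothetical input: one record per line, keyed by its first tab-separated field.
>>   val pairs = sc.textFile("hdfs:///path/to/input")
>>     .map(line => (line.split("\t")(0), line))
>>
>>   // groupByKey materializes every value of a key before counting them:
>>   val countsViaGroup = pairs.groupByKey().mapValues(_.size)
>>
>>   // A combiner keeps only a running count per key and merges map-side before the shuffle:
>>   val counts = pairs.combineByKey(
>>     (_: String) => 1L,                  // createCombiner: first value seen for a key
>>     (c: Long, _: String) => c + 1L,     // mergeValue: fold another value into the count
>>     (c1: Long, c2: Long) => c1 + c2     // mergeCombiners: merge partial counts across partitions
>>   )
>>   // Equivalent and shorter: pairs.mapValues(_ => 1L).reduceByKey(_ + _)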
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>>
>> Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>> I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <andras.nem...@lynxanalytics.com> wrote:
>> I. Is it too much magic? Lots of things "just work right" in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?
>>
>> I think this goes back to understanding Spark's architecture, its design constraints, and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your "customizations" to go with (not against) the grain of the architecture.
>>
>> II. Is it mature enough? E.g. we've created a pull request which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug-free and reliable?
>>
>> There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.
>>
>> We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark didn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take RDD-caching-related memory management more into our own hands, as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.
>>
>> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.
>>
>> B. Memory "reflection": can you programmatically get the memory size of a cached RDD and the memory sizes available in total/per executor? If we could do this, we could indirectly avoid automatic evictions of things we might really want to keep in memory. (See the sketch below.)
>>
>> C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory, I started to see evictions while there was still plenty of space on the workers. This resulted in lengthy recomputations. Can this be avoided somehow?
>>
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for LRU eviction to take care of it? Can you tell the size of a broadcast programmatically?
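>> Regarding 1.B: there is some limited introspection available on the driver. A rough sketch, assuming a live SparkContext sc (e.g. in spark-shell); these methods mostly exist to back the web UI, so their exact shape may vary between versions:
>>
>>   // Per-RDD cache usage as seen by the driver.
>>   sc.getRDDStorageInfo.foreach { info =>
>>     println(s"RDD ${info.id} '${info.name}': " +
>>       s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
>>       s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
>>   }
>>
>>   // Per-executor memory: block manager -> (max memory usable for caching, remaining memory).
>>   sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
>>     println(s"$executor: $remaining of $maxMem bytes free for caching")
>>   }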
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -- mostly connection lost or similar. It seems to happen when an executor gets extremely busy with some CPU-intensive work. Our hypothesis is that akka network threads get starved and the executor fails to respond within the timeout limits. Is this plausible? If yes, what can we do about it?
>>
>> We've seen these as well. In our case, increasing the akka timeouts and framesize helped a lot, e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}.
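>> A minimal sketch of bumping those settings when constructing the context (property names as above; the values are purely illustrative and need tuning for your cluster):
>>
>>   import org.apache.spark.{SparkConf, SparkContext}
>>
>>   val conf = new SparkConf()
>>     .setAppName("akka-tuning-example")
>>     .set("spark.akka.timeout", "300")        // seconds
>>     .set("spark.akka.askTimeout", "120")     // seconds
>>     .set("spark.akka.lookupTimeout", "120")  // seconds
>>     .set("spark.akka.frameSize", "64")       // MB; max driver/executor message size
>>   val sc = new SparkContext(conf)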
>>
>> In general, these are scary errors in the sense that they come from the very core of the framework, and it's hard to link them to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where the Spark magic doesn't work perfectly?
>>
>> For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course, because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>
>> It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read the Spark source to formulate a hypothesis about what went wrong, and then trial-and-error our way to a configuration that works. Again, if Spark had better defaults and a more conservative execution model (relying less on in-memory caching of RDDs and associated metadata, on keeping large communication buffers on the heap, etc.), it would definitely simplify our lives.
>>
>> (Though I recognize that others might use Spark very differently, and that these defaults and conservative behavior might not please everybody.)
>>
>> Hopefully this is the kind of feedback you were looking for...
>>
>> 3. Recalculation of cached RDDs
>> I see the following scenario happening. I load two RDDs A and B from disk, cache them, and then do some jobs on them, at the very least a count on each. After these jobs are done, I see on the storage panel that 100% of these RDDs are cached in memory.
>>
>> Then I create a third RDD C, built from A and B by multiple joins and maps, also cache it, and start a job on C. When I do this I still see A and B completely cached, and also see C slowly getting more and more cached. This is all fine and good, but in the meanwhile I see stages running on the UI that point to code which is used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave?
>>
>> And again the general question: how can one debug such issues?
>>
>> 4. Shuffle on disk
>> Is it true -- I couldn't find it in the official docs, but did see it mentioned in various threads -- that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a function to do shuffle in memory, or are there some intrinsic reasons for this to be impossible?
>>
>> Sorry again for the giant mail, and thanks for any insights!
>>
>> Andras
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hira...@velos.io
> W: www.velos.io