Matei,

Where is the functionality in 0.9 to spill data within a task (separately
from persist)? My apologies if this is something obvious, but I don't see it
in the API docs.

-Suren



On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> To add onto the discussion about memory working space, 0.9 introduced the
> ability to spill data within a task to disk, and in 1.0 we're also changing
> the interface to allow spilling data within the same *group* to disk (e.g.
> when you do groupBy and get a key with lots of values). The main reason
> these weren't there was that for a lot of workloads (everything except the
> same key having lots of values), simply launching more reduce tasks was
> also a good solution, because it results in an external sort across the
> cluster similar to what would happen within a task.
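>
> As a rough sketch of the "more reduce tasks" workaround in Scala (the path,
> key extraction, and partition counts below are illustrative, not tuned
> recommendations):
>
>     // Pass an explicit, larger partition count to the shuffle so each
>     // reduce task only has to hold a smaller slice of the data.
>     val pairs = sc.textFile("hdfs://.../events")
>       .map(line => (line.split("\t")(0), line))  // key on the first field
>     val grouped = pairs.groupByKey(2000)          // 2000 reduce tasks
>     // For pure aggregations, reduceByKey keeps only one value per key in
>     // memory rather than the whole group:
>     val counts = pairs.mapValues(_ => 1L).reduceByKey(_ + _, 2000)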
>
> Overall, expect to see more work both to explain how things execute
> (http://spark.incubator.apache.org/docs/latest/tuning.html is one example,
> the monitoring UI is another) and to make things require no configuration
> out of the box. We're doing a lot of this based on user feedback, so that's
> definitely appreciated.
>
> Matei
>
> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat
>> unstable under memory pressure, meaning that if you attempt to persist an
>> RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often
>> still get OOMs. I had to carefully modify some of the space-tuning
>> parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is that if you group on a key that is highly
>> skewed, with a few massively common keys and a long tail of rare keys, a
>> single massive key can be too big for one machine and again cause OOMs.
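>>
>> For reference, the persist call in question looks like this (a minimal
>> sketch; the path is a placeholder and the storage level is just the one
>> mentioned above):
>>
>>     import org.apache.spark.storage.StorageLevel
>>
>>     val rdd = sc.textFile("hdfs://.../big-input")
>>       .persist(StorageLevel.MEMORY_AND_DISK)  // spill partitions that don't fit
>>     rdd.count()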
>>
>
> My take on it -- Spark doesn't believe in sorting and spilling to enable
> super-long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases; I only need
> "group", which absolutely doesn't need the spill that slows things down to
> a crawl.
> (2) If it's an aggregate (such as a group count), use a combiner
> (combineByKey / reduceByKey), not groupByKey -- this does tons of good for
> memory use (see the sketch after this list).
> (3) If you really need groups that don't fit into memory, that is always
> because you want to do something other than aggregation with them, e.g.
> build an index of that grouped data. We actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> partitionBy (see the sketch after this list). I.e., what happens there is
> that you build a quick count sketch, perhaps on downsampled data, to
> figure out which keys have a sufficiently "big" count -- and then you
> build a partitioner that redirects large groups to a dedicated map().
> Assuming this map doesn't try to load things into memory but rather does
> something like a streaming B-tree build, that should be fine. In certain
> situations such processing may require splitting a super-large group into
> even smaller sub-groups (e.g. a partitioned B-tree structure), at which
> point you should be fine even from a uniform-load point of view. It takes
> a little jiu-jitsu to do it all, but that's not Spark's fault; it did not
> promise to do all this for you in the groupBy contract.
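>
> A minimal sketch of (2) and (3) in Scala -- the heavy-key detection, the
> sample fraction, thresholds, and partition counts are all illustrative, and
> pairs is assumed to be an RDD of (String, value) pairs:
>
>     import org.apache.spark.{HashPartitioner, Partitioner}
>
>     // (3) Keys found to be "big" (e.g. via a sketch over a sample) each get
>     // their own dedicated partition; everything else is hash-partitioned.
>     class SkewAwarePartitioner(heavyKeys: Seq[String], normalParts: Int)
>         extends Partitioner {
>       private val heavyIndex = heavyKeys.zipWithIndex.toMap
>       private val hashPart = new HashPartitioner(normalParts)
>       override def numPartitions: Int = normalParts + heavyKeys.size
>       override def getPartition(key: Any): Int = key match {
>         case k: String if heavyIndex.contains(k) => normalParts + heavyIndex(k)
>         case k => hashPart.getPartition(k)
>       }
>     }
>
>     // Rough detection of heavy keys on a small sample:
>     val heavy = pairs.sample(false, 0.01, 42).countByKey()
>       .filter(_._2 > 10000).keys.toSeq
>     val routed = pairs.partitionBy(new SkewAwarePartitioner(heavy, 200))
>     // Each partition can now be processed via mapPartitions with a streaming
>     // or on-disk structure (e.g. a B-tree build) instead of materializing a
>     // whole group in memory.
>
>     // (2) For aggregates, a combiner avoids building the group at all:
>     val groupCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)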
>
>
>
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>> issues.
>>
>> Just my personal experience, but I've observed significant improvements
>> in stability since even the 0.7.x days, so I'm confident that things will
>> continue to get better as long as people report what they're seeing so it
>> can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>> <alex.boisv...@gmail.com> wrote:
>>
>>> I'll provide answers from our own experience at Bizo.  We've been using
>>> Spark for over a year now and have found it generally better than previous
>>> approaches (mostly Hadoop + Hive).
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>> andras.nem...@lynxanalytics.com> wrote:
>>>
>>>> I. Is it too much magic? Lots of things "just work right" in Spark, and
>>>> it's extremely convenient and efficient when it indeed works. But should we
>>>> be worried that customization is hard if the built-in behavior is not quite
>>>> right for us? Are we to expect hard-to-track-down issues originating from
>>>> the black box behind the magic?
>>>>
>>>
>>> I think it goes back to understanding Spark's architecture, its design
>>> constraints, and the problems it explicitly set out to address. If the
>>> solution to your problems can be easily formulated in terms of the
>>> map/reduce model, then it's a good choice. You'll want your
>>> "customizations" to go with (not against) the grain of the architecture.
>>>
>>>
>>>> II. Is it mature enough? E.g. we've created a pull request
>>>> <https://github.com/apache/spark/pull/181> which fixes a problem that we
>>>> were very surprised no one ever stumbled upon before. So that's why I'm
>>>> asking: is Spark already being used in professional settings? Can one
>>>> already trust it to be reasonably bug-free and reliable?
>>>>
>>>
>>> There are lots of ways to use Spark, and not all of the features are
>>> necessarily at the same level of maturity. For instance, we put all the
>>> jars on the main classpath, so we've never run into the issue your pull
>>> request addresses.
>>>
>>> We definitely use and rely on Spark on a professional basis. We have 5+
>>> Spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>>> Once we got them working with the proper configuration settings, they have
>>> been running reliably since.
>>>
>>> I would characterize our use of Spark as a "better Hadoop", in the sense
>>> that we use it for batch processing only, no streaming yet. We're happy
>>> it performs better than Hadoop, but we don't require/rely on its memory
>>> caching features. In fact, for most of our jobs it would simplify our
>>> lives if Spark didn't cache so many things in memory, since that would
>>> make configuration/tuning a lot simpler and jobs would run successfully
>>> on the first try instead of our having to tweak things (# of partitions
>>> and such).
>>>
>>>> So, to the concrete issues. Sorry for the long mail, and let me know if
>>>> I should break this out into more threads or if there is some other way to
>>>> have this discussion...
>>>>
>>>> 1. Memory management
>>>> The general direction of these questions is whether it's possible to
>>>> take RDD-caching-related memory management more into our own hands, as
>>>> LRU eviction is nice most of the time but can be very suboptimal in some
>>>> of our use cases.
>>>> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one
>>>> really wants to keep. I'm fine with going down in flames if I mark too
>>>> much data as essential.
>>>> B. Memory "reflection": can you programmatically get the memory size of a
>>>> cached RDD and the memory available in total/per executor? (See the
>>>> sketch after this list.) If we could do this, we could indirectly avoid
>>>> automatic evictions of things we might really want to keep in memory.
>>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>>> the system started to cache RDD partitions on the driver as well. As the
>>>> driver ran out of memory, I started to see evictions while there was
>>>> still plenty of space on the workers. This resulted in lengthy
>>>> recomputations. Can this be avoided somehow?
>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>>>> waiting for LRU eviction to take care of it? Can you tell the size of a
>>>> broadcast programmatically?
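>>>>
>>>> For B, a sketch of the kind of introspection I mean, written against the
>>>> SparkContext status methods (assuming they are exposed in the version at
>>>> hand):
>>>>
>>>>     // Per-executor memory: (max memory for caching, remaining memory)
>>>>     sc.getExecutorMemoryStatus.foreach { case (host, (max, remaining)) =>
>>>>       println(host + ": max=" + max + " remaining=" + remaining)
>>>>     }
>>>>     // Per-RDD cache footprint as tracked by the driver
>>>>     sc.getRDDStorageInfo.foreach { info =>
>>>>       println("RDD " + info.id + ": " + info.numCachedPartitions +
>>>>         " cached partitions, " + info.memSize + " bytes in memory, " +
>>>>         info.diskSize + " bytes on disk")
>>>>     }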
>>>>
>>>>
>>>> 2. Akka lost connections
>>>> We have quite often experienced lost executors due to Akka exceptions -
>>>> mostly lost connections or similar. It seems to happen when an executor
>>>> gets extremely busy with some CPU-intensive work. Our hypothesis is that
>>>> the Akka network threads get starved and the executor fails to respond
>>>> within the timeout limits. Is this plausible? If yes, what can we do
>>>> about it?
>>>>
>>>
>>> We've seen these as well. In our case, increasing the Akka timeouts and
>>> frame size helped a lot.
>>>
>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
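>>>
>>> A minimal sketch of how we bump them when building the context (the values
>>> are illustrative, not tuned recommendations):
>>>
>>>     import org.apache.spark.{SparkConf, SparkContext}
>>>
>>>     val conf = new SparkConf()
>>>       .setMaster("local[4]")                   // or your cluster's master URL
>>>       .setAppName("akka-tuning-example")
>>>       .set("spark.akka.timeout", "300")        // seconds
>>>       .set("spark.akka.askTimeout", "120")     // seconds
>>>       .set("spark.akka.lookupTimeout", "120")  // seconds
>>>       .set("spark.akka.frameSize", "128")      // MB
>>>     val sc = new SparkContext(conf)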
>>>
>>>
>>>>
>>>> In general, these are scary errors in the sense that they come from the
>>>> very core of the framework, and it's hard to link them to something we do
>>>> in our own code, and thus hard to find a fix. So a question more for the
>>>> community: how often do you end up scratching your head about cases where
>>>> Spark magic doesn't work perfectly?
>>>>
>>>
>>> For us, this happens most often for jobs processing TBs of data (instead
>>> of GBs)... which is frustrating of course because these jobs cost a lot
>>> more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>
>>> It means we have to comb the logs to understand what happened, interpret
>>> stack traces, dump memory / object allocations, read the Spark source to
>>> formulate hypotheses about what went wrong, and then use trial and error
>>> to get to a configuration that works. Again, if Spark had better defaults
>>> and a more conservative execution model (relying less on in-memory caching
>>> of RDDs and associated metadata, on keeping large communication buffers on
>>> the heap, etc.), it would definitely simplify our lives.
>>>
>>> (Though I recognize that others might use Spark very differently and
>>> that these defaults and conservative behavior might not please everybody.)
>>>
>>> Hopefully this is the kind of feedback you were looking for...
>>>
>>>
>>>> 3. Recalculation of cached rdds
>>>> I see the following scenario happening. I load two RDDs A and B from
>>>> disk, cache them, and then run some jobs on them, at the very least a
>>>> count on each. After these jobs are done I see on the storage panel that
>>>> 100% of these RDDs are cached in memory.
>>>>
>>>> Then I create a third RDD C via multiple joins and maps from A and B,
>>>> also cache it, and start a job on C. When I do this I still see A and B
>>>> completely cached and also see C slowly getting more and more cached.
>>>> This is all fine and good, but in the meantime I see stages running in
>>>> the UI that point to the code used to load A and B. How is this possible?
>>>> Am I misunderstanding how cached RDDs should behave?
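>>>>
>>>> In code, the scenario is roughly this (simplified; the paths and the
>>>> split-based parsing just stand in for our real pipeline):
>>>>
>>>>     val a = sc.textFile("hdfs://.../a").map(l => (l.split("\t")(0), l)).cache()
>>>>     val b = sc.textFile("hdfs://.../b").map(l => (l.split("\t")(0), l)).cache()
>>>>     a.count(); b.count()   // storage panel now shows A and B as 100% cached
>>>>
>>>>     val c = a.join(b).mapValues { case (av, bv) => av + "," + bv }.cache()
>>>>     c.count()              // while this runs, stages for the A/B loading code reappear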
>>>>
>>>> And again the general question - how can one debug such issues?
>>>>
>>>> 4. Shuffle on disk
>>>> Is it true - I couldn't find it in the official docs, but did see it
>>>> mentioned in various threads - that shuffle _always_ hits disk?
>>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>>> way to do the shuffle in memory, or are there intrinsic reasons for this
>>>> to be impossible?
>>>>
>>>>
>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>
>>>> Andras
>>>>
>>>>
>>>>
>>>
>>
>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io
