On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>

My take on it -- Spark doesn't believe in sort-and-spill things to enable
super long groups, and IMO for a good reason. Here are my thoughts:

(1) In my work I don't need "sort" in 99% of the cases; I only need "group",
which absolutely doesn't need the spill that slows things down to a
crawl.
(2) If it's an aggregate (such as a group count), use combine(), not
groupByKey -- this will do tons of good for memory use.
(3) If you really need groups that don't fit into memory, that is always
because you want to do something other than aggregation with them,
e.g. build an index of that grouped data. We actually had a case just like
that. In this case your friend is really not groupBy, but rather
partitionBy. That is, you build a quick count sketch,
perhaps on downsampled data, to figure out which keys have a sufficiently "big"
count -- and then you build a partitioner that redirects large groups to a
dedicated map(). Assuming this map doesn't try to load things into memory but
rather does something like a streaming BTree build, that should be fine. In
certain situations such processing may require splitting a super large group
into even smaller subgroups (e.g. a partitioned BTree structure), at which
point you should be fine even from a uniform-load point of view. It takes a
little jiu-jitsu to do it all, but it is not Spark's fault here; it did
not promise to do all this for you in the groupBy contract.
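
To make points (2) and (3) a bit more concrete, here is a rough sketch in
plain Python (no Spark required). It is only an illustration under stated
assumptions, not how Spark implements anything: the function names, the
sample rate, the threshold and the CRC-based hash are all made up for the
example. In a real job the resulting function would presumably be wrapped in
a custom Partitioner and handed to partitionBy.

```python
import random
import zlib
from collections import Counter


def count_by_key(pairs):
    """Combiner-style aggregate: one running count per key, never the full group."""
    counts = Counter()
    for key, _value in pairs:
        counts[key] += 1
    return counts


def find_heavy_keys(pairs, sample_rate, threshold, seed=0):
    """Count keys on a downsample and return those whose scaled-up count
    reaches the (hypothetical) threshold."""
    rng = random.Random(seed)
    sampled = Counter(key for key, _value in pairs if rng.random() < sample_rate)
    return {key for key, n in sampled.items() if n / sample_rate >= threshold}


def make_partitioner(heavy_keys, num_shared):
    """Give every heavy key its own partition; hash all other keys over the
    first num_shared partitions. Returns (partition_fn, total_partitions)."""
    dedicated = {key: num_shared + i
                 for i, key in enumerate(sorted(heavy_keys, key=repr))}

    def partition(key):
        if key in dedicated:
            return dedicated[key]
        # CRC32 of repr(key) is an arbitrary choice that is stable across runs.
        return zlib.crc32(repr(key).encode("utf-8")) % num_shared

    return partition, num_shared + len(dedicated)
```

Whatever then consumes a dedicated partition should stream through its
records (as in the BTree build mentioned above) rather than materialize the
whole group. If one key is still too large for a single task, the same idea
can be extended by giving that key several partitions.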



>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nem...@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request
>>> <https://github.com/apache/spark/pull/181> which fixes a problem that
>>> we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to
>>> take RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you programmatically get the memory size of a
>>> cached RDD and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might
>>> really want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>> the system started to cache RDD partitions on the driver as well. As the
>>> driver ran out of memory I started to see evictions while there was still
>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>> this be avoided somehow?
>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>> without waiting for the LRU eviction taking care of it? Can you tell the
>>> size of a broadcast programmatically?
>>>
>>>
>>> 2. Akka lost connections
>>> We have quite often experienced lost executors due to akka exceptions -
>>> mostly connection lost or similar. It seems to happen when an executor gets
>>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>>> network threads get starved and the executor fails to respond within
>>> timeout limits. Is this plausible? If yes, what can we do with it?
>>>
>>
>> We've seen these as well.  In our case, increasing the akka timeouts and
>> framesize helped a lot.
>>
>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>>
>>
>>>
>>> In general, these are scary errors in the sense that they come from the
>>> very core of the framework and it's hard to link it to something we do in
>>> our own code, and thus hard to find a fix. So a question more for the
>>> community: how often do you end up scratching your head about cases where
>>> Spark magic doesn't work perfectly?
>>>
>>
>> For us, this happens most often for jobs processing TBs of data (instead
>> of GBs)... which is frustrating of course because these jobs cost a lot
>> more in $$$ + time to run/debug/diagnose than smaller jobs.
>>
>> It means we have to comb the logs to understand what happened, interpret
>> stack traces, dump memory / object allocations, read Spark source to
>> formulate hypotheses about what went wrong and then trial + error to get to
>> a configuration that works.   Again, if Spark had better defaults and a more
>> conservative execution model (relying less on in-memory caching of RDDs and
>> associated metadata, keeping large communication buffers on the heap,
>> etc.), it would definitely simplify our lives.
>>
>> (Though I recognize that others might use Spark very differently and that
>> these defaults and conservative behavior might not please everybody.)
>>
>> Hopefully this is the kind of feedback you were looking for...
>>
>>
>>> 3. Recalculation of cached rdds
>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>> cache them and then do some jobs on them, at the very least a count on
>>> each. After these jobs are done I see on the storage panel that 100% of
>>> these RDDs are cached in memory.
>>>
>>> Then I create a third RDD C which is created by multiple joins and maps
>>> from A and B, also cache it and start a job on C. When I do this I still
>>> see A and B completely cached and also see C slowly getting more and more
>>> cached. This is all fine and good, but in the meanwhile I see stages
>>> running on the UI that point to code which is used to load A and B. How is
>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>
>>> And again the general question - how can one debug such issues?
>>>
>>> 4. Shuffle on disk
>>> Is it true - I couldn't find it in official docs, but did see this
>>> mentioned in various threads - that shuffle _always_ hits disk?
>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>> function to do shuffle in memory or are there some intrinsic reasons for
>>> this to be impossible?
>>>
>>>
>>> Sorry again for the giant mail, and thanks for any insights!
>>>
>>> Andras
>>>
>>>
>>>
>>
>
