Here are answers to a subset of your questions:

> 1. Memory management
> The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data

As far as I am aware, there is currently no other eviction policies for RDD
blocks other than LRU. Your suggestion of prioritizing RDDs is an
interesting one and I'm sure other users would like that as well.

> B. Memory "reflection": can you pragmatically get the memory size of a
cached rdd and memory sizes available in total/per executor? If we could do
this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.

All this information should be displayed on the UI under the Storage tab.

> C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
this be avoided somehow?

The amount of space used for RDD storage is only a fraction of the total
amount of memory available to the JVM. More specifically, it is governed by
`spark.storage.memoryFraction`, which is by default 60%. This may explain
why evictions seem to occur pre-maturely sometimes. In the future, we
should probably add a table that contains information about evicted RDDs on
the UI, so it's easier to track them. Right now evicted RDD's disappear
from the face of the planet completely, sometimes leaving the user somewhat
confounded. Though with off-heap storage (Tachyon) this may become less
relevant.

> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?

In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
explicitly added! Under the storage tab of the UI, we could probably also
have a Broadcast table in the future, seeing that there are users
interested in this feature.

> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.
> Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?
> And again the general question - how can one debug such issues?

>From the fractions of RDDs cached in memory, it seems to me that your
application is running as expected. If you also cache C, then it will
slowly add more blocks to storage, possibly evicting A and B if there is
memory pressure. It's entirely possible that there is a bug on finding the
call site on the stages page (there were a few PRs that made changes to
this recently).

4. Shuffle on disk
Is it true - I couldn't find it in official docs, but did see this
mentioned in various threads - that shuffle _always_ hits disk?
(Disregarding OS caches.) Why is this the case? Are you planning to add a
function to do shuffle in memory or are there some intrinsic reasons for
this to be impossible?

I don't think it's true... as far as I'm concerned Spark doesn't peek into
the OS and force it to disregard buffer caches. In general, for large
shuffles, all shuffle files do not fit into memory, so we kind of have to
write them out to disk. There is an undocumented option to sync writing
shuffle files to disk every time we write a block, but that is by default
false and not many people use it (for obvious reasons).



On Thu, Apr 10, 2014 at 12:05 PM, Roger Hoover <roger.hoo...@gmail.com>wrote:

> Can anyone comment on their experience running Spark Streaming in
> production?
>
>
> On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com>wrote:
>
>>
>>
>>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>>
>>> The biggest issue I've come across is that the cluster is somewhat
>>> unstable when under memory pressure.  Meaning that if you attempt to
>>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>>> often still get OOMs.  I had to carefully modify some of the space tuning
>>> parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if you group on a key that is highly
>>> skewed, with a few massively-common keys and a long tail of rare keys, the
>>> one massive key can be too big for a single machine and again cause OOMs.
>>>
>>
>> My take on it -- Spark doesn't believe in sort-and-spill things to enable
>> super long groups, and IMO for a good reason. Here are my thoughts:
>>
>> (1) in my work i don't need "sort" in 99% of the cases, i only need
>> "group" which absolutely doesn't need the spill which makes things slow
>> down to a crawl.
>> (2) if that's an aggregate (such as group count), use combine(), not
>> groupByKey -- this will do tons of good on memory use.
>> (3) if you really need groups that don't fit into memory, that is always
>> because you want to do something that is other than aggregation, with them.
>> E,g build an index of that grouped data. we actually had a case just like
>> that. In this case your friend is really not groupBy, but rather
>> PartitionBy. I.e. what happens there you build a quick count sketch,
>> perhaps on downsampled data, to figure which keys have sufficiently "big"
>> count -- and then you build a partitioner that redirects large groups to a
>> dedicated map(). assuming this map doesn't try to load things in memory but
>> rather do something like streaming BTree build, that should be fine. In
>> certain cituations such processing may require splitting super large group
>> even into smaller sub groups (e.g. partitioned BTree structure), at which
>> point you should be fine even from uniform load point of view. It takes a
>> little of jiu-jitsu to do it all, but it is not Spark's fault here, it did
>> not promise do this all for you in the groupBy contract.
>>
>>
>>
>>>
>>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>>> issues.
>>>
>>> Just my personal experience, but I've observed significant improvements
>>> in stability since even the 0.7.x days, so I'm confident that things will
>>> continue to get better as long as people report what they're seeing so it
>>> can get fixed.
>>>
>>> Andrew
>>>
>>>
>>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>>> <alex.boisv...@gmail.com>wrote:
>>>
>>>> I'll provide answers from our own experience at Bizo.  We've been using
>>>> Spark for 1+ year now and have found it generally better than previous
>>>> approaches (Hadoop + Hive mostly).
>>>>
>>>>
>>>>
>>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>>> andras.nem...@lynxanalytics.com> wrote:
>>>>
>>>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>>>> it's extremely convenient and efficient when it indeed works. But should 
>>>>> we
>>>>> be worried that customization is hard if the built in behavior is not 
>>>>> quite
>>>>> right for us? Are we to expect hard to track down issues originating from
>>>>> the black box behind the magic?
>>>>>
>>>>
>>>> I think is goes back to understanding Spark's architecture, its design
>>>> constraints and the problems it explicitly set out to address.   If the
>>>> solution to your problems can be easily formulated in terms of the
>>>> map/reduce model, then it's a good choice.  You'll want your
>>>> "customizations" to go with (not against) the grain of the architecture.
>>>>
>>>>
>>>>> II. Is it mature enough? E.g. we've created a pull 
>>>>> request<https://github.com/apache/spark/pull/181>which fixes a problem 
>>>>> that we were very surprised no one ever stumbled upon
>>>>> before. So that's why I'm asking: is Spark being already used in
>>>>> professional settings? Can one already trust it being reasonably bug free
>>>>> and reliable?
>>>>>
>>>>
>>>> There are lots of ways to use Spark; and not all of the features are
>>>> necessarily at the same level of maturity.   For instance, we put all the
>>>> jars on the main classpath so we've never run into the issue your pull
>>>> request addresses.
>>>>
>>>> We definitely use and rely on Spark on a professional basis.  We have
>>>> 5+ spark jobs running nightly on Amazon's EMR, slicing through GBs of
>>>> data.   Once we got them working with the proper configuration settings,
>>>> they have been running reliability since.
>>>>
>>>> I would characterize our use of Spark as a "better Hadoop", in the
>>>> sense that we use it for batch processing only, no streaming yet.   We're
>>>> happy it performs better than Hadoop but we don't require/rely on its
>>>> memory caching features.  In fact, for most of our jobs it would simplify
>>>> our lives if Spark wouldn't cache so many things in memory since it would
>>>> make configuration/tuning a lot simpler and jobs would run successfully on
>>>> the first try instead of having to tweak things (# of partitions and such).
>>>>
>>>> So, to the concrete issues. Sorry for the long mail, and let me know if
>>>>> I should break this out into more threads or if there is some other way to
>>>>> have this discussion...
>>>>>
>>>>> 1. Memory management
>>>>> The general direction of these questions is whether it's possible to
>>>>> take RDD caching related memory management more into our own hands as LRU
>>>>> eviction is nice most of the time but can be very suboptimal in some of 
>>>>> our
>>>>> use cases.
>>>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>>>> really wants to keep. I'm fine with going down in flames if I mark too 
>>>>> much
>>>>> data essential.
>>>>> B. Memory "reflection": can you pragmatically get the memory size of a
>>>>> cached rdd and memory sizes available in total/per executor? If we could 
>>>>> do
>>>>> this we could indirectly avoid automatic evictions of things we might
>>>>> really want to keep in memory.
>>>>> C. Evictions caused by RDD partitions on the driver. I had a setup
>>>>> with huge worker memory and smallish memory on the driver JVM. To my
>>>>> surprise, the system started to cache RDD partitions on the driver as 
>>>>> well.
>>>>> As the driver ran out of memory I started to see evictions while there 
>>>>> were
>>>>> still plenty of space on workers. This resulted in lengthy recomputations.
>>>>> Can this be avoided somehow?
>>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>>>> without waiting for the LRU eviction taking care of it? Can you tell the
>>>>> size of a broadcast programmatically?
>>>>>
>>>>>
>>>>> 2. Akka lost connections
>>>>> We have quite often experienced lost executors due to akka exceptions
>>>>> - mostly connection lost or similar. It seems to happen when an executor
>>>>> gets extremely busy with some CPU intensive work. Our hypothesis is that
>>>>> akka network threads get starved and the executor fails to respond within
>>>>> timeout limits. Is this plausible? If yes, what can we do with it?
>>>>>
>>>>
>>>> We've seen these as well.  In our case, increasing the akka timeouts
>>>> and framesize helped a lot.
>>>>
>>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>>>>
>>>>
>>>>>
>>>>> In general, these are scary errors in the sense that they come from
>>>>> the very core of the framework and it's hard to link it to something we do
>>>>> in our own code, and thus hard to find a fix. So a question more for the
>>>>> community: how often do you end up scratching your head about cases where
>>>>> spark
>>>>>
>>>> magic doesn't work perfectly?
>>>>>
>>>>
>>>> For us, this happens most often for jobs processing TBs of data
>>>> (instead of GBs)... which is frustrating of course because these jobs cost
>>>> a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>>
>>>> It means we have to comb the logs to understand what happened,
>>>> interpret stack traces, dump memory / object allocations, read Spark source
>>>> to formulate hypothesis about what went wrong and then trial + error to get
>>>> to a configuration that works.   Again, if Spark had better defaults and
>>>> more conservative execution model (rely less on in-memory caching of RDDs
>>>> and associated metadata, keepings large communication buffers on the heap,
>>>> etc.), it would definitely simplify our lives.
>>>>
>>>> (Though I recognize that others might use Spark very differently and
>>>> that these defaults and conservative behavior might not please everybody.)
>>>>
>>>> Hopefully this is the kind of feedback you were looking for...
>>>>
>>>>
>>>>> 3. Recalculation of cached rdds
>>>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>>>> cache them and then do some jobs on them, at the very least a count on
>>>>> each. After these jobs are done I see on the storage panel that 100% of
>>>>> these RDDs are cached in memory.
>>>>>
>>>>> Then I create a third RDD C which is created by multiple joins and
>>>>> maps from A and B, also cache it and start a job on C. When I do this I
>>>>> still see A and B completely cached and also see C slowly getting more and
>>>>> more cached. This is all fine and good, but in the meanwhile I see stages
>>>>> running on the UI that point to code which is used to load A and B. How is
>>>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>>>
>>>>> And again the general question - how can one debug such issues?
>>>>>
>>>>> 4. Shuffle on disk
>>>>> Is it true - I couldn't find it in official docs, but did see this
>>>>> mentioned in various threads - that shuffle _always_ hits disk?
>>>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>>>> function to do shuffle in memory or are there some intrinsic reasons for
>>>>> this to be impossible?
>>>>>
>>>>>
>>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>>
>>>>> Andras
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to