> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned
> in various threads - that shuffle _always_ hits disk? (Disregarding OS
> caches.) Why is this the case? Are you planning to add a function to do
> shuffle in memory or are there some intrinsic reasons for this to be
> impossible?
>
> I don't think it's true... as far as I know, Spark doesn't peek into
> the OS and force it to disregard buffer caches. In general, for large
> shuffles, the shuffle files do not all fit in memory, so we kind of have to
> write them out to disk. There is an undocumented option to sync writing
> shuffle files to disk every time we write a block, but that is by default
> false and not many people use it (for obvious reasons).

I believe I recently observed that, for the map portion of the
shuffle, all shuffle files were written to the file system (albeit
potentially still sitting in OS buffer caches).  The size of the
shuffle files on the hosts matched the "shuffle write" metric shown in
the UI (pyspark branch-0.9 as of Monday), so there didn't seem to be
any effort to keep the shuffle files in memory.
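
One way to cross-check this from inside the application (rather than
du-ing the shuffle directories) is a SparkListener that sums the
shuffle write metrics. A rough sketch against the ~0.9/1.0-era
listener API -- field names like shuffleBytesWritten have moved around
between releases, so treat this as illustrative:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Sums the shuffle bytes that tasks report having written, which should
    // line up with the "shuffle write" column in the UI and with the on-disk
    // size of the files under spark.local.dir (modulo OS buffer caches).
    class ShuffleWriteTracker extends SparkListener {
      var totalShuffleBytesWritten = 0L

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        for (metrics <- Option(taskEnd.taskMetrics);
             sw      <- metrics.shuffleWriteMetrics) {
          totalShuffleBytesWritten += sw.shuffleBytesWritten
        }
      }
    }

    // sc.addSparkListener(new ShuffleWriteTracker())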

On Thu, Apr 10, 2014 at 12:43 PM, Andrew Or <and...@databricks.com> wrote:
> Here are answers to a subset of your questions:
>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>
> As far as I am aware, there are currently no eviction policies for RDD
> blocks other than LRU. Your suggestion of prioritizing RDDs is an
> interesting one, and I'm sure other users would like that as well.
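
A partial workaround for A, until something like RDD priorities exists, is
to pick storage levels per RDD so the data you care most about at least
falls back to disk rather than vanishing under LRU. A rough sketch (the
paths are made up):

    import org.apache.spark.storage.StorageLevel

    // "Essential" data survives eviction by spilling to local disk; scratch
    // data stays MEMORY_ONLY and is the cheaper thing to lose and recompute.
    val essential  = sc.textFile("hdfs:///data/essential").persist(StorageLevel.MEMORY_AND_DISK)
    val disposable = sc.textFile("hdfs:///data/scratch").persist(StorageLevel.MEMORY_ONLY)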
>
>> B. Memory "reflection": can you pragmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might really
>> want to keep in memory.
>
> All this information should be displayed on the UI under the Storage tab.
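
Some of this is also reachable programmatically -- a sketch assuming the
SparkContext helpers available around 0.9/1.0 (getExecutorMemoryStatus,
getRDDStorageInfo):

    // Per-executor block-storage memory: executor -> (max memory, remaining).
    sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
      println(s"$executor: max=$maxMem bytes, remaining=$remaining bytes")
    }

    // Per-RDD footprint of whatever is currently cached.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"${info.name}: memSize=${info.memSize} diskSize=${info.diskSize}")
    }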
>
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there was still
>> plenty of space on the workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>
> The amount of space used for RDD storage is only a fraction of the total
> amount of memory available to the JVM. More specifically, it is governed by
> `spark.storage.memoryFraction`, which is 60% by default. This may explain
> why evictions sometimes seem to occur prematurely. In the future, we should
> probably add a table to the UI with information about evicted RDDs,
> so it's easier to track them. Right now evicted RDDs disappear from the
> face of the planet completely, sometimes leaving the user somewhat
> confounded. Though with off-heap storage (Tachyon) this may become less
> relevant.
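
For reference, the fraction can be raised for cache-heavy jobs; the value
below is purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Default is 0.6; the remainder is left for shuffle buffers and task
    // working memory, so don't push this too far.
    val conf = new SparkConf()
      .setAppName("cache-heavy-job")
      .set("spark.storage.memoryFraction", "0.7")
    val sc = new SparkContext(conf)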
>
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for LRU eviction to take care of it? Can you tell the size of a
>> broadcast programmatically?
>
> In Spark 1.0, a mechanism to explicitly unpersist the blocks used by a
> broadcast has been added! Under the Storage tab of the UI, we could probably
> also have a Broadcast table in the future, seeing that there are users
> interested in this feature.
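
With the 1.0 API the usage pattern looks roughly like this (someRdd and the
tiny lookup table are made-up placeholders):

    // Broadcast a small lookup table, use it, then explicitly free its blocks
    // instead of waiting for LRU eviction.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val tagged = someRdd.map(x => (x, lookup.value.getOrElse(x, 0)))
    tagged.count()
    lookup.unpersist()                   // async removal of the cached copies
    // lookup.unpersist(blocking = true) // or block until they are actually gone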
>
>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on each.
>> After these jobs are done I see on the storage panel that 100% of these RDDs
>> are cached in memory.
>> Then I create a third RDD C which is created by multiple joins and maps
>> from A and B, also cache it and start a job on C. When I do this I still see
>> A and B completely cached and also see C slowly getting more and more
>> cached. This is all fine and good, but in the meanwhile I see stages running
>> on the UI that point to code which is used to load A and B. How is this
>> possible? Am I misunderstanding how cached RDDs should behave?
>> And again the general question - how can one debug such issues?
>
> From the fractions of RDDs cached in memory, it seems to me that your
> application is running as expected. If you also cache C, then it will slowly
> add more blocks to storage, possibly evicting A and B if there is memory
> pressure. It's entirely possible that there is a bug in how the call site is
> found for the stages page (there were a few PRs that made changes to this
> recently).
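
A cheap way to start debugging this is to print C's lineage and match the
stage descriptions shown in the UI against the actual dependency chain back
to A and B:

    // c is the joined/mapped RDD from the question; this prints its full lineage.
    println(c.toDebugString)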
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned
> in various threads - that shuffle _always_ hits disk? (Disregarding OS
> caches.) Why is this the case? Are you planning to add a function to do
> shuffle in memory or are there some intrinsic reasons for this to be
> impossible?
>
> I don't think it's true... as far as I know, Spark doesn't peek into
> the OS and force it to disregard buffer caches. In general, for large
> shuffles, the shuffle files do not all fit in memory, so we kind of have to
> write them out to disk. There is an undocumented option to sync writing
> shuffle files to disk every time we write a block, but that is by default
> false and not many people use it (for obvious reasons).
>
>
>
> On Thu, Apr 10, 2014 at 12:05 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>>
>> Can anyone comment on their experience running Spark Streaming in
>> production?
>>
>>
>> On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com>
>> wrote:
>>>
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>>>>
>>>> The biggest issue I've come across is that the cluster is somewhat
>>>> unstable when under memory pressure.  Meaning that if you attempt to 
>>>> persist
>>>> an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often
>>>> still get OOMs.  I had to carefully modify some of the space tuning
>>>> parameters and GC settings to get some jobs to even finish.
>>>>
>>>> The other issue I've observed is if you group on a key that is highly
>>>> skewed, with a few massively-common keys and a long tail of rare keys, the
>>>> one massive key can be too big for a single machine and again cause OOMs.
>>>
>>>
>>> My take on it -- Spark doesn't believe in sort-and-spill to enable
>>> super long groups, and IMO for a good reason. Here are my thoughts:
>>>
>>> (1) In my work I don't need "sort" in 99% of the cases, I only need
>>> "group", which absolutely doesn't need the spill that makes things slow
>>> down to a crawl.
>>> (2) If it's an aggregate (such as a group count), use combineByKey(), not
>>> groupByKey -- this will do tons of good on memory use.
>>> (3) If you really need groups that don't fit into memory, that is always
>>> because you want to do something other than aggregation with them,
>>> e.g. build an index of that grouped data. We actually had a case just like
>>> that. In this case your friend is really not groupBy, but rather
>>> partitionBy. I.e. what happens there is that you build a quick count
>>> sketch, perhaps on downsampled data, to figure out which keys have a
>>> sufficiently "big" count -- and then you build a partitioner that redirects
>>> the large groups to a dedicated map(). Assuming this map doesn't try to
>>> load things into memory but rather does something like a streaming B-tree
>>> build, that should be fine. In certain situations such processing may
>>> require splitting a super large group even into smaller sub-groups (e.g. a
>>> partitioned B-tree structure), at which point you should be fine even from
>>> a uniform-load point of view. It takes a little jiu-jitsu to do it all, but
>>> it is not Spark's fault here; it did not promise to do all this for you in
>>> the groupBy contract.
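
To make (2) and (3) above concrete, a rough sketch -- pairs, hotKeys and the
partition count are all made up, and on 0.9/1.0 the SparkContext._ import is
needed for the pair-RDD functions:

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)

    // (2) Group-count without materializing the groups: combineByKey aggregates
    // map-side, so the giant per-key value lists never exist.
    val counts = pairs.combineByKey[Long](
      (_: String) => 1L,                  // createCombiner
      (c: Long, _: String) => c + 1L,     // mergeValue
      (c1: Long, c2: Long) => c1 + c2     // mergeCombiners
    )

    // (3) For groups that genuinely don't fit in memory, give the known-hot keys
    // their own partitions and stream over them instead of groupBy-ing.
    class SkewAwarePartitioner(normal: Int, hotKeys: Set[String]) extends Partitioner {
      private val hot = hotKeys.toSeq.zipWithIndex.toMap
      def numPartitions: Int = normal + hot.size
      def getPartition(key: Any): Int = hot.get(key.asInstanceOf[String]) match {
        case Some(i) => normal + i                              // dedicated partition
        case None    => (key.hashCode & Int.MaxValue) % normal  // everything else
      }
    }

    val repartitioned = pairs.partitionBy(new SkewAwarePartitioner(200, hotKeys))
    // mapPartitions over `repartitioned` can then build e.g. a streaming index
    // without ever holding an entire group in memory.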
>>>
>>>
>>>>
>>>>
>>>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>>>> issues.
>>>>
>>>> Just my personal experience, but I've observed significant improvements
>>>> in stability since even the 0.7.x days, so I'm confident that things will
>>>> continue to get better as long as people report what they're seeing so it
>>>> can get fixed.
>>>>
>>>> Andrew
>>>>
>>>>
>>>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com>
>>>> wrote:
>>>>>
>>>>> I'll provide answers from our own experience at Bizo.  We've been using
>>>>> Spark for 1+ year now and have found it generally better than previous
>>>>> approaches (Hadoop + Hive mostly).
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth
>>>>> <andras.nem...@lynxanalytics.com> wrote:
>>>>>>
>>>>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>>>>> it's extremely convenient and efficient when it indeed works. But should
>>>>>> we be worried that customization is hard if the built-in behavior is not
>>>>>> quite right for us? Are we to expect hard-to-track-down issues originating
>>>>>> from the black box behind the magic?
>>>>>
>>>>>
>>>>> I think it goes back to understanding Spark's architecture, its design
>>>>> constraints and the problems it explicitly set out to address.   If the
>>>>> solution to your problems can be easily formulated in terms of the
>>>>> map/reduce model, then it's a good choice.  You'll want your
>>>>> "customizations" to go with (not against) the grain of the architecture.
>>>>>
>>>>>>
>>>>>> II. Is it mature enough? E.g. we've created a pull request which fixes
>>>>>> a problem that we were very surprised no one ever stumbled upon before.
>>>>>> So that's why I'm asking: is Spark already being used in professional
>>>>>> settings? Can one already trust it to be reasonably bug-free and reliable?
>>>>>
>>>>>
>>>>> There are lots of ways to use Spark; and not all of the features are
>>>>> necessarily at the same level of maturity.   For instance, we put all the
>>>>> jars on the main classpath so we've never run into the issue your pull
>>>>> request addresses.
>>>>>
>>>>> We definitely use and rely on Spark on a professional basis.  We have
>>>>> 5+ spark jobs running nightly on Amazon's EMR, slicing through GBs of 
>>>>> data.
>>>>> Once we got them working with the proper configuration settings, they have
>>>>> been running reliably since.
>>>>>
>>>>> I would characterize our use of Spark as a "better Hadoop", in the
>>>>> sense that we use it for batch processing only, no streaming yet.   We're
>>>>> happy it performs better than Hadoop but we don't require/rely on its 
>>>>> memory
>>>>> caching features.  In fact, for most of our jobs it would simplify our 
>>>>> lives
>>>>> if Spark didn't cache so many things in memory, since it would make
>>>>> configuration/tuning a lot simpler and jobs would run successfully on the
>>>>> first try instead of having to tweak things (# of partitions and such).
>>>>>
>>>>>> So, to the concrete issues. Sorry for the long mail, and let me know
>>>>>> if I should break this out into more threads or if there is some other 
>>>>>> way
>>>>>> to have this discussion...
>>>>>>
>>>>>> 1. Memory management
>>>>>> The general direction of these questions is whether it's possible to
>>>>>> take RDD caching related memory management more into our own hands as LRU
>>>>>> eviction is nice most of the time but can be very suboptimal in some of 
>>>>>> our
>>>>>> use cases.
>>>>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>>>>> really wants to keep. I'm fine with going down in flames if I mark too 
>>>>>> much
>>>>>> data essential.
>>>>>> B. Memory "reflection": can you programmatically get the memory size of a
>>>>>> cached rdd and memory sizes available in total/per executor? If we could 
>>>>>> do
>>>>>> this we could indirectly avoid automatic evictions of things we might 
>>>>>> really
>>>>>> want to keep in memory.
>>>>>> C. Evictions caused by RDD partitions on the driver. I had a setup
>>>>>> with huge worker memory and smallish memory on the driver JVM. To my
>>>>>> surprise, the system started to cache RDD partitions on the driver as
>>>>>> well. As the driver ran out of memory I started to see evictions while
>>>>>> there was still plenty of space on the workers. This resulted in lengthy
>>>>>> recomputations. Can this be avoided somehow?
>>>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>>>>> without waiting for LRU eviction to take care of it? Can you tell the
>>>>>> size of a broadcast programmatically?
>>>>>>
>>>>>>
>>>>>> 2. Akka lost connections
>>>>>> We have quite often experienced lost executors due to akka exceptions
>>>>>> - mostly connection lost or similar. It seems to happen when an executor
>>>>>> gets extremely busy with some CPU intensive work. Our hypothesis is that
>>>>>> akka network threads get starved and the executor fails to respond within
>>>>>> timeout limits. Is this plausible? If yes, what can we do about it?
>>>>>
>>>>>
>>>>> We've seen these as well.  In our case, increasing the akka timeouts
>>>>> and framesize helped a lot.
>>>>>
>>>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
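
For reference, those knobs map to SparkConf settings roughly like this; the
values are illustrative, not recommendations (timeouts in seconds, frameSize
in MB):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.akka.timeout", "300")        // default ~100s in this era
      .set("spark.akka.askTimeout", "120")
      .set("spark.akka.lookupTimeout", "120")
      .set("spark.akka.frameSize", "64")       // MB; raise for large task results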
>>>>>
>>>>>>
>>>>>>
>>>>>> In general, these are scary errors in the sense that they come from
>>>>>> the very core of the framework and it's hard to link it to something we 
>>>>>> do
>>>>>> in our own code, and thus hard to find a fix. So a question more for the
>>>>>> community: how often do you end up scratching your head about cases where
>>>>>> spark magic doesn't work perfectly?
>>>>>
>>>>>
>>>>> For us, this happens most often for jobs processing TBs of data
>>>>> (instead of GBs)... which is frustrating of course because these jobs 
>>>>> cost a
>>>>> lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>>>
>>>>> It means we have to comb the logs to understand what happened,
>>>>> interpret stack traces, dump memory / object allocations, read Spark 
>>>>> source
>>>>> to formulate hypotheses about what went wrong, and then trial + error to
>>>>> get to a configuration that works.   Again, if Spark had better defaults
>>>>> and a more conservative execution model (relying less on in-memory caching
>>>>> of RDDs and associated metadata, keeping large communication buffers on the
>>>>> heap, etc.), it would definitely simplify our lives.
>>>>>
>>>>> (Though I recognize that others might use Spark very differently and
>>>>> that these defaults and conservative behavior might not please everybody.)
>>>>>
>>>>> Hopefully this is the kind of feedback you were looking for...
>>>>>
>>>>>>
>>>>>> 3. Recalculation of cached rdds
>>>>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>>>>> cache them and then do some jobs on them, at the very least a count on 
>>>>>> each.
>>>>>> After these jobs are done I see on the storage panel that 100% of these 
>>>>>> RDDs
>>>>>> are cached in memory.
>>>>>>
>>>>>> Then I create a third RDD C which is created by multiple joins and
>>>>>> maps from A and B, also cache it and start a job on C. When I do this I
>>>>>> still see A and B completely cached and also see C slowly getting more 
>>>>>> and
>>>>>> more cached. This is all fine and good, but in the meanwhile I see stages
>>>>>> running on the UI that point to code which is used to load A and B. How 
>>>>>> is
>>>>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>>>>
>>>>>> And again the general question - how can one debug such issues?
>>>>>>
>>>>>> 4. Shuffle on disk
>>>>>> Is it true - I couldn't find it in official docs, but did see this
>>>>>> mentioned in various threads - that shuffle _always_ hits disk?
>>>>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>>>>> function to do shuffle in memory or are there some intrinsic reasons for
>>>>>> this to be impossible?
>>>>>>
>>>>>>
>>>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>>>
>>>>>> Andras
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
