Hi Aaron, I've noticed that this is a Spark configuration. When I ran the
app I used the default settings, which means the buffer is 100KB and the
compression codec is LZF.

In theory FastBufferedOutputStream should cost 4.8GB of memory, but what I
saw in JVisualVM's memory sampler is nearly twice that amount allocated as
byte arrays, which are hardly ever garbage collected. Also, when the data
size increases, the total usage of these byte arrays reaches about 19GB. So
I'm guessing some other place also allocates byte arrays, such as the
compressed output stream buffer.
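
As a rough sanity check on the 4.8GB figure, here is a minimal sketch of the
arithmetic. It assumes one DiskBlockObjectWriter per (concurrent task,
reducer) pair and one 100KB buffer per writer, as discussed in this thread;
the function name is made up for illustration.

```python
# Rough estimate of shuffle write-buffer memory on one node.
# Assumption: one DiskBlockObjectWriter per (concurrent task, reducer) pair,
# each holding a single buffer of buffer_kb kilobytes.

def shuffle_buffer_bytes(reducers, cores, buffer_kb=100):
    """Bytes held by shuffle write buffers on one node."""
    writers = reducers * cores  # one writer per reducer for each running task
    return writers * buffer_kb * 1000  # decimal KB, matching the 4.8GB figure


if __name__ == "__main__":
    estimate = shuffle_buffer_bytes(reducers=2000, cores=24)
    print(f"estimated buffers: {estimate / 1e9:.1f} GB")
    # The ~19GB of byte arrays seen in the sampler is well above this estimate,
    # which is why another allocation site is suspected.
    print(f"not explained by buffers: {19 - estimate / 1e9:.1f} GB")
```

With 5000 reducers the same estimate comes to 12GB for the buffers alone.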

I have dumped the heap at that moment and will keep investigating the
memory consumption. I will also try to reduce the buffer size or turn off
compression on shuffle output.
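
For reference, a minimal sketch of the settings I plan to try, written as JVM
system properties. "spark.shuffle.file.buffer.kb" is the property Aaron
named; "spark.shuffle.compress" is my assumption for the switch that disables
shuffle output compression, and the value 32 is arbitrary. The helper
function is hypothetical and only formats the flags. How they reach the
executor JVMs (for example through SPARK_JAVA_OPTS) depends on the
deployment.

```python
# Hypothetical helper: formats Spark system properties as JVM -D flags.
def jvm_flags(props):
    return " ".join(f"-D{key}={value}" for key, value in props.items())


shuffle_props = {
    # Default is 100 according to Aaron; 32 is an arbitrary smaller value.
    "spark.shuffle.file.buffer.kb": 32,
    # Assumed property name for turning off shuffle output compression.
    "spark.shuffle.compress": "false",
}

if __name__ == "__main__":
    print(jvm_flags(shuffle_props))
```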

Thanks for your advice.
Jerry
On Nov 23, 2013, at 6:22 AM, "Aaron Davidson" <ilike...@gmail.com> wrote:

> Jerry, I need to correct what I said about the 100KB for
> each FastBufferedOutputStream -- this is actually a Spark buffer, not a
> compression buffer. The size can be configured using the
> "spark.shuffle.file.buffer.kb" System property, and it defaults to 100. I
> am still curious if you're using compression or seeing more than 48k
> DiskBlockObjectWriters to account for the remaining memory used.
>
>
> On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson <ilike...@gmail.com> wrote:
>
>> Great, thanks for the feedback. It sounds like you're using the LZF
>> compression scheme -- switching to Snappy should see significantly less
>> buffer space used up per DiskBlockObjectWriter, but this doesn't really
>> solve the underlying problem. In general I've been thinking of "Spark
>> nodes" as having high memory and a moderate number of cores, but with 24
>> cores and 40GB of memory, each core really doesn't get that much memory
>> individually, despite every one needing its own set of
>> DiskBlockObjectWriters.
>>
>> One thing that is a little odd is that with your numbers, you should have
>> 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
>> only require a total of 4.8GB for the entire node, though, rather than 80%
>> of your JVM memory. Were you seeing significantly more than 48k
>> DiskBlockObjectWriters?
>>
>>
>> On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
>>
>>>  Hi Aaron,
>>>
>>>
>>>
>>> I've also hit the same problem: shuffle carries a lot of overhead with a
>>> large number of partitions. I think it is an important issue when
>>> processing large data.
>>>
>>>
>>>
>>> In my case I have 2000 mappers and 2000 reducers. I dumped the memory of
>>> an executor and found that byte arrays take about 80% of total JVM memory.
>>> They are referenced by FastBufferedOutputStream and created by
>>> DiskBlockObjectWriter. It seems that there are a great many instances of
>>> DiskBlockObjectWriter, and each DiskBlockObjectWriter has a 100KB buffer
>>> for FastBufferedOutputStream by default. These buffers persist for the
>>> whole task execution period and cannot be garbage collected until the task
>>> has finished.
>>>
>>>
>>>
>>> My cluster has 6 nodes, with 40GB of memory and 24 cores per node. I tried
>>> with 5000 partitions, and this easily hit OOM.
>>>
>>>
>>>
>>> The dilemma is that my application needs a groupByKey transformation,
>>> which requires a small partition size, but a small partition size leads to
>>> a larger number of partitions, which also consumes lots of memory.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Jerry
>>>
>>>
>>>
>>> *From:* Aaron Davidson [mailto:ilike...@gmail.com]
>>> *Sent:* Friday, November 22, 2013 2:54 PM
>>> *To:* user@spark.incubator.apache.org
>>> *Subject:* Re: oome from blockmanager
>>>
>>>
>>>
>>> Thanks for your feedback; I think this is a very important issue on the
>>> usability front. One thing to consider is that at some data size, one
>>> simply needs larger or more nodes. m1.large is essentially the smallest ec2
>>> instance size that can run a Spark job of any reasonable size. That's not
>>> an excuse for an OOM, really -- one should generally just see (heavily)
>>> degraded performance instead of actually failing the job. Additionally, the
>>> number of open files scales with the number of reducers in Spark, rather
>>> than, say, Map Reduce, where each mapper only writes to one file, at the
>>> cost of later sorting the entire thing. This unfortunately means that
>>> adding nodes isn't really a full solution in your case, since each one
>>> would try to have 36k compressed output streams open.
>>>
>>>
>>>
>>> The short term solutions have already been discussed: decrease the
>>> number of reducers (and mappers, if you need them to be tied) or
>>> potentially turn off compression if Snappy is holding too much buffer
>>> space. A third option would actually be to decrease the number of executors
>>> per node to 1, since that would double the available memory and roughly
>>> halve the usage. Clearly either of the latter two solutions will produce a
>>> significant slowdown, while the first should keep the same or better
>>> performance. While Spark is good at handling a large number of partitions,
>>> there is still some cost to schedule every task, as well as to store and
>>> forward the metadata for every shuffle block (which grows with R * M), so
>>> the ideal partition size is one that fits exactly into memory without
>>> OOMing -- although this is of course an unrealistic situation to aim for.
>>>
>>>
>>>
>>> The longer term solutions include algorithms which degrade gracefully
>>> instead of OOMing (although this would be a solution for too-large
>>> partitions rather than too-small ones, where the metadata and buffering
>>> become the issue) and potentially adopting a more Map-Reducey style of
>>> shuffling where we would only need to write to 1 file per executor at a
>>> time, at some significant processing and disk bandwidth cost. I am
>>> currently investigating shuffle file performance, and thanks to your
>>> feedback here, I'll also investigate the memory overheads inherent in
>>> shuffling.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <
>>> stephen.haber...@gmail.com> wrote:
>>>
>>>
>>> > More significant in shuffling data is the number of reducers
>>>
>>> Makes sense.
>>>
>>>
>>> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>>>
>>> This seems slightly optimistic. My math would be: m1.large = 7.5gb total,
>>> leave 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb,
>>> plus say Spark will need 20% or so as metadata/overhead, so ~2gb actually
>>> available to each executor to put our working data in memory.
>>>
>>> But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap
>>> a case class around each line to abstract away the parsing logic, and, as
>>> you say, Java instances will be a good deal bigger than the raw data they
>>> encapsulate. Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that,
>>> once uncompressed and loaded as Java objects, would likely fit in RAM.
>>>
>>> 1.1tb/.3gb = 3666 reducers.
>>>
>>> Perhaps I'm being pessimistic, but an 8gb partition size seems way high.
>>> Are other Spark users really using partitions this large?
>>>
>>> I'll admit our current value of 64mb is probably way low. We had seen a
>>> lot of OOMEs when first using Spark, due to having too many partitions
>>> (one per file loaded from S3). When writing our "auto coalesce" logic,
>>> I didn't know a good partition size to shoot for, but had read that
>>> HDFS used 64mb blocks.
>>>
>>> I thought we'd get the most parity with regular Spark/HDFS users by
>>> using the same value, so that's what we went with. Perhaps this was
>>> a bad assumption?
>>>
>>>
>>> > So a key question for you is, how many reducers did you use in this
>>> > task?
>>>
>>> 18,000. Yes, I know that seems naive.
>>>
>>> As an explanation, we prefer for our reports to not have/require any
>>> manual partitioning hints from the programmer. Our theory is that, once
>>> the data is loaded and we make a good guesstimate about partitioning
>>> (which is handled by a utility library that knows our goal partition
>>> size), the report logic itself just shouldn't care.
>>>
>>> So, in this case, the report is just cogrouping the 18k partition RDD with
>>> another RDD, and since we don't have spark.default.parallelism set, the
>>> resulting RDD is also 18k partitions.
>>>
>>> To us, this seems like the only safe default behavior; if the map-side RDD
>>> was correctly partitioned into 18k, and any fewer partitions would (in
>>> theory) risk OOMEs, then the reduce-side RDD should have the same number of
>>> partitions, because it will have, for a cogroup, data from multiple RDDs,
>>> not just the biggest upstream RDD.
>>>
>>> We would like to avoid having the report hard-code partition size
>>> overrides into a few/all of its cogroup calls--how would the report know
>>> what value to hard code? What date range is it currently being run for?
>>> How much data is really there for this run?
>>>
>>> Also, I'm generally cautious about dropping the number of partitions
>>> too low, because my impression is that Spark excels at/prefers lots of
>>> small tasks, since its architecture allows it to schedule/move/recover
>>> them quickly.
>>>
>>>
>>> > I'll also be very interested to see any heap dumps
>>>
>>> Sure! I followed up with Aaron offlist.
>>>
>>> - Stephen
>>>
>>>
>>>
>>
>>
>
