Hi, I think it is a bad user experience to throw an OOM exception when the user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.

As Kyle mentioned below, the key point is that the CacheManager unrolls the entire iterator into an ArrayBuffer without any free-memory check. We should estimate the size of the unrolled iterator and check whether it exceeds the current free memory. We can separate this into three scenarios:

1. For MEMORY_ONLY, I think it is the normal case to throw an OOM exception and require the user to adjust the application.
2. For MEMORY_AND_DISK, we should check whether free memory can hold the unrolled ArrayBuffer; if yes, it goes down the usual path, and if not, we degrade it to DISK_ONLY.
3. For DISK_ONLY, we do not need to unroll the whole iterator into an ArrayBuffer at all, because we can write the iterator to disk element by element.

So the issue is how to judge whether free memory can hold the unrolled iterator before it actually becomes an ArrayBuffer. Is there any good solution for this? Could we unroll just the first 10% of the iterator into an ArrayBuffer, estimate its size, and take the total as 10x that? Obviously this is not perfect.
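To make the three scenarios concrete, here is a rough sketch of the decision I have in mind (not real Spark code; the useMemory/useDisk flags stand in for the storage level, and the estimatedSize input is exactly the quantity we do not yet know how to compute):

// Rough sketch only -- not the real CacheManager/StorageLevel API.
object PersistDecision {

  sealed trait Action
  case object UnrollToMemory extends Action // unroll into an ArrayBuffer as today
  case object DegradeToDisk  extends Action // MEMORY_AND_DISK that will not fit
  case object StreamToDisk   extends Action // DISK_ONLY: never unroll at all

  def decide(useMemory: Boolean, useDisk: Boolean,
             estimatedSize: Long, freeMemory: Long): Action = {
    if (!useMemory && useDisk) {
      StreamToDisk                                    // scenario 3: write elements one by one
    } else if (useMemory && useDisk) {
      if (estimatedSize <= freeMemory) UnrollToMemory // scenario 2: fits, usual path
      else DegradeToDisk                              // scenario 2: degrade to DISK_ONLY
    } else {
      UnrollToMemory                                  // scenario 1: MEMORY_ONLY, may OOM as today
    }
  }
}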
-----Original Message-----
From: Kyle Ellrott [mailto:[email protected]]
Sent: Thursday, November 07, 2013 2:59 AM
To: [email protected]
Subject: Re: SPARK-942

I think the usage has to be calculated as the iterator is being put into the ArrayBuffer. Right now, the BlockManager, in its put method, when it gets an iterator named 'values', uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean): Long = {
  val elements = new ArrayBuffer[Any]
  elements ++= values
  put(blockId, elements, level, tellMaster)
}

completely unrolling the iterator in a single line. Above it, the CacheManager does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)

We would probably have to implement some sort of 'IteratorBuffer' class, which would wrap an iterator. It would include a method to unroll the iterator into a buffer up to a point, something like

def unroll(maxMem: Long): Boolean = { ... }

which would return true if maxMem was hit. At that point the BlockManager could read through the already cached values, then continue on through the rest of the iterator, dumping all the values to file. If it unrolled without hitting maxMem (which would probably be most of the time), the class would simply wrap the ArrayBuffer of cached values.

Kyle
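A minimal sketch of the IteratorBuffer wrapper Kyle describes might look like this (illustrative only, not actual Spark code; sizeOf stands in for whatever per-object size estimator would really be used):

import scala.collection.mutable.ArrayBuffer

// Minimal sketch of the 'IteratorBuffer' idea above -- not actual Spark code.
class IteratorBuffer(underlying: Iterator[Any], sizeOf: Any => Long) {

  // Values unrolled so far; if unroll() returns false this holds everything.
  val buffered = new ArrayBuffer[Any]

  // Unroll the wrapped iterator into `buffered` until it is exhausted or the
  // estimated size exceeds maxMem. Returns true if maxMem was hit.
  def unroll(maxMem: Long): Boolean = {
    var estimated = 0L
    while (underlying.hasNext) {
      val v = underlying.next()
      buffered += v
      estimated += sizeOf(v)
      if (estimated > maxMem) {
        return true
      }
    }
    false
  }

  // The full sequence of values: the buffered prefix followed by whatever is
  // still left in the wrapped iterator (empty when fully unrolled).
  def iterator: Iterator[Any] = buffered.iterator ++ underlying
}

put() could then call unroll(freeMemory): false would mean the buffered ArrayBuffer goes to the memory store as usual, while true would mean iterator (the buffered prefix plus the remainder of the wrapped iterator) could be streamed straight to disk.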
On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <[email protected]> wrote:

> It's not a very elegant solution, but one possibility is for the CacheManager to check whether it will have enough space. If it is running out of space, it skips buffering the output of the iterator and writes the output of the iterator directly to disk (if the storage level allows that).
>
> But it is still tricky to know whether we will run out of space before we even start running the iterator. One possibility is to use sizing data from previous partitions to estimate the size of the current partition (i.e. estimated in-memory size = avg of current in-memory size / current input size).
>
> Do you have any ideas on this one, Kyle?
>
>
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <[email protected]> wrote:
>
> > I was wondering if anybody had any thoughts on the best way to tackle SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ). Basically, Spark takes an iterator from a flatMap call, and because I tell it that it needs to persist, Spark proceeds to push it all into an array before deciding that it doesn't have enough memory and trying to serialize it to disk, and somewhere along the line it runs out of memory. For my particular operation, the function returns an iterator that reads data out of a file, and the size of the files passed to that function can vary greatly (from a few kilobytes to a few gigabytes). The funny thing is that if I do a straight 'map' operation after the flatMap, everything works, because Spark just passes the iterator forward and never tries to expand the whole thing into memory. But I need to do a reduceByKey across all the records, so I'd like to persist to disk first, and that is where I hit this snag.
> >
> > I've already set up a unit test to replicate the problem, and I know the area of the code that would need to be fixed. I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
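Reading Reynold's parenthetical above as "predict the current partition's in-memory size from the in-memory-bytes-per-input-byte ratio observed on the partitions cached so far", the bookkeeping might look roughly like this (hypothetical names, nothing here is existing Spark API):

// Rough sketch of that bookkeeping: keep a running in-memory-bytes-per-input-byte
// ratio over the partitions cached so far and extrapolate for the next partition.
class PartitionSizePredictor {

  private var totalInputBytes = 0L
  private var totalInMemoryBytes = 0L

  // Record the observed sizes of a partition after it has been cached.
  def record(inputBytes: Long, inMemoryBytes: Long): Unit = {
    totalInputBytes += inputBytes
    totalInMemoryBytes += inMemoryBytes
  }

  // estimate = current input size * (in-memory bytes / input bytes seen so far);
  // falls back to the raw input size when nothing has been observed yet.
  def estimate(currentInputBytes: Long): Long = {
    if (totalInputBytes == 0L) currentInputBytes
    else (currentInputBytes * (totalInMemoryBytes.toDouble / totalInputBytes)).toLong
  }

  def willFit(currentInputBytes: Long, freeMemory: Long): Boolean =
    estimate(currentInputBytes) <= freeMemory
}

An estimate like this could feed the estimatedSize input of the decision sketch near the top of this mail.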
