GitHub user kellrott opened a pull request: https://github.com/apache/spark/pull/50
Patch for SPARK-942

This is a port of a pull request originally targeted at incubator-spark: https://github.com/apache/incubator-spark/pull/180

Essentially, if a user returns a generative iterator (from a flatMap operation), then when persisting the data Spark would first unroll the iterator into an ArrayBuffer and only then try to figure out whether it could store the data. If the user-provided iterator generated more data than available memory, this would cause a crash. With this patch, if the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed into the serializer.

To do this, two changes were made:

1) The type of the 'values' argument in the putValues method of the BlockStore interface was changed from ArrayBuffer to Iterator (and all code interfacing with this method was modified accordingly).

2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 1000 objects. This was done because the ObjectOutputStream keeps references to the objects it has written (thus preventing them from being GC'd) in order to write a more compact serialization. If reset is never called, memory eventually fills up; if it is called too often, the serialization stream becomes much larger because of redundant class descriptions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kellrott/spark iterator-to-disk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/50.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #50

----
commit efe1102c8a7436b2fe112d3bece9f35fedea0dc8
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-13T00:32:54Z

Changing CacheManager and BlockManager to pass iterators directly to the serializer when a 'DISK_ONLY' persist is called. This is in response to SPARK-942.
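The two mechanisms described above can be illustrated with plain `java.io`, outside of Spark. This is a minimal sketch, not Spark's `JavaSerializer`; the names `StreamingWriteSketch`, `writeAll`, `serialize`, and `deserialize` are hypothetical. It writes a lazily generated iterator to an `ObjectOutputStream` one element at a time, so the full data set is never held in an `ArrayBuffer`, and calls `reset()` every `resetEvery` objects. Treating a non-positive `resetEvery` as "never reset" is an assumption of this sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration; not Spark's actual serializer code.
object StreamingWriteSketch {
  // Write every element of `values` to `out` as it is produced, never
  // materializing the whole iterator. Every `resetEvery` objects the stream's
  // handle table is cleared so already-written objects become collectable.
  // Assumption: resetEvery <= 0 means "never reset".
  def writeAll(values: Iterator[AnyRef], out: ObjectOutputStream, resetEvery: Int): Unit = {
    var sinceReset = 0
    while (values.hasNext) {
      out.writeObject(values.next())
      sinceReset += 1
      if (resetEvery > 0 && sinceReset >= resetEvery) {
        out.reset()
        sinceReset = 0
      }
    }
    out.flush()
  }

  // Serialize `count` lazily generated Integers and return the raw bytes.
  def serialize(count: Int, resetEvery: Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // Values are generated on demand, like the output of a flatMap.
    val generated = Iterator.range(0, count).map(i => Integer.valueOf(i))
    writeAll(generated, out, resetEvery)
    out.close()
    bytes.toByteArray
  }

  // Read objects back until the end of the stream is reached.
  def deserialize(data: Array[Byte]): ArrayBuffer[AnyRef] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    val result = ArrayBuffer.empty[AnyRef]
    try {
      while (true) result += in.readObject()
    } catch {
      case _: EOFException => // end of stream: stop reading
    } finally {
      in.close()
    }
    result
  }
}
```

Serializing the same values with `resetEvery = 1` produces a larger byte array than with `resetEvery = 1000`, because each reset forces the class descriptors to be written again; with no reset at all, the stream's handle table keeps a reference to every object written.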
commit cac1fadeec964cfc254ee1f02b82665aac9a5690
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-13T21:49:50Z

Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer objects. This was previously done higher up the stack.

commit d32992fd55726d3aa26530136b9a711856e42bd5
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-13T22:10:28Z

Merge remote-tracking branch 'origin/master' into iterator-to-disk

commit 81d670cb9ad9d2e2635a0eb6ecc74f117554a708
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-14T01:20:17Z

Adding unit test for straight to disk iterator methods.

commit f40382630bceed95b2e56e3f76fbc924fdb9f2c8
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-15T06:36:59Z

Merge branch 'master' into iterator-to-disk

commit 5eb2b7e53d5290fdf71a7addd672c7f4ffbf6ec7
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-17T06:19:19Z

Changing the JavaSerializer reset to occur every 1000 objects.

commit 44ec35a3733a25df6038827f480e8cf6991f9344
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2013-11-17T06:35:51Z

Adding some comments.
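Commit cac1fade moves the iterator-to-ArrayBuffer conversion into the MemoryStore. A memory store has to copy because a Scala `Iterator` can be traversed only once: to both cache the values and return them to the caller, it must first collect them into a buffer. A minimal sketch of that step follows; `MemoryStoreSketch`, `putValues`, `get`, and the block id `"rdd_0_0"` are hypothetical and much simpler than Spark's `MemoryStore` (no size estimation, no eviction).

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified in-memory block store.
class MemoryStoreSketch {
  private val blocks = mutable.HashMap.empty[String, ArrayBuffer[Any]]

  // The incoming iterator is single-pass, so copy it into a buffer, cache the
  // buffer, and hand the caller a fresh iterator over the cached copy.
  def putValues(blockId: String, values: Iterator[Any]): Iterator[Any] = {
    val buffer = ArrayBuffer.empty[Any]
    buffer ++= values // consumes the input iterator completely
    blocks(blockId) = buffer
    buffer.iterator
  }

  def get(blockId: String): Option[ArrayBuffer[Any]] = blocks.get(blockId)
}
```

This copy is exactly what a DISK_ONLY persist avoids: a disk store can consume the iterator directly into the serializer without ever holding all values at once.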
commit 56f71cd10782b3c65df04ff9b083d9fc4f5b2503
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-04T15:49:27Z

Merge branch 'master' into iterator-to-disk

Conflicts:
core/src/main/scala/org/apache/spark/CacheManager.scala

commit 95c7f67b131496de51587afa373eee9da1a5d46b
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-24T21:35:05Z

Simplifying StorageLevel checks

commit 0e6f8084fe2e7cfb5129a016fcd65d62e4005031
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-24T23:56:09Z

Deleting temp output directory when done

commit 2eeda75621eb1d60f10d1f4ab805acae75edd7c5
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T00:20:20Z

Fixing dumb mistake ("||" instead of "&&")

commit a6424ba6b2551d4366a57cd1d5d32ffe5a4a3fd0
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T00:24:03Z

Wrapping long line

commit 9df02765528d57935a9aed8daf754a065f5d0ef5
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T01:21:48Z

Added check to make sure that streamed-to-disk RDD actually returns good data in the LargeIteratorSuite

commit 31fe08ed356c5fb37a985ea72a10d6e3e165c80b
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T01:34:27Z

Removing unneeded semicolons

commit 40fe1d7cf83ce2fe29a061ec2d6e6e54bd18a6ff
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T03:34:43Z

Removing rogue space

commit 00c98e07334dac20085f51977d015cab6e2242bb
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T06:22:12Z

Making the Java ObjectStreamSerializer reset rate configurable by the system variable 'spark.serializer.objectStreamReset'; default is now 10000.

commit 8644ee83ffbc6f02f93abaef8c56906c4683e8db
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T07:30:42Z

Merge branch 'master' into iterator-to-disk

commit 656c33e800a0f3c7926dd0857105e12e0cf5fb25
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T18:58:20Z

Fixing the JavaSerializer to read from the SparkConf rather than the System property.
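Commits 00c98e07 and 656c33e8 turn the reset interval into a configuration setting read from the SparkConf. The sketch below models only the lookup and its effect, using a plain `Map[String, String]` as a stand-in for the configuration. `ResetIntervalSketch`, `resetInterval`, and `resetCount` are hypothetical names; the default of 10000 is taken from commit 00c98e07, and treating a non-positive interval as disabling resets is an assumption.

```scala
// Hypothetical model of the configurable reset interval; not Spark code.
object ResetIntervalSketch {
  val ConfigKey = "spark.serializer.objectStreamReset"
  // Default as stated in commit 00c98e07.
  val DefaultInterval = 10000

  // Look up the interval, falling back to the default when the key is absent.
  def resetInterval(settings: Map[String, String]): Int =
    settings.get(ConfigKey).map(_.trim.toInt).getOrElse(DefaultInterval)

  // How many reset() calls a writer that resets after every `interval`
  // objects makes while writing `count` objects.
  // Assumption: a non-positive interval disables resets entirely.
  def resetCount(count: Long, interval: Int): Long =
    if (interval <= 0) 0L else count / interval
}
```

With Spark on the classpath, the real setting is supplied like any other key, e.g. `new SparkConf().set("spark.serializer.objectStreamReset", "100")`. A smaller value bounds how many written objects the stream keeps alive at the cost of a larger serialized output.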
commit 0f28ec70853a6a5ab198bc113f6af77b78d34d51
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T19:03:23Z

Adding second putValues to BlockStore interface that accepts an ArrayBuffer (rather than an Iterator). This will allow BlockStores to have slightly different behaviors depending on whether they get an Iterator or an ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed an ArrayBuffer, it can skip the duplication.

commit 627a8b79d760103674f3c5b108900e911a6a7eeb
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-25T21:20:30Z

Wrapping a few long lines

commit c2fb43056c836ebb520bd076da2b576c32e794cf
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-26T01:09:29Z

Removing more unneeded ArrayBuffer-to-Iterator conversions

commit 16a4ceae706c3458e5a2721f8c27eebbf2cf4c89
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-26T19:05:27Z

Streamlined the LargeIteratorSuite unit test. It should now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of Spark.

commit 7ccc74b7f7a2c58739cde2e4e83950e07e7fd3eb
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-27T21:34:56Z

Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It no longer tries to trigger an OOM error.

commit f70d06939bb9c164a0a6c9af42f663bc882c3211
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-27T21:36:06Z

Adding docs for spark.serializer.objectStreamReset configuration

commit 2f684ea15053d1ad934d60c58e082c1edf57b3a0
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-27T21:36:42Z

Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified the BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
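Commits 0f28ec70 and 2f684ea1 let a store distinguish an `Iterator` from an `ArrayBuffer` and replace the nested `Either` with a trait named 'Values'. The sketch below shows one way such a sealed hierarchy could look and how a memory store could use it to skip the copy when handed an `ArrayBuffer`. Only the name `Values` comes from the commit message; the case classes `IteratorValues`, `ArrayBufferValues`, `ByteBufferValues` and the `bufferToCache` method are hypothetical and not claimed to match the code in this pull request.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical sealed hierarchy standing in for a nested Either.
sealed trait Values
final case class IteratorValues(iterator: Iterator[Any]) extends Values
final case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
final case class ByteBufferValues(bytes: ByteBuffer) extends Values

object ValuesSketch {
  // The buffer a memory store would cache for deserialized input.
  // None means the input is already serialized bytes, stored as-is.
  def bufferToCache(values: Values): Option[ArrayBuffer[Any]] = values match {
    case IteratorValues(it)     => Some(ArrayBuffer.empty[Any] ++= it) // single-pass: must copy
    case ArrayBufferValues(buf) => Some(buf)                           // reuse, no duplication
    case ByteBufferValues(_)    => None                                // already serialized
  }
}
```

Compared with a nested `Either`, each case carries a descriptive name, so a call site reads `case ArrayBufferValues(buf)` rather than a positional pattern such as `case Left(Right(buf))`, and the compiler still checks that the match is exhaustive.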
commit 33ac3900b0b11c0646b85e41e1129734adf6ce5c
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-27T21:39:50Z

Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into iterator-to-disk

Conflicts:
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala

commit 8aa31cdf94981887fcbc5c7db79a4f2d310dcb59
Author: Kyle Ellrott <kellr...@gmail.com>
Date: 2014-02-28T23:25:02Z

Merge ../incubator-spark into iterator-to-disk
----