GitHub user kellrott opened a pull request:

    https://github.com/apache/spark/pull/50

    Patch for SPARK-942

    This is a port of a pull request originally targeted at incubator-spark: 
https://github.com/apache/incubator-spark/pull/180
    
    Essentially, if a user returns a generative iterator (from a flatMap 
operation), then when trying to persist the data Spark would first unroll the 
iterator into an ArrayBuffer and only then try to figure out whether it could 
store the data. In cases where the user provided an iterator that generated 
more data than available memory, this would cause a crash. With this patch, if 
the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is 
unrolled as it is fed into the serializer.
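
As a rough illustration of the streaming idea described above (not the code in this patch), the following Java sketch writes each element to the sink as soon as the iterator yields it, instead of first collecting everything into a buffer. The class name StreamingWriteSketch and the helpers generate and writeAll are hypothetical names chosen for this example, and DataOutputStream merely stands in for Spark's serializer.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical illustration; none of these names come from Spark.
final class StreamingWriteSketch {

    /** Lazily generates `count` records; each one is created only when requested. */
    public static Iterator<String> generate(final long count) {
        return new Iterator<String>() {
            private long produced = 0;

            @Override
            public boolean hasNext() {
                return produced < count;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return "record-" + (produced++);
            }
        };
    }

    /**
     * Writes each element as soon as the iterator yields it and returns the
     * number of elements written. No intermediate collection is built.
     */
    public static long writeAll(Iterator<String> values, OutputStream sink) {
        long written = 0;
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(sink))) {
            while (values.hasNext()) {
                out.writeUTF(values.next());
                written++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("stream-to-disk", ".bin");
        try (OutputStream file = Files.newOutputStream(target)) {
            long written = writeAll(generate(100_000), file);
            System.out.println("wrote " + written + " records, "
                    + Files.size(target) + " bytes");
        } finally {
            Files.deleteIfExists(target);
        }
    }
}
```

Because the writer only ever holds the current element, the amount of data the iterator can produce is limited by disk space rather than by heap size.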
    
    To do this, two changes were made:
    1) The type of the 'values' argument in the putValues method of the 
BlockStore interface was changed from ArrayBuffer to Iterator (and all code 
interfacing with this method was modified to connect correctly).
    2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 
1000 objects. This was done because the ObjectOutputStream caches objects (thus 
preventing them from being GC'd) to write more compact serialization. If reset 
is never called, memory eventually fills up; if it is called too often, the 
serialization streams become much larger because of redundant class 
descriptions.
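
The trade-off in item 2 can be observed with plain java.io, independent of Spark. The sketch below is only an illustration under assumed names (ResetIntervalSketch, Point, and serializedSize are hypothetical and not part of this patch): it serializes the same number of small objects with different reset intervals and reports the resulting stream size.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration; none of these names come from Spark.
final class ResetIntervalSketch {

    /** Small record type used only for this example. */
    static final class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /**
     * Serializes `count` distinct objects and returns the stream size in bytes.
     * A resetInterval of 0 means reset() is never called.
     */
    public static int serializedSize(int count, int resetInterval) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (int i = 1; i <= count; i++) {
                out.writeObject(new Point(i, -i));
                // reset() clears the stream's table of already-written objects,
                // releasing its references to them. The next object written
                // must then carry its class description again.
                if (resetInterval > 0 && i % resetInterval == 0) {
                    out.reset();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int count = 5000;
        System.out.println("never reset:      " + serializedSize(count, 0) + " bytes");
        System.out.println("reset every 1000: " + serializedSize(count, 1000) + " bytes");
        System.out.println("reset every 1:    " + serializedSize(count, 1) + " bytes");
    }
}
```

Resetting after every object should produce the largest stream, since the class description is repeated each time, while a moderate interval costs only a few repeated descriptions and still lets previously written objects be garbage collected.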

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kellrott/spark iterator-to-disk

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/50.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #50
    
----
commit efe1102c8a7436b2fe112d3bece9f35fedea0dc8
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-13T00:32:54Z

    Changing CacheManager and BlockManager to pass iterators directly to the 
serializer when a 'DISK_ONLY' persist is called.
    This is in response to SPARK-942.

commit cac1fadeec964cfc254ee1f02b82665aac9a5690
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-13T21:49:50Z

    Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer 
objects. This was previously done higher up the stack.

commit d32992fd55726d3aa26530136b9a711856e42bd5
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-13T22:10:28Z

    Merge remote-tracking branch 'origin/master' into iterator-to-disk

commit 81d670cb9ad9d2e2635a0eb6ecc74f117554a708
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-14T01:20:17Z

    Adding unit test for straight to disk iterator methods.

commit f40382630bceed95b2e56e3f76fbc924fdb9f2c8
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-15T06:36:59Z

    Merge branch 'master' into iterator-to-disk

commit 5eb2b7e53d5290fdf71a7addd672c7f4ffbf6ec7
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-17T06:19:19Z

    Changing the JavaSerializer reset to occur every 1000 objects.

commit 44ec35a3733a25df6038827f480e8cf6991f9344
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2013-11-17T06:35:51Z

    Adding some comments.

commit 56f71cd10782b3c65df04ff9b083d9fc4f5b2503
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-04T15:49:27Z

    Merge branch 'master' into iterator-to-disk
    
    Conflicts:
        core/src/main/scala/org/apache/spark/CacheManager.scala

commit 95c7f67b131496de51587afa373eee9da1a5d46b
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-24T21:35:05Z

    Simplifying StorageLevel checks

commit 0e6f8084fe2e7cfb5129a016fcd65d62e4005031
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-24T23:56:09Z

    Deleting temp output directory when done

commit 2eeda75621eb1d60f10d1f4ab805acae75edd7c5
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T00:20:20Z

    Fixing dumb mistake ("||" instead of "&&")

commit a6424ba6b2551d4366a57cd1d5d32ffe5a4a3fd0
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T00:24:03Z

    Wrapping long line

commit 9df02765528d57935a9aed8daf754a065f5d0ef5
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T01:21:48Z

    Added check to make sure that streamed-to-disk RDD actually returns good 
data in the LargeIteratorSuite

commit 31fe08ed356c5fb37a985ea72a10d6e3e165c80b
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T01:34:27Z

    Removing un-needed semi-colons

commit 40fe1d7cf83ce2fe29a061ec2d6e6e54bd18a6ff
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T03:34:43Z

    Removing rogue space

commit 00c98e07334dac20085f51977d015cab6e2242bb
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T06:22:12Z

    Making the Java ObjectStreamSerializer reset rate configurable by the 
system variable 'spark.serializer.objectStreamReset', default is now 10000.

commit 8644ee83ffbc6f02f93abaef8c56906c4683e8db
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T07:30:42Z

    Merge branch 'master' into iterator-to-disk

commit 656c33e800a0f3c7926dd0857105e12e0cf5fb25
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T18:58:20Z

    Fixing the JavaSerializer to read from the SparkConf rather than the System 
property.

commit 0f28ec70853a6a5ab198bc113f6af77b78d34d51
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T19:03:23Z

    Adding second putValues to BlockStore interface that accepts an ArrayBuffer 
(rather than an Iterator).
    This will allow BlockStores to have slightly different behaviors dependent 
on whether they get an Iterator or ArrayBuffer. In the case of the MemoryStore, 
it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed 
an ArrayBuffer, it can skip the duplication.

commit 627a8b79d760103674f3c5b108900e911a6a7eeb
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-25T21:20:30Z

    Wrapping a few long lines

commit c2fb43056c836ebb520bd076da2b576c32e794cf
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-26T01:09:29Z

    Removing more un-needed array-buffer to iterator conversions

commit 16a4ceae706c3458e5a2721f8c27eebbf2cf4c89
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-26T19:05:27Z

    Streamlined the LargeIteratorSuite unit test. It should now run in ~25 
seconds. Confirmed that it still crashes an unpatched copy of Spark.

commit 7ccc74b7f7a2c58739cde2e4e83950e07e7fd3eb
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-27T21:34:56Z

    Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It 
doesn't try to invoke an OOM error any more

commit f70d06939bb9c164a0a6c9af42f663bc882c3211
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-27T21:36:06Z

    Adding docs for spark.serializer.objectStreamReset configuration

commit 2f684ea15053d1ad934d60c58e082c1edf57b3a0
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-27T21:36:42Z

    Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now 
using trait 'Values'. Also modified BlockStore.putBytes call to return 
PutResult, so that it behaves like putValues.

commit 33ac3900b0b11c0646b85e41e1129734adf6ce5c
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-27T21:39:50Z

    Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into 
iterator-to-disk
    
    Conflicts:
        core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala

commit 8aa31cdf94981887fcbc5c7db79a4f2d310dcb59
Author: Kyle Ellrott <kellr...@gmail.com>
Date:   2014-02-28T23:25:02Z

    Merge ../incubator-spark into iterator-to-disk

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
