GitHub user holdenk opened a pull request: https://github.com/apache/spark/pull/10841
[SPARK-12469][CORE][RFC/WIP] Add Consistent Accumulators for Spark

This is an initial PR illustrating one possible approach for providing consistent accumulators in Spark. Tasks executed on Spark workers are unable to modify values on the driver, with accumulators as the one exception. Accumulators in Spark are implemented in such a way that when a stage is recomputed (say, after cache eviction) the accumulator is updated a second time. This makes accumulators inside transformations difficult to use for things like counting invalid records (one of the primary potential use cases of collecting side information during a transformation). However, in some cases counting during re-evaluation is exactly the behaviour we want (say, when tracking total execution time for a particular function). Spark would benefit from a version of accumulators which does not double count even if stages are re-executed.

Motivating example:

```
val parseTime = sc.accumulator(0L)
val parseFailures = sc.accumulator(0L)
val parsedData = sc.textFile(...).flatMap { line =>
  val start = System.currentTimeMillis()
  val parsed = Try(parse(line))
  if (parsed.isFailure) parseFailures += 1
  parseTime += System.currentTimeMillis() - start
  parsed.toOption
}
parsedData.cache()

val resultA = parsedData.map(...).filter(...).count()

// some intervening code. Almost anything could happen here -- some of parsedData may
// get kicked out of the cache, or an executor where data was cached might get lost
val resultB = parsedData.filter(...).map(...).flatMap(...).count()

// now we look at the accumulators
```

Here we would want parseFailures to be incremented only once for every line which failed to parse. Unfortunately, the current Spark accumulator API doesn't support this use case: if some of the cached data has been evicted, it's possible that failures will be double counted.
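As a hedged illustration of the idea (plain Scala, with hypothetical names that are not this PR's actual API), a consistent accumulator can remember which (rddId, partitionId) updates it has already merged, keeping a per-RDD bitset of processed partitions, and skip repeats:

```scala
import scala.collection.mutable

// Hypothetical sketch, not the PR's implementation: a long accumulator that
// merges each partition's update at most once, so a re-executed partition
// (e.g. recomputed after cache eviction) does not double count.
class ConsistentLongAccumulator {
  private var total = 0L
  // Per-RDD bitset of partition ids whose updates were already merged.
  private val processed = mutable.Map.empty[Int, mutable.BitSet]

  /** Driver-side merge of one task's pending per-partition updates. */
  def merge(pending: Map[(Int, Int), Long]): Unit = {
    for (((rddId, partitionId), update) <- pending) {
      val seen = processed.getOrElseUpdate(rddId, mutable.BitSet.empty)
      if (!seen.contains(partitionId)) {
        seen += partitionId
        total += update
      }
    }
  }

  def value: Long = total
}
```

Merging the same partition's update twice, as would happen when an evicted partition is recomputed, leaves the total unchanged.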
See the full design document at https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing

cc @squito

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/holdenk/spark SPARK-12469-consistent-accumulators-for-spark

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10841.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #10841

----

commit 7c9f874d94085f60089cda8ab3b6c451bf793a1a
Author: Holden Karau <hol...@us.ibm.com>
Date: 2015-12-28T19:59:30Z
Start adding the proposed external API for consistent accumulators

commit fce92981371ddd7af8ad28f30537fb473bbee378
Author: Holden Karau <hol...@us.ibm.com>
Date: 2015-12-28T19:59:47Z
Start keeping track of the required information in the TaskContext

commit c0e2bfd9d3d5f22e9ea217a500b67e84679f519d
Author: Holden Karau <hol...@us.ibm.com>
Date: 2015-12-30T00:29:53Z
A bit more progress towards consistent accumulators: add an add function which checks the bitset, and throw an exception on merges (perhaps this isn't reasonable and then we need to keep all of the previous values, but a quick skim suggests it's intended to be a user-accessible function, so just don't support it).

commit e9c287f682cbefc531a8fc84b00a472f860c9d06
Author: Holden Karau <hol...@us.ibm.com>
Date: 2015-12-30T01:51:53Z
Waaait, that was not thinking clearly. As a consistent accumulator accumulates values, add them to a hashmap of pending values to be merged, keyed by RDD id and partition id; then, driver side, when merging the accumulators, check each RDD and partition id individually against the bitset of processed partitions for that RDD, adding to a single accumulator.
(This way we don't need to keep all of the values around forever, but if we have, say, first() and then foreach(), we can add the value for partition one, and then when the second task happens we can skip just the adding of partition one.) Up next: start adding some simple tests for this to see if it works.

commit cf32dc5d543d4c0bb883e00692d21f22a7470737
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-06T07:45:18Z
Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark

commit d9354b12977bdaa2bcd82730fb34bdae490ab02d
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-06T20:26:13Z
Add consistentValue (temporary) to get the consistent value and fix the accumulator to work even when no merge happens

commit 28a5754b654441ad8904f6a8f6e4a06db3908541
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-06T20:58:07Z
Introduce a GenericAccumulable to allow for Accumulables which don't return the same value as the value accumulated (e.g. consistent accumulators accumulate a lot of bookkeeping information which the user shouldn't know about, so hide that). Accumulable now extends GenericAccumulable to keep the API the same

commit 21387e1c5ebf10d6a2b26996ba0a54640925a903
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T07:37:31Z
Temporary debugging

commit 11ac8364c4077ee14c2284f6d3a17341efb79b61
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T07:38:36Z
Add some tests for consistent accumulators

commit b76218d1645b58be587e9db1337b2867b20dcaf1
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T09:11:38Z
Switch to explicit pass-in in the API for now (can figure out the magic later if we want magic). Next up: handle partially consumed partitions.
commit 7fb981c9f1503747b687f31d37a11b22c87fcf66
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T19:22:39Z
Get rid of some old junk added when trying the old approach

commit c75b037cdffd05c415e8e3dbe247f921009e31d6
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T21:39:01Z
Have tasks report if they've processed the entire partition and skip incrementing the consistent accumulator if this is not the case

commit df89265241891380810860ad0f26e653efed298d
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T22:54:56Z
Regardless of how much of the iterator we read in the task, if we are using persistence then the entire partition gets computed

commit 33b45411c4f7ac45c439e020d82db95f8333e0bd
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-07T23:20:34Z
Some improvements

commit b2d0926eac596b0f075db7809df1130bd7025d00
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-11T23:07:13Z
Merge in master

commit 4fd97a0a85f4e2c3a0e9b7edf2deb0a957443c4d
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-12T18:53:01Z
Style fixes - todo: check some indentation and such

commit f12608776d5359636065b2e9dd7c3c3d1eed930a
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-12T19:30:59Z
Fix some indentation

commit 11da3d30d148e474595e95465c1dd2d81ca1d43f
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-14T00:07:56Z
Fix whitespace, add an explicit test for a single partition, remove setting the value when adding an accumulator value since we always have a merge with the zero val

commit b7637c94f0fd694c61ccba2e8876352e472c3795
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-14T00:11:14Z
Remove unnecessary printlns

commit 82e8cb33ca1aef621d11083a30c16baf881ec90e
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:27:55Z
Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark

commit 6304270f42bc9ad5e1aea6021b122d4d85d47c38
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:38:03Z
Fix indentation

commit 6557e5db63a59af527d0f90447d3a40309ef9d87
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:38:40Z
Fix documentation for consistent accumulators example, and be consistent about using splitID

commit 922d12c17cb213db596b60736f975f3a52d71196
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:39:07Z
Clarify when includeConsistent should be true/false in the javadoc of TaskContext

commit 2c7cba021cec3ea0f1f708e4bb0f4291648d12ac
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:45:55Z
Remove some unused imports

commit a842ed23b8994aa1a109f32a742e5f98c0661599
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:46:29Z
Rename consistentaccumulators.scala to consistentaccumulatorssuite.scala

commit 1a7cce9d339bb0debf456e012f07f8b6a2dc5b69
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-17T20:52:41Z
Fix indentation on multiline params

commit aaa9b89fabed40fec33c33243166eeb8a4b3feed
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-19T23:09:37Z
Merge in master - first pass

commit 211b3e7a7c9d1e08df919c12fb8e7e6af76eb640
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-19T23:27:18Z
Fix merge and add flatMapWithAccumulator

commit 3f96429b5c6a6c5596d70a9d15de74ae6203c970
Author: Holden Karau <hol...@us.ibm.com>
Date: 2016-01-19T23:33:36Z
Remove extra line

----
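Several of the commits above deal with partially consumed partitions (a task running first() may stop reading early, in which case its update should not be merged). A hedged sketch of that bookkeeping in plain Scala, with hypothetical names that are not the PR's actual code: wrap the partition iterator and report the update as safe to merge only once the iterator has been fully drained.

```scala
// Hypothetical sketch: an iterator wrapper that tracks whether the caller
// consumed the whole partition. A task would only ship its pending
// accumulator update as mergeable when fullyProcessed is true.
class CompletionTrackingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var exhausted = false
  override def hasNext: Boolean = {
    val h = underlying.hasNext
    if (!h) exhausted = true  // only known exhausted once hasNext returns false
    h
  }
  override def next(): T = underlying.next()
  /** True only if the caller drained the entire partition. */
  def fullyProcessed: Boolean = exhausted
}
```

As one of the commits notes, a persisted partition is computed in full regardless of how much of the iterator the task reads, so with persistence the update could be merged unconditionally.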