GitHub user holdenk opened a pull request:

    https://github.com/apache/spark/pull/10841

    [SPARK-12469][CORE][RFC/WIP] Add Consistent Accumulators for Spark

    This is an initial PR illustrating one of the possible approaches for 
providing consistent accumulators in Spark.
    
    Tasks executed on Spark workers are unable to modify values on the driver; accumulators are the one exception to this. Accumulators in Spark are implemented in such a way that when a stage is recomputed (say, after cache eviction) the accumulator will be updated a second time. This makes accumulators inside of transformations more difficult to use for things like counting invalid records (one of the primary potential use cases of collecting side information during a transformation). However, in some cases this counting during re-evaluation is exactly the behaviour we want (say, when tracking total execution time for a particular function). Spark would benefit from a version of accumulators which does not double count even if stages are re-executed.
    
    Motivating example:
    ```
    val parseTime = sc.accumulator(0L)
    val parseFailures = sc.accumulator(0L)
    val parsedData = sc.textFile(...).flatMap { line =>
      val start = System.currentTimeMillis()
      val parsed = Try(parse(line))
      if (parsed.isFailure) parseFailures += 1
      parseTime += System.currentTimeMillis() - start
      parsed.toOption
    }
    parsedData.cache()
    
    val resultA = parsedData.map(...).filter(...).count()
    
    // some intervening code. Almost anything could happen here -- some of
    // parsedData may get kicked out of the cache, or an executor where
    // data was cached might get lost
    
    val resultB = parsedData.filter(...).map(...).flatMap(...).count()
    
    // now we look at the accumulators
    ```
    
    Here we would want parseFailures to be incremented only once for each line which failed to parse. Unfortunately, the current Spark accumulator API doesn't support this use case: if some of parsedData had been evicted from the cache, it's possible that the failures would be double counted when the partitions are recomputed.
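    The design (detailed in the linked document and the commit messages below) deduplicates accumulator updates by (RDD ID, partition ID) against a per-RDD bitset of already-processed partitions, so a recomputed partition is merged at most once. A minimal, self-contained sketch of that idea follows; all names here (ConsistentSum, merge, etc.) are hypothetical illustrations, not the PR's actual API:

    ```scala
    // Driver-side sketch of the deduplication idea: pending accumulator
    // updates arrive keyed by (rddId, partitionId), and a per-RDD bitset
    // of processed partitions ensures each partition's contribution is
    // merged at most once, even when a stage is re-executed.
    // Hypothetical names -- not the PR's actual API.
    import scala.collection.mutable

    final class ConsistentSum {
      private var total = 0L
      // rddId -> partition ids whose updates have already been merged
      private val processed = mutable.Map.empty[Int, mutable.BitSet]

      def merge(pending: Map[(Int, Int), Long]): Unit =
        for (((rddId, partId), v) <- pending) {
          val seen = processed.getOrElseUpdate(rddId, mutable.BitSet.empty)
          if (seen.add(partId)) total += v // skip partitions seen before
        }

      def value: Long = total
    }

    object ConsistentSumDemo extends App {
      val acc = new ConsistentSum
      acc.merge(Map((1, 0) -> 3L, (1, 1) -> 2L)) // first action
      acc.merge(Map((1, 1) -> 2L, (1, 2) -> 4L)) // partition 1 recomputed
      assert(acc.value == 9L) // partition 1 counted only once: 3 + 2 + 4
      println(acc.value)
    }
    ```

    The real implementation has to carry this bookkeeping inside the accumulator payload shipped back from tasks, which is why the commits below introduce a GenericAccumulable whose internal value differs from the value exposed to the user.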
    
    
    See the full design document at 
https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing
    
    cc @squito

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/holdenk/spark 
SPARK-12469-consistent-accumulators-for-spark

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10841.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10841
    
----
commit 7c9f874d94085f60089cda8ab3b6c451bf793a1a
Author: Holden Karau <hol...@us.ibm.com>
Date:   2015-12-28T19:59:30Z

    Start adding the proposed external API for consistent accumulators

commit fce92981371ddd7af8ad28f30537fb473bbee378
Author: Holden Karau <hol...@us.ibm.com>
Date:   2015-12-28T19:59:47Z

    Start keeping track of the required information in the taskcontext

commit c0e2bfd9d3d5f22e9ea217a500b67e84679f519d
Author: Holden Karau <hol...@us.ibm.com>
Date:   2015-12-30T00:29:53Z

    A bit more progress towards consistent accumulators: add an add function which checks the bitset, and throw an exception on merges (perhaps this isn't reasonable and then we need to keep all of the previous values - but a quick skim suggests it's intended to be a user-accessible function, so just don't support it).

commit e9c287f682cbefc531a8fc84b00a472f860c9d06
Author: Holden Karau <hol...@us.ibm.com>
Date:   2015-12-30T01:51:53Z

    Waaait, that was not thinking clearly. As we go, have the consistent accumulator add values to a hashmap of pending values to be merged, keyed by RDD ID & partition ID. Then, driver side, when merging the accumulators, check each RDD & partition ID individually against the bitset of processed partitions for that RDD before adding to a single accumulator. (This way we don't need to keep all of the values around forever, but if we say have first() and then foreach(), we can add the value for partition one, then when the second task happens we can skip just the re-adding of partition one.) Up next: start adding some simple tests for this to see if it works.

commit cf32dc5d543d4c0bb883e00692d21f22a7470737
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-06T07:45:18Z

    Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark

commit d9354b12977bdaa2bcd82730fb34bdae490ab02d
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-06T20:26:13Z

    add consistentValue (temporary) to get the consistent value and fix the 
accumulator to work even when no merge happens

commit 28a5754b654441ad8904f6a8f6e4a06db3908541
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-06T20:58:07Z

    Introduce a GenericAccumulable to allow for Accumulables which don't return the same value as the value accumulated (e.g. consistent accumulators accumulate a lot of bookkeeping information which the user shouldn't know about, so hide that). Accumulable now extends GenericAccumulable to keep the API the same.

commit 21387e1c5ebf10d6a2b26996ba0a54640925a903
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T07:37:31Z

    Temporary debugging

commit 11ac8364c4077ee14c2284f6d3a17341efb79b61
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T07:38:36Z

    Add some tests for consistent accumulators

commit b76218d1645b58be587e9db1337b2867b20dcaf1
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T09:11:38Z

    Switch to doing explicit passing in the API for now (can figure out the magic later if we want magic). Next up: handle partially consumed partitions.

commit 7fb981c9f1503747b687f31d37a11b22c87fcf66
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T19:22:39Z

    Get rid of some old junk added when trying the old approach

commit c75b037cdffd05c415e8e3dbe247f921009e31d6
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T21:39:01Z

    Have tasks report if they've processed the entire partition and skip 
incrementing the consistent accumulator if this is not the case

commit df89265241891380810860ad0f26e653efed298d
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T22:54:56Z

    Regardless of how much of the iterator we read in the task, if we are using persistence then the entire partition gets computed

commit 33b45411c4f7ac45c439e020d82db95f8333e0bd
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-07T23:20:34Z

    Some improvements

commit b2d0926eac596b0f075db7809df1130bd7025d00
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-11T23:07:13Z

    Merge in master

commit 4fd97a0a85f4e2c3a0e9b7edf2deb0a957443c4d
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-12T18:53:01Z

    Style fixes - todo check some indentation and such

commit f12608776d5359636065b2e9dd7c3c3d1eed930a
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-12T19:30:59Z

    Fix some indentation

commit 11da3d30d148e474595e95465c1dd2d81ca1d43f
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-14T00:07:56Z

    Fix whitespace, add an explicit test for a single partition, remove setting the value when adding an accumulator value since we always have a merge with the zero val

commit b7637c94f0fd694c61ccba2e8876352e472c3795
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-14T00:11:14Z

    Remove unnecessary printlns

commit 82e8cb33ca1aef621d11083a30c16baf881ec90e
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:27:55Z

    Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark

commit 6304270f42bc9ad5e1aea6021b122d4d85d47c38
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:38:03Z

    Fix indentation

commit 6557e5db63a59af527d0f90447d3a40309ef9d87
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:38:40Z

    Fix documentation for consistent accumulators example, and be consistent 
about using splitID

commit 922d12c17cb213db596b60736f975f3a52d71196
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:39:07Z

    Clarify when includeConsistent should be true/false in the javadoc of 
TaskContext

commit 2c7cba021cec3ea0f1f708e4bb0f4291648d12ac
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:45:55Z

    Remove some unused imports

commit a842ed23b8994aa1a109f32a742e5f98c0661599
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:46:29Z

    rename consistentaccumulators.scala to consistentaccumulatorssuite.scala

commit 1a7cce9d339bb0debf456e012f07f8b6a2dc5b69
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-17T20:52:41Z

    Fix indentation on multiline params

commit aaa9b89fabed40fec33c33243166eeb8a4b3feed
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-19T23:09:37Z

    Merge in master - first pass

commit 211b3e7a7c9d1e08df919c12fb8e7e6af76eb640
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-19T23:27:18Z

    Fix merge and add flatMapWithAccumulator

commit 3f96429b5c6a6c5596d70a9d15de74ae6203c970
Author: Holden Karau <hol...@us.ibm.com>
Date:   2016-01-19T23:33:36Z

    Remove extra line

----


