[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071799#comment-15071799 ]
Nan Zhu commented on SPARK-12469:
---------------------------------

Just to bring the previous discussion about this topic here: https://github.com/apache/spark/pull/2524

I originally wanted to fix exactly the same issue in that patch, but we later had to shrink its scope to result tasks only. There, I wanted to use StageId + partitionId to identify an accumulator update uniquely, but [~matei] pointed out a counterexample:

"A shuffle stage may be resubmitted once the old one is garbage-collected (if periodic cleanup is on). If you use an accumulator in a pipelined transformation like a map(), and then you make a new RDD built on top of that (e.g. apply another map() to it), it won't count as the same stage so you'll still get the updates twice."

I'm not sure the proposed solution can fully resolve this issue.

> Consistent Accumulators for Spark
> ---------------------------------
>
>                 Key: SPARK-12469
>                 URL: https://issues.apache.org/jira/browse/SPARK-12469
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: holdenk
>
> Tasks executed on Spark workers are unable to modify values from the driver; accumulators are the one exception to this. Accumulators in Spark are implemented in such a way that when a stage is recomputed (say, after cache eviction) the accumulator will be updated a second time. This makes accumulators inside transformations harder to use for things like counting invalid records (one of the primary use cases for collecting side information during a transformation). In some cases, however, counting during re-evaluation is exactly the behaviour we want (say, when tracking total execution time for a particular function). Spark would benefit from a version of accumulators that does not double count even if stages are re-executed.
> Motivating example:
> {code}
> val parseTime = sc.accumulator(0L)
> val parseFailures = sc.accumulator(0L)
> val parsedData = sc.textFile(...).flatMap { line =>
>   val start = System.currentTimeMillis()
>   val parsed = Try(parse(line))
>   if (parsed.isFailure) parseFailures += 1
>   parseTime += System.currentTimeMillis() - start
>   parsed.toOption
> }
> parsedData.cache()
> val resultA = parsedData.map(...).filter(...).count()
> // some intervening code. Almost anything could happen here -- some of parsedData may
> // get kicked out of the cache, or an executor where data was cached might get lost
> val resultB = parsedData.filter(...).map(...).flatMap(...).count()
> // now we look at the accumulators
> {code}
> Here we would want parseFailures to have been incremented only once for every line that failed to parse. Unfortunately, the current Spark accumulator API doesn't support the parseFailures use case, since if some of the data has been evicted it is possible that it will be double counted.
> See the full design document at
> https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing
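For illustration only, below is a minimal Scala sketch of the kind of driver-side deduplication that keying accumulator updates by (StageId, partitionId) implies, as discussed in the comment above. This is not Spark's actual accumulator implementation; the {{ConsistentLongAccumulator}} class and its method names are hypothetical. The last call in the demo shows where the counterexample bites: a re-derived or resubmitted stage gets a different stage ID, so the same logical work is counted again.

{code}
// Hypothetical sketch: dedupe accumulator updates by (stageId, partitionId).
// Not Spark's real API; class and method names are invented for illustration.
import scala.collection.mutable

class ConsistentLongAccumulator {
  private var total: Long = 0L
  // Remember which (stageId, partitionId) pairs have already contributed.
  private val seen = mutable.Set.empty[(Int, Int)]

  /** Called on the driver when a task reports its partial update. */
  def addUpdate(stageId: Int, partitionId: Int, delta: Long): Unit = synchronized {
    if (seen.add((stageId, partitionId))) {
      // First report from this (stage, partition): apply it.
      total += delta
    }
    // Otherwise this is a re-execution of the same partition within the same
    // stage (e.g. a speculative task or retry), so drop the duplicate.
  }

  def value: Long = synchronized { total }
}

object ConsistentLongAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val acc = new ConsistentLongAccumulator
    acc.addUpdate(stageId = 3, partitionId = 0, delta = 5L)
    acc.addUpdate(stageId = 3, partitionId = 0, delta = 5L) // retry: ignored
    acc.addUpdate(stageId = 3, partitionId = 1, delta = 2L)
    // If the same work re-runs under a *new* stage (the counterexample above),
    // the stageId differs and the update is counted again:
    acc.addUpdate(stageId = 7, partitionId = 0, delta = 5L)
    println(acc.value) // 12, not the 7 a "consistent" accumulator would want
  }
}
{code}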