holdenk created SPARK-12469:
-------------------------------

             Summary: Consistent Accumulators for Spark
                 Key: SPARK-12469
                 URL: https://issues.apache.org/jira/browse/SPARK-12469
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: holdenk


Tasks executed on Spark workers cannot modify values on the driver, with 
accumulators being the one exception to this. Accumulators in Spark are 
implemented in such a way that when a stage is recomputed (say because of cache 
eviction) the accumulator will be updated a second time. This makes 
accumulators inside of transformations harder to use for things like 
counting invalid records (one of the primary potential use cases for collecting 
side information during a transformation). However, in some cases counting 
during re-evaluation is exactly the behaviour we want (say when tracking total 
execution time for a particular function). Spark would benefit from a version 
of accumulators that does not double count even if stages are re-executed.

Motivating example:
{code}
val parseTime = sc.accumulator(0L)
val parseFailures = sc.accumulator(0L)
val parsedData = sc.textFile(...).flatMap { line =>
  val start = System.currentTimeMillis()
  val parsed = Try(parse(line))
  if (parsed.isFailure) parseFailures += 1
  parseTime += System.currentTimeMillis() - start
  parsed.toOption
}
parsedData.cache()

val resultA = parsedData.map(...).filter(...).count()

// some intervening code.  Almost anything could happen here -- some of parsedData may
// get kicked out of the cache, or an executor where data was cached might get lost

val resultB = parsedData.filter(...).map(...).flatMap(...).count()

// now we look at the accumulators
{code}

Here we would want parseFailures to only be incremented once for every line 
which failed to parse.  Unfortunately, the current Spark accumulator API 
doesn't support the parseFailures use case, since if some of the cached data 
has been evicted it is possible that those failures will be double counted 
when the partitions are recomputed.
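One possible direction, sketched here only for illustration (the PerPartitionCounts 
name and approach are this sketch's own, not necessarily what the design document 
proposes), is to key each accumulator update by the partition that produced it, so 
a re-executed partition replaces its earlier contribution instead of adding to it:

{code}
import org.apache.spark.{AccumulableParam, TaskContext}

// Hypothetical sketch: accumulate per-partition counts into a map keyed by
// partition ID, so a recomputed partition overwrites its earlier contribution
// instead of adding to it. The driver reads the total as the sum of the values.
object PerPartitionCounts extends AccumulableParam[Map[Int, Long], Long] {
  def zero(initial: Map[Int, Long]): Map[Int, Long] = Map.empty

  // Called on the workers: record the increment under the current partition's ID.
  def addAccumulator(acc: Map[Int, Long], inc: Long): Map[Int, Long] = {
    val pid = TaskContext.get().partitionId()
    acc.updated(pid, acc.getOrElse(pid, 0L) + inc)
  }

  // Called on the driver when merging task results: a later update for the same
  // partition overwrites the earlier one, so re-executed tasks do not double count.
  def addInPlace(a: Map[Int, Long], b: Map[Int, Long]): Map[Int, Long] = a ++ b
}

val parseFailures = sc.accumulable(Map.empty[Int, Long])(PerPartitionCounts)
// ... parseFailures += 1 inside the transformation as before ...
// On the driver, after the actions have run:
val totalFailures = parseFailures.value.values.sum
{code}

This sketch keys updates only by partition ID, so it assumes the accumulator is 
used from the tasks of a single RDD; distinguishing different RDDs/stages and 
handling failed or speculative task attempts is bookkeeping a full implementation 
would still need.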


See the full design document at 
https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing



