I want to use accumulators to keep counts of things such as invalid lines, for reporting purposes, similar to Hadoop counters. This may seem simple, but my case is a bit more complicated. The code that creates an RDD from a transformation is separate from the code that performs an operation on that RDD, or several operations (I can't make any assumptions about how many operations will be run on this RDD). There are two issues: (1) I want to retrieve the accumulator value only after it has been computed, and (2) I don't want to count the same thing twice if the RDD is recomputed.
Here's a simple example, converting strings to integers. Any records which can't be parsed as an integer are dropped, but I want to count how many times that happens:

    import org.apache.spark.rdd.RDD

    // sc is the SparkContext, assumed to be in scope
    def numbers(input: RDD[String]): RDD[Int] = {
      val invalidRecords = sc.accumulator(0)
      input.flatMap { record =>
        try {
          Seq(record.toInt)
        } catch {
          // Drop the record, but count it as invalid
          case _: NumberFormatException =>
            invalidRecords += 1
            Seq()
        }
      }
    }

I need some way to know when the result RDD has been computed, so I can get the accumulator value and reset it. Or perhaps it would be better to say that I need a way to ensure the accumulator value is computed exactly once for a given RDD.

Does anyone know a way to do this, or anything I might look into? Or is this something that just isn't supported in Spark?

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io <http://www.velos.io>