I can reply from a user’s perspective – I’ll defer on semantic guarantees to someone with more experience.
I’ve successfully implemented the following using a custom Accumulable class:

* Created a MapAccumulator with dynamic keys (they are driven by the incoming data), as opposed to creating many discrete accumulators
* The merge operation adds the values on key conflict
* I’m adding K->Vs to this accumulator in a variety of places (maps, flatMaps, transforms and updateStateByKey)
* In a foreachRDD at the end of the transformations I’m reading the accumulator and writing the counters to OpenTSDB
* After this I’m resetting it to the “zero” value (e.g. an empty map)

Everything works as expected in terms of functionality, with 2 caveats:

* On task/job failure you might get duplicate values for the tasks that are retried in the active job, since adding to an accumulator in a transformation is a side effect
  * I’m partially working around this by also referring to the RDD time and overwriting the values in OpenTSDB (an idempotent operation)
* If you have stateful transformations and you use checkpointing, the accumulator code becomes really intrusive in your codebase
  * You will need a global singleton in your driver and a “getInstance” call in a foreachRDD or transform, to force code execution on the driver
  * This is because on restoring from checkpoint your accumulators will be null, as the checkpoint recovery makes no attempt to initialize them (see SPARK-5206<https://issues.apache.org/jira/browse/SPARK-5206?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22accumulator%20null%22>)

Hope this helps,
-adrian

From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Accumulators internals and reliability

It seems like there is not much literature about Spark's Accumulators, so I thought I'd ask here:

Do Accumulators reside in a Task? Are they serialized with the task? Sent back on task completion as part of the ResultTask?

Are they reliable? If so, when?
Can I rely on an accumulator's value only after the task has completed successfully (meaning in the driver)? Or also during task execution (and what about speculative execution)?

What are the limitations on the number (or size) of Accumulators?

Thanks,
Amit
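[For later readers of this thread: the map-valued accumulator Adrian describes can be sketched roughly as below, here in PySpark terms. This is an illustration of the "add the values on key conflict" merge only, not Adrian's actual (Accumulable-based) code; the class name is made up, and in a real job it would subclass pyspark.AccumulatorParam and be registered with sc.accumulator().]

```python
class MapAccumulatorParam:
    """Sketch of a map-valued accumulator parameter.

    PySpark custom accumulators implement two methods, zero() and
    addInPlace(). With a live SparkContext you would register this as:

        counters = sc.accumulator({}, MapAccumulatorParam())

    (subclassing pyspark.AccumulatorParam). Shown standalone here so the
    merge semantics are easy to see.
    """

    def zero(self, initial_value):
        # The "empty" value the accumulator is reset to after each
        # foreachRDD read-and-flush (an empty map, per the thread above).
        return {}

    def addInPlace(self, v1, v2):
        # Merge two partial count maps: keys unique to either side are
        # kept, and values for colliding keys are summed.
        for key, count in v2.items():
            v1[key] = v1.get(key, 0) + count
        return v1
```

Note that this addresses only the merge semantics; the checkpoint-recovery caveat (SPARK-5206) still applies, so with checkpointing enabled the accumulator has to be lazily created via a driver-side singleton inside foreachRDD, as Adrian describes.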