I can reply from a user’s perspective – I’ll defer on the semantic guarantees 
to someone with more experience.

I’ve successfully implemented the following using a custom Accumulable class:

  *   Created a MapAccumulator with dynamic keys (driven by the incoming 
data), as opposed to creating many discrete accumulators
     *   The merge operation adds the values on key conflict
  *   I’m adding K->Vs to this accumulator in a variety of places (map, 
flatMap, transform and updateStateByKey)
  *   In a foreachRDD at the end of the transformations I’m reading the 
accumulator and writing the counters to OpenTSDB
     *   After this I’m resetting it to the “zero” value (e.g. an empty map)
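
For illustration, the merge semantics described above (sum the values on key 
conflict, with an empty map as the “zero”) can be sketched in plain Scala. 
The function names are hypothetical, not from the original code; the two 
operations correspond roughly to addAccumulator and addInPlace in Spark 1.x’s 
AccumulableParam:

```scala
// Add a single K->V into the running counter map, summing on key conflict
// (the per-record operation, AccumulableParam.addAccumulator in the sketch).
def addCounter(acc: Map[String, Long], kv: (String, Long)): Map[String, Long] =
  acc.updated(kv._1, acc.getOrElse(kv._1, 0L) + kv._2)

// Merge two counter maps (e.g. partial results from different partitions),
// summing values for keys present in both (AccumulableParam.addInPlace).
def mergeCounters(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  b.foldLeft(a) { case (m, (k, v)) => m.updated(k, m.getOrElse(k, 0L) + v) }
```

The “zero” value used when resetting after the foreachRDD write is simply 
Map.empty[String, Long].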

Everything works as expected in terms of functionality, with two caveats:

  *   On task/job failure you might get duplicate values for the tasks that 
are retried in the active job, since adding to an accumulator in a 
transformation is a side effect
     *   I’m partially working around this by also keying on the RDD time 
and overwriting the values in OpenTSDB (an idempotent operation)
  *   If you have stateful transformations and you use checkpointing, the 
accumulator code becomes really intrusive in your codebase
     *   You will need a global singleton in your driver and a “getInstance” 
call in a foreachRDD or transform, to force the code to execute on the 
driver
     *   This is because, on restoring from a checkpoint, your accumulators 
will be NULL, as checkpoint recovery makes no attempt to re-initialize them 
(see SPARK-5206<https://issues.apache.org/jira/browse/SPARK-5206?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22accumulator%20null%22>)
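
The singleton work-around above follows the lazily-initialized, 
double-checked-locking pattern shown in the Spark Streaming programming 
guide. A Spark-free sketch of just that pattern (all names are illustrative; 
the factory argument stands in for the ssc.sparkContext.accumulable(...) 
call you would make on the driver):

```scala
// After recovery from a checkpoint, the driver-side accumulator reference
// is null (SPARK-5206), so it must be re-created lazily on the driver.
// getInstance is intended to be called from inside a foreachRDD/transform
// closure, which executes on the driver rather than on the executors.
class LazySingleton[T <: AnyRef](factory: () => T) {
  @volatile private var instance: T = null.asInstanceOf[T]
  def getInstance(): T = {
    if (instance == null) {
      synchronized {
        // Re-check under the lock: another thread may have created it.
        if (instance == null) {
          instance = factory()
        }
      }
    }
    instance
  }
}
```

Every call after the first returns the same object, so the accumulator is 
created exactly once per driver (re)start, whether that is a fresh launch or 
a checkpoint recovery.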

Hope this helps,
-adrian

From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org"
Subject: Accumulators internals and reliability

It seems like there is not much literature about Spark's Accumulators, so I 
thought I'd ask here:

Do Accumulators reside in a task? Are they serialized with the task, and sent 
back on task completion as part of the ResultTask?

Are they reliable? If so, when? Can I rely on an accumulator's value only 
after the task has successfully completed (meaning in the driver), or also 
during task execution (and what about speculative execution)?

What are the limitations on the number (or size) of Accumulators?

Thanks,
Amit
