Hi everyone, I have a question about how DStreams, accumulators, failure recovery, and speculative execution interact.
Let's say I have a streaming app that takes a stream of strings, formats them (say, converts each to Unicode), and prints them (e.g. on a news ticker). I know print() only outputs 10 elements by default, but its configuration is irrelevant here: assume we have set the maximum number of printed elements to be greater than n, the size of the streams we're dealing with. I use one accumulator (A1) in the formatting operation to count the number of elements, and another (A2) to count every element that goes through the printing operation.

Now suppose I run a stream of n elements through this app:

- What are the values of A1 and A2?
- Now I lose a node during processing. How do A1 and A2 compare to n?
- Now I notice that one batch of the stream is lagging behind, and I turn speculative execution on. How do A1 and A2 compare to n?

Here's how my reasoning goes. If no node crashes and there is no speculative execution, then n = A1 = A2 (trivial counting). Format is a transformation (it produces a new RDD); print is an action (it produces no RDD, only a side effect). Transformations may be run multiple times (to rebuild the pre-shuffle elements if a worker dies along the way), while actions are run at least once: if a node crashes in the middle of printing, the task restarts from the persisted "post-shuffle" collection (the shuffle here being trivial), i.e. from the beginning, and we may see multiple prints for the same element. See https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#fault-tolerance-properties

But then I read this in Learning Spark ("Accumulators and Fault Tolerance"):

> Spark automatically deals with failed or slow machines by re-executing failed or slow tasks. For example, if the node running a partition of a map operation crashes, Spark will rerun it on another node; and even if the node does not crash but is simply much slower than other nodes, Spark can preemptively launch a "speculative" copy of the task on another node, and take its result if that finishes. Even if no nodes fail, Spark may have to rerun a task to rebuild a cached value that falls out of memory. The net result is therefore that the same function may run multiple times on the same data depending on what happens on the cluster.
>
> How does this interact with accumulators? The end result is that for accumulators used in actions, Spark applies each task's update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach. For accumulators used in RDD transformations instead of actions, this guarantee does not exist. An accumulator update within a transformation can occur more than once. One such case of a probably unintended multiple update occurs when a cached but infrequently used RDD is first evicted from the LRU cache and is then subsequently needed. This forces the RDD to be recalculated from its lineage, with the unintended side effect that calls to update an accumulator within the transformations in that lineage are sent again to the driver. While future versions of Spark may change this behavior to count each update only once, the current version, 1.0.0, does have the multiple-update behavior, so accumulators in transformations are recommended only for debugging purposes.

So, in the case of a node failure, I may end up with A1 > n, and likewise under speculative execution. The A2 update, however, is somehow constrained to be a 'protected' side effect: even though we may see stream elements printed several times if the printing node dies, we always have A2 = n, in both the failure and the speculative-execution cases.

Am I right about this?

-- François Garillot
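To make my mental model concrete, here is a toy simulation in plain Python (no Spark involved; `Accumulators`, `run_job`, and the dedup-by-task-id scheme are my own invention, sketching what I *think* Spark does): transformation-side updates are applied on every execution of a task, while action-side updates are deduplicated per task, so each task's contribution counts only once.

```python
# Toy model of the accumulator semantics described above (my assumption,
# not real Spark code).
# A1 is updated in a transformation: every (re-)execution of a task adds to it.
# A2 is updated in an action: updates are deduplicated per task, so a task's
# contribution is applied at most once, no matter how often it is re-run.

class Accumulators:
    def __init__(self):
        self.a1 = 0         # transformation-side counter
        self.a2 = 0         # action-side counter
        self._seen = set()  # task ids whose action update was already applied

    def add_transform(self, count):
        # No dedup: crash retries and speculative copies all land here.
        self.a1 += count

    def add_action(self, task_id, count):
        # Dedup by task id: only the first completion counts.
        if task_id not in self._seen:
            self._seen.add(task_id)
            self.a2 += count

def run_job(partitions, reruns, acc):
    """partitions: {task_id: [elements]}; reruns: task ids executed twice
    (modeling crash recovery or a speculative copy)."""
    for task_id, elems in partitions.items():
        attempts = 2 if task_id in reruns else 1
        for _ in range(attempts):
            formatted = [s.upper() for s in elems]   # the 'format' transformation
            acc.add_transform(len(formatted))        # A1 += on every execution
            acc.add_action(task_id, len(formatted))  # A2 += once per task

acc = Accumulators()
partitions = {0: ["a", "b"], 1: ["c", "d"], 2: ["e"]}  # n = 5 elements
run_job(partitions, reruns={1}, acc=acc)               # task 1 runs twice
print(acc.a1, acc.a2)  # A1 = 7 (> n), A2 = 5 (= n)
```

Under this model, A1 over-counts by exactly the number of re-executed elements, while A2 stays at n, which is precisely the behavior I'm asking about.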