[ https://issues.apache.org/jira/browse/SPARK-30666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Enrico Minack updated SPARK-30666:
----------------------------------
Description:
This proposes a pragmatic improvement to allow for reliable single-stage accumulators. Under the assumption that a given stage / partition / RDD always produces identical results, even non-deterministic code produces identical accumulator increments on success. Rerunning a partition for any reason should therefore always produce the same increments when it succeeds.

With this pragmatic approach, the increment from each partition / task is compared to earlier increments from the same partition. Depending on the strategy by which a new increment updates an earlier increment from the same partition, different accumulator semantics (here called accumulator modes) can be implemented:
- {{ALL}} sums over all increments of each partition: this represents the current implementation of accumulators.
- {{FIRST}} keeps only the first increment of each partition. This is only useful for accumulators registered with {{countFailedValues == false}}; otherwise the first increment may come from a failed task.
- {{LARGEST}} keeps the largest increment of each partition: accumulators aggregate multiple increments while a partition is processed, so a successful task, having processed the whole partition, provides the fully accumulated value. That value always has a cardinality at least as large as the value accumulated by any failed task of the same partition, hence it supersedes any failed task's value. This produces reliable accumulator values and does not require {{countFailedValues == false}}. This mode should only be used within a single stage. The name may be confused with {{MAX}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that scales with the number of partitions. The current {{ALL}} implementation does not require extra memory.

was:
This proposes a pragmatic improvement to allow for reliable single-stage accumulators. Under the assumption that a given stage / partition / RDD always produces identical results, even non-deterministic code produces identical accumulator increments on success.
Rerunning a partition for any reason should therefore always produce the same increments when it succeeds.

With this pragmatic approach, the increment from each partition / task is compared to earlier increments from the same partition. Depending on the strategy by which a new increment updates an earlier increment from the same partition, different accumulator semantics (here called accumulator modes) can be implemented:
- {{ALL}} sums over all increments of each partition: this represents the current implementation of accumulators.
- {{LARGEST}} keeps the largest increment of each partition: accumulators aggregate multiple increments while a partition is processed, so a successful task, having processed the whole partition, provides the fully accumulated value. That value always has a cardinality at least as large as the value accumulated by any failed task of the same partition, hence it supersedes any failed task's value. This produces reliable accumulator values and does not require {{countFailedValues == false}}. This mode should only be used within a single stage. The name may be confused with {{MAX}}.
- {{FIRST}} keeps only the first increment of each partition. This is only useful for accumulators registered with {{countFailedValues == false}}; otherwise the first increment may come from a failed task.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that scales with the number of partitions. The current {{ALL}} implementation does not require extra memory.

> Reliable single-stage accumulators
> ----------------------------------
>
>                 Key: SPARK-30666
>                 URL: https://issues.apache.org/jira/browse/SPARK-30666
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Enrico Minack
>            Priority: Major
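The accumulator modes proposed in this issue can be illustrated with a small, self-contained Python simulation of the driver-side merge step. This is a sketch under stated assumptions, not Spark's API or the proposed implementation: {{AccumulatorMode}}, {{CountAccumulator}}, {{run}}, {{count_failed_values}} and {{UPDATES}} are hypothetical names, each task report is modelled as a (partition, increment, succeeded) tuple, and the accumulator counts records, so the "cardinality" of a value is simply its count. Three partitions hold 10 records each; the first attempt at partition 1 fails after 4 records and is retried, and partition 2 is reported twice, as can happen when a task is re-executed.

```python
from enum import Enum
from typing import Dict, Iterable, Tuple


class AccumulatorMode(Enum):
    ALL = "ALL"          # sum every increment (the current behaviour)
    FIRST = "FIRST"      # keep the first increment per partition
    LARGEST = "LARGEST"  # keep the largest increment per partition


class CountAccumulator:
    """Hypothetical driver-side count accumulator; not a Spark API."""

    def __init__(self, mode: AccumulatorMode) -> None:
        self.mode = mode
        self._sum = 0                             # ALL: one running sum, no per-partition state
        self._per_partition: Dict[int, int] = {}  # FIRST / LARGEST: one entry per partition

    def merge(self, partition: int, increment: int) -> None:
        if self.mode is AccumulatorMode.ALL:
            self._sum += increment
        elif self.mode is AccumulatorMode.FIRST:
            # setdefault stores the earliest increment and ignores later ones
            self._per_partition.setdefault(partition, increment)
        else:  # LARGEST: a complete (successful) attempt supersedes partial ones
            previous = self._per_partition.get(partition)
            if previous is None or increment > previous:
                self._per_partition[partition] = increment

    @property
    def value(self) -> int:
        if self.mode is AccumulatorMode.ALL:
            return self._sum
        # per-partition values are summed across partitions
        return sum(self._per_partition.values())


def run(mode: AccumulatorMode,
        updates: Iterable[Tuple[int, int, bool]],
        count_failed_values: bool) -> int:
    """Replays task reports; failed reports only count if count_failed_values."""
    acc = CountAccumulator(mode)
    for partition, increment, succeeded in updates:
        if succeeded or count_failed_values:
            acc.merge(partition, increment)
    return acc.value


# (partition, increment, succeeded): three partitions of 10 records each
UPDATES = [
    (0, 10, True),
    (1, 4, False),   # first attempt of partition 1 fails after 4 records
    (1, 10, True),   # retry of partition 1 succeeds
    (2, 10, True),
    (2, 10, True),   # partition 2 reported again by a re-executed task
]

for mode in AccumulatorMode:
    print(mode.name,
          run(mode, UPDATES, count_failed_values=True),
          run(mode, UPDATES, count_failed_values=False))
# prints:
# ALL 44 40
# FIRST 24 30
# LARGEST 30 30
```

With failed values counted, {{ALL}} over-counts (44) and {{FIRST}} keeps the failed attempt's 4 records (24), while {{LARGEST}} yields the expected 30 in both cases. The per-partition dict is where the extra memory of {{FIRST}} and {{LARGEST}} comes from, and because {{LARGEST}} takes the maximum per partition before summing, it differs from a plain maximum over all increments, which would return 10 here.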