Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r56425894

    --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
    @@ -114,23 +163,40 @@ class Accumulable[R, T] private (
        * same mutable instance around.
        */
       private[spark] def copy(): Accumulable[R, T] = {
    -    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
    +    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues, consistent)
       }

       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def += (term: T) { value_ = param.addAccumulator(value_, term) }
    +  def += (term: T) { add(term) }

       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def add(term: T) { value_ = param.addAccumulator(value_, term) }
    +  def add(term: T) {
    +    value_ = param.addAccumulator(value_, term)
    +    if (consistent) {
    +      val updateInfo = TaskContext.get().getRDDPartitionInfo()
    +      val base = pending.getOrElse(updateInfo, zero)
    +      pending(updateInfo) = param.addAccumulator(base, term)
    +    }
    +  }

       /**
    -   * Merge two accumulable objects together
    +   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
    +   * non-consistent accumulables.
    +   */
    +  private[spark] def markFullyProcessed(rddId: Int, shuffleId: Int, partitionId: Int): Unit = {
    +    if (consistent) {
    +      completed += ((rddId, shuffleId, partitionId))
    --- End diff --

    The reason for also tracking the shuffle id is that with two combineByKey operations we can have three different lambdas (the original code, shuffle 1, shuffle 2) operating on what is the same RDD id, so we need a way to distinguish those three blocks of execution so that each of them is counted. `shuffleWriteId` sounds like a much better name.
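    To make the keying concrete, here is a minimal, self-contained sketch of that bookkeeping, not the actual Spark implementation. The names `ConsistentCounter`, `recordUpdate`, and `UpdateKey` are hypothetical and exist only for illustration; the point is that `pending`/`completed` must be keyed by (rddId, shuffleWriteId, partitionId), because two combineByKey operations over the same RDD produce blocks that share an RDD id and differ only in the shuffle write id.

    ```scala
    import scala.collection.mutable

    object ConsistentCounterSketch {
      // Each block of user code that can touch the accumulator is identified by the RDD it
      // runs over, the shuffle write it feeds (if any), and the partition index.
      type UpdateKey = (Int, Int, Int) // (rddId, shuffleWriteId, partitionId)

      class ConsistentCounter {
        private val pending = mutable.Map.empty[UpdateKey, Long]
        private val completed = mutable.Set.empty[UpdateKey]
        private var value = 0L

        // Buffer an update; it only becomes visible once its block is marked fully processed.
        def recordUpdate(key: UpdateKey, amount: Long): Unit =
          pending(key) = pending.getOrElse(key, 0L) + amount

        // Commit a block's pending updates exactly once, even if the block is re-run.
        def markFullyProcessed(key: UpdateKey): Unit =
          if (!completed.contains(key)) {
            completed += key
            value += pending.getOrElse(key, 0L)
          }

        def current: Long = value
      }

      def main(args: Array[String]): Unit = {
        val counter = new ConsistentCounter()
        val rddId = 7
        val partition = 0

        // Two combineByKey operations over the same RDD: both map-side blocks report
        // rddId = 7, so only the shuffle write id distinguishes them.
        counter.recordUpdate((rddId, 1, partition), 10L) // map side of shuffle 1
        counter.recordUpdate((rddId, 2, partition), 5L)  // map side of shuffle 2

        counter.markFullyProcessed((rddId, 1, partition))
        counter.markFullyProcessed((rddId, 1, partition)) // re-run of the same block: counted once
        counter.markFullyProcessed((rddId, 2, partition))

        println(counter.current) // 15
      }
    }
    ```

    With a key of only (rddId, partitionId), the two recordUpdate calls above would collide and one of the execution blocks would be lost, which is the situation the comment describes.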