Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r56425894
  
    --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
    @@ -114,23 +163,40 @@ class Accumulable[R, T] private (
        * same mutable instance around.
        */
       private[spark] def copy(): Accumulable[R, T] = {
    -    new Accumulable[R, T](id, initialValue, param, name, internal, 
countFailedValues)
    +    new Accumulable[R, T](id, initialValue, param, name, internal, 
countFailedValues, consistent)
       }
     
       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def += (term: T) { value_ = param.addAccumulator(value_, term) }
    +  def += (term: T) { add(term) }
     
       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def add(term: T) { value_ = param.addAccumulator(value_, term) }
    +  def add(term: T) {
    +    value_ = param.addAccumulator(value_, term)
    +    if (consistent) {
    +      val updateInfo = TaskContext.get().getRDDPartitionInfo()
    +      val base = pending.getOrElse(updateInfo, zero)
    +      pending(updateInfo) = param.addAccumulator(base, term)
    +    }
    +  }
     
       /**
    -   * Merge two accumulable objects together
    +   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
    +   * non-consistent accumuables.
    +   */
    +  private[spark] def markFullyProcessed(rddId: Int, shuffleId: Int, 
partitionId: Int): Unit = {
    +    if (consistent) {
    +      completed += ((rddId, shuffleId, partitionId))
    --- End diff --
    
    So the reason for also tracking the id of the shuffle is that if we have 
two combineByKey operations we can have three different lambdas (the original 
code, shuffle 1, shuffle 2) operating on what is the same RDD id so we need a 
way to distinguish the three blocks of execution so they can all be counted.
    
    `shuffleWriteId` sounds like a much better name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to