Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r86490436
  
    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
      * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
      * (e.g., synchronized collections) because it will be read from other 
threads.
      */
    -abstract class AccumulatorV2[IN, OUT] extends Serializable {
    +abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
       private[spark] var metadata: AccumulatorMetadata = _
    -  private[this] var atDriverSide = true
    +  private[spark] var atDriverSide = true
    +
    +  /**
    +   * The following values are used for data property [[AccumulatorV2]]s.
    +   * Data property [[AccumulatorV2]]s have only-once semantics. These 
semantics are implemented
    +   * by keeping track of which RDD id, shuffle id, and partition id the 
current function is
    +   * processing in. If a partition is fully processed the results for that 
partition/shuffle/rdd
    +   * combination are sent back to the driver. The driver keeps track of 
which rdd/shuffle/partitions
    +   * already have been applied, and only combines values into value_ if 
the rdd/shuffle/partition
    +   * has not already been aggregated on the driver program
    +   */
    +  // For data property accumulators pending and processed updates.
    +  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
    +  private[spark] lazy val pending =
    +    new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
    +  // Completed contains the set of (rdd id, shuffle id, partition id) that 
have been
    +  // fully processed on the worker side. This is used to determine if the 
updates should
    +  // be merged on the driver for a particular rdd/shuffle/partition 
combination.
    +  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
    +  // rddProcessed is keyed by rdd id and the value is a bitset containing 
all partitions
    +  // for the given key which have been merged into the value. This is used 
on the driver.
    +  @transient private[spark] lazy val rddProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
    +  // shuffleProcessed is the same as rddProcessed except keyed by shuffle 
id.
    +  @transient private[spark] lazy val shuffleProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
    --- End diff --
    
    That understanding seems correct, indeed the merge function now asserts 
that it is at driver side (from the last time this comment was posted).
    
    As an aside I don't know what GitHub is doing/changing with the review 
features but it seems to be really weird on this PR for some reason.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to