GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r55434378
  
    --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
    @@ -53,42 +54,79 @@ import org.apache.spark.util.Utils
      *                          for system and time metrics like serialization time or bytes spilled,
      *                          and false for things with absolute values like number of input rows.
      *                          This should be used for internal metrics only.
    - * @tparam R the full accumulated data (result type)
    + * @param consistent if this [[Accumulable]] is consistent. Consistent [[Accumulable]]s will only
    + *                   have values added once for each RDD/Partition execution combination. This
    + *                   prevents double counting on reevaluation. Partial evaluation of a partition
    + *                   will not increment a consistent [[Accumulable]]. Consistent [[Accumulable]]s
    + *                   are currently experimental and the behaviour may change in future versions.
    + *                   Consistent [[Accumulable]]s can only be added to inside
    + *                   [[MapPartitionsRDD]]s and are designed for counting "data properties".
    + * @tparam R the full accumulated data
      * @tparam T partial data that can be added in
      */
    -class Accumulable[R, T] private (
    +class Accumulable[R, T] private[spark] (
         val id: Long,
        // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
         @transient private val initialValue: R,
         param: AccumulableParam[R, T],
         val name: Option[String],
         internal: Boolean,
    -    private[spark] val countFailedValues: Boolean)
    +    private[spark] val countFailedValues: Boolean,
    +    private[spark] val consistent: Boolean)
       extends Serializable {
     
       private[spark] def this(
    -      initialValue: R,
    +      @transient initialValue: R,
    +      param: AccumulableParam[R, T],
    +      internal: Boolean,
    +      countFailedValues: Boolean,
    +      consistent: Boolean) = {
    +    this(Accumulators.newId(), initialValue, param, None, internal, countFailedValues, consistent)
    +  }
    +
    +  def this(
    +      @transient initialValue: R,
           param: AccumulableParam[R, T],
           name: Option[String],
           internal: Boolean,
           countFailedValues: Boolean) = {
    -    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
    +    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues,
    +      false /* consistent */)
       }
     
    -  private[spark] def this(
    -      initialValue: R,
    +  def this(
    +      @transient initialValue: R,
           param: AccumulableParam[R, T],
           name: Option[String],
    -      internal: Boolean) = {
    -    this(initialValue, param, name, internal, false /* countFailedValues */)
    +      internal: Boolean,
    +      countFailedValues: Boolean,
    +      consistent: Boolean) = {
    +    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues, consistent)
       }
     
    -  def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
    +  def this(
    +      @transient initialValue: R,
    +      param: AccumulableParam[R, T],
    +      name: Option[String],
    +      internal: Boolean) =
    +    this(initialValue, param, name, internal, false /* countFailed */)
    +
    +  def this(
    +      @transient initialValue: R,
    +      param: AccumulableParam[R, T],
    +      name: Option[String]) =
         this(initialValue, param, name, false /* internal */)
     
    -  def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
    +  def this(
    +      @transient initialValue: R,
    +      param: AccumulableParam[R, T]) =
    +    this(initialValue, param, None)
     
       @volatile @transient private var value_ : R = initialValue // Current value on driver
    +  // For consistent accumulators: pending and processed updates
    +  @volatile @transient private[spark] var pending = new mutable.HashMap[(Int, Int), R]()
    +  @volatile @transient private var processed = new mutable.HashMap[Int, mutable.BitSet]()
    --- End diff --
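
    For context, a rough sketch of how the new `consistent` flag might be used,
    going only by the scaladoc and the six-argument constructor above. The
    row-counting use case and `rdd` are made up for illustration, and
    `LongAccumulatorParam` is the existing implicit object in
    `org.apache.spark.AccumulatorParam`:

        import org.apache.spark.AccumulatorParam.LongAccumulatorParam

        // Count input rows at most once per (RDD, partition) pair, even if a
        // partition is recomputed after a fetch failure or a second action.
        val inputRows = new Accumulable[Long, Long](
          0L,                   // initialValue
          LongAccumulatorParam, // param: AccumulableParam[Long, Long]
          Some("inputRows"),    // name
          false,                // internal
          false,                // countFailedValues
          true)                 // consistent

        // map/mapPartitions produce MapPartitionsRDDs, the only place the
        // scaladoc says a consistent accumulable may be added to.
        rdd.mapPartitions { iter =>
          iter.map { row => inputRows += 1L; row }
        }.count()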
    
    I think these can be `@transient lazy val`s. Also, I'd like names or a comment
    that explains the keys and values, something like the sketch below.
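
    A minimal sketch of what that could look like. The names `pendingUpdates` and
    `processedPartitions`, and the stated key meanings, are illustrative guesses
    rather than anything in this PR; `@volatile` is dropped because it cannot
    annotate a `lazy val` (lazy initialization is already synchronized):

        // Lazily allocated, so executors that never touch a consistent
        // accumulator never build the maps on deserialization.

        // Partial values not yet merged into `value_` on the driver, keyed by
        // the (rddId, partitionId) pair whose task produced them.
        @transient private[spark] lazy val pendingUpdates =
          new mutable.HashMap[(Int, Int), R]()

        // For each rddId, the partition indices whose update has already been
        // applied, so a re-executed partition is never counted twice.
        @transient private lazy val processedPartitions =
          new mutable.HashMap[Int, mutable.BitSet]()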

