Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r50807020
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -1074,39 +1074,43 @@ class DAGScheduler(
         }
       }
     
    -  /** Merge updates from a task to our local accumulator values */
    +  /**
    +   * Merge local values from a task into the corresponding accumulators 
previously registered
    +   * here on the driver.
    +   *
    +   * Although accumulators themselves are not thread-safe, this method is 
called only from one
    +   * thread, the one that runs the scheduling loop. This means we only 
handle one task
    +   * completion event at a time so we don't need to worry about locking 
the accumulators.
    +   * This still doesn't stop the caller from updating the accumulator 
outside the scheduler,
    +   * but that's not our problem since there's nothing we can do about that.
    +   */
       private def updateAccumulators(event: CompletionEvent): Unit = {
         val task = event.task
         val stage = stageIdToStage(task.stageId)
    -    if (event.accumUpdates != null) {
    -      try {
    -        Accumulators.add(event.accumUpdates)
    -
    -        event.accumUpdates.foreach { case (id, partialValue) =>
    -          // In this instance, although the reference in 
Accumulators.originals is a WeakRef,
    -          // it's guaranteed to exist since the event.accumUpdates Map 
exists
    -
    -          val acc = Accumulators.originals(id).get match {
    -            case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
    -            case None => throw new NullPointerException("Non-existent 
reference to Accumulator")
    -          }
    -
    -          // To avoid UI cruft, ignore cases where value wasn't updated
    -          if (acc.name.isDefined && partialValue != acc.zero) {
    -            val name = acc.name.get
    -            val value = s"${acc.value}"
    -            stage.latestInfo.accumulables(id) =
    -              new AccumulableInfo(id, name, None, value, acc.isInternal)
    -            event.taskInfo.accumulables +=
    -              new AccumulableInfo(id, name, Some(s"$partialValue"), value, 
acc.isInternal)
    -          }
    +    try {
    +      event.accumUpdates.foreach { ainfo =>
    +        assert(ainfo.update.isDefined, "accumulator from task should have 
a partial value")
    +        val id = ainfo.id
    +        val partialValue = ainfo.update.get
    +        // Find the corresponding accumulator on the driver and update it
    +        val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
    --- End diff --
    
    Could this `match` be simplified with `getOrElse`? E.g. `Accumulators.get(id).getOrElse(throw ...)` instead of pattern matching on `Some`/`None`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to