Repository: spark
Updated Branches:
  refs/heads/branch-2.0 705172202 -> 7d63c36e1


[SPARK-15049] Rename NewAccumulator to AccumulatorV2

## What changes were proposed in this pull request?
NewAccumulator isn't the best name if we ever come up with v3 of the API.
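
For reference, user code only needs the superclass name updated; the accumulator contract is otherwise unchanged by this patch (aside from `isZero` dropping its empty parameter list). A hedged sketch against the renamed API, where the `SetAccumulator` class is illustrative and not part of this patch:

```scala
import org.apache.spark.AccumulatorV2

// Illustrative only: before this patch the same class would have extended
// NewAccumulator[T, java.util.Set[T]].
class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
  private val _set = new java.util.HashSet[T]

  override def isZero: Boolean = _set.isEmpty
  override def copyAndReset(): SetAccumulator[T] = new SetAccumulator[T]
  override def add(v: T): Unit = _set.add(v)
  override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => _set.addAll(o.localValue)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
  override def localValue: java.util.Set[T] = java.util.Collections.unmodifiableSet(_set)
}
```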

## How was this patch tested?
Updated tests to reflect the change.

Author: Reynold Xin <r...@databricks.com>

Closes #12827 from rxin/SPARK-15049.

(cherry picked from commit 44da8d8eabeccc12bfed0d43b37d54e0da845c66)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d63c36e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d63c36e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d63c36e

Branch: refs/heads/branch-2.0
Commit: 7d63c36e1efe8baec96cdc16a997249728e204fd
Parents: 7051722
Author: Reynold Xin <r...@databricks.com>
Authored: Sun May 1 20:21:02 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun May 1 20:21:11 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/AccumulatorV2.scala  | 394 +++++++++++++++++++
 .../scala/org/apache/spark/ContextCleaner.scala |   2 +-
 .../org/apache/spark/HeartbeatReceiver.scala    |   2 +-
 .../scala/org/apache/spark/NewAccumulator.scala | 393 ------------------
 .../scala/org/apache/spark/SparkContext.scala   |   4 +-
 .../scala/org/apache/spark/TaskContext.scala    |   2 +-
 .../org/apache/spark/TaskContextImpl.scala      |   2 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |   4 +-
 .../org/apache/spark/executor/Executor.scala    |   4 +-
 .../org/apache/spark/executor/TaskMetrics.scala |  18 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  10 +-
 .../spark/scheduler/DAGSchedulerEvent.scala     |   2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |   2 +-
 .../org/apache/spark/scheduler/TaskResult.scala |   8 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   4 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   2 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../org/apache/spark/AccumulatorSuite.scala     |   2 +-
 .../apache/spark/InternalAccumulatorSuite.scala |   2 +-
 .../spark/executor/TaskMetricsSuite.scala       |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   6 +-
 .../scheduler/ExternalClusterManagerSuite.scala |   4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |   6 +-
 .../spark/sql/execution/metric/SQLMetrics.scala |   6 +-
 24 files changed, 444 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala
new file mode 100644
index 0000000..c65108a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.{lang => jl}
+import java.io.ObjectInputStream
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.util.Utils
+
+
+private[spark] case class AccumulatorMetadata(
+    id: Long,
+    name: Option[String],
+    countFailedValues: Boolean) extends Serializable
+
+
+/**
+ * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
+ * type `OUT`.
+ */
+abstract class AccumulatorV2[IN, OUT] extends Serializable {
+  private[spark] var metadata: AccumulatorMetadata = _
+  private[this] var atDriverSide = true
+
+  private[spark] def register(
+      sc: SparkContext,
+      name: Option[String] = None,
+      countFailedValues: Boolean = false): Unit = {
+    if (this.metadata != null) {
+      throw new IllegalStateException("Cannot register an Accumulator twice.")
+    }
+    this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
+    AccumulatorContext.register(this)
+    sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
+  }
+
+  /**
+   * Returns true if this accumulator has been registered.  Note that all accumulators must be
+   * registered before use, or it will throw exception.
+   */
+  final def isRegistered: Boolean =
+    metadata != null && AccumulatorContext.get(metadata.id).isDefined
+
+  private def assertMetadataNotNull(): Unit = {
+    if (metadata == null) {
+      throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.")
+    }
+  }
+
+  /**
+   * Returns the id of this accumulator, can only be called after registration.
+   */
+  final def id: Long = {
+    assertMetadataNotNull()
+    metadata.id
+  }
+
+  /**
+   * Returns the name of this accumulator, can only be called after registration.
+   */
+  final def name: Option[String] = {
+    assertMetadataNotNull()
+    metadata.name
+  }
+
+  /**
+   * Whether to accumulate values from failed tasks. This is set to true for system and time
+   * metrics like serialization time or bytes spilled, and false for things with absolute values
+   * like number of input rows.  This should be used for internal metrics only.
+   */
+  private[spark] final def countFailedValues: Boolean = {
+    assertMetadataNotNull()
+    metadata.countFailedValues
+  }
+
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
+   * values.
+   */
+  private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
+  }
+
+  final private[spark] def isAtDriverSide: Boolean = atDriverSide
+
+  /**
+   * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
+   * value; for a list accumulator, Nil is zero value.
+   */
+  def isZero: Boolean
+
+  /**
+   * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy
+   * must return true.
+   */
+  def copyAndReset(): AccumulatorV2[IN, OUT]
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   */
+  def add(v: IN): Unit
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place.
+   */
+  def merge(other: AccumulatorV2[IN, OUT]): Unit
+
+  /**
+   * Access this accumulator's current value; only allowed on driver.
+   */
+  final def value: OUT = {
+    if (atDriverSide) {
+      localValue
+    } else {
+      throw new UnsupportedOperationException("Can't read accumulator value in task")
+    }
+  }
+
+  /**
+   * Defines the current value of this accumulator.
+   *
+   * This is NOT the global value of the accumulator.  To get the global value after a
+   * completed operation on the dataset, call `value`.
+   */
+  def localValue: OUT
+
+  // Called by Java when serializing an object
+  final protected def writeReplace(): Any = {
+    if (atDriverSide) {
+      if (!isRegistered) {
+        throw new UnsupportedOperationException(
+          "Accumulator must be registered before send to executor")
+      }
+      val copy = copyAndReset()
+      assert(copy.isZero, "copyAndReset must return a zero value copy")
+      copy.metadata = metadata
+      copy
+    } else {
+      this
+    }
+  }
+
+  // Called by Java when deserializing an object
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    in.defaultReadObject()
+    if (atDriverSide) {
+      atDriverSide = false
+
+      // Automatically register the accumulator when it is deserialized with the task closure.
+      // This is for external accumulators and internal ones that do not represent task level
+      // metrics, e.g. internal SQL metrics, which are per-operator.
+      val taskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.registerAccumulator(this)
+      }
+    } else {
+      atDriverSide = true
+    }
+  }
+
+  override def toString: String = {
+    if (metadata == null) {
+      "Un-registered Accumulator: " + getClass.getSimpleName
+    } else {
+      getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+    }
+  }
+}
+
+
+/**
+ * An internal class used to track accumulators by Spark itself.
+ */
+private[spark] object AccumulatorContext {
+
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
+   */
+  private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]]
+
+  private[this] val nextId = new AtomicLong(0L)
+
+  /**
+   * Returns a globally unique ID for a new [[Accumulator]].
+   * Note: Once you copy the [[Accumulator]] the ID is no longer unique.
+   */
+  def newId(): Long = nextId.getAndIncrement
+
+  /** Returns the number of accumulators registered. Used in testing. */
+  def numAccums: Int = originals.size
+
+  /**
+   * Registers an [[Accumulator]] created on the driver such that it can be used on the executors.
+   *
+   * All accumulators registered here can later be used as a container for accumulating partial
+   * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
+   * Note: if an accumulator is registered here, it should also be registered with the active
+   * context cleaner for cleanup so as to avoid memory leaks.
+   *
+   * If an [[Accumulator]] with the same ID was already registered, this does nothing instead
+   * of overwriting it. We will never register same accumulator twice, this is just a sanity check.
+   */
+  def register(a: AccumulatorV2[_, _]): Unit = {
+    originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a))
+  }
+
+  /**
+   * Unregisters the [[Accumulator]] with the given ID, if any.
+   */
+  def remove(id: Long): Unit = {
+    originals.remove(id)
+  }
+
+  /**
+   * Returns the [[Accumulator]] registered with the given ID, if any.
+   */
+  def get(id: Long): Option[AccumulatorV2[_, _]] = {
+    Option(originals.get(id)).map { ref =>
+      // Since we are storing weak references, we must check whether the underlying data is valid.
+      val acc = ref.get
+      if (acc eq null) {
+        throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
+      }
+      acc
+    }
+  }
+
+  /**
+   * Clears all registered [[Accumulator]]s. For testing only.
+   */
+  def clear(): Unit = {
+    originals.clear()
+  }
+}
+
+
+class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
+  private[this] var _sum = 0L
+
+  override def isZero: Boolean = _sum == 0
+
+  override def copyAndReset(): LongAccumulator = new LongAccumulator
+
+  override def add(v: jl.Long): Unit = _sum += v
+
+  def add(v: Long): Unit = _sum += v
+
+  def sum: Long = _sum
+
+  override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match {
+    case o: LongAccumulator => _sum += o.sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  private[spark] def setValue(newValue: Long): Unit = _sum = newValue
+
+  override def localValue: jl.Long = _sum
+}
+
+
+class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
+  private[this] var _sum = 0.0
+
+  override def isZero: Boolean = _sum == 0.0
+
+  override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
+
+  override def add(v: jl.Double): Unit = _sum += v
+
+  def add(v: Double): Unit = _sum += v
+
+  def sum: Double = _sum
+
+  override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
+    case o: DoubleAccumulator => _sum += o.sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  private[spark] def setValue(newValue: Double): Unit = _sum = newValue
+
+  override def localValue: jl.Double = _sum
+}
+
+
+class AverageAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
+  private[this] var _sum = 0.0
+  private[this] var _count = 0L
+
+  override def isZero: Boolean = _sum == 0.0 && _count == 0
+
+  override def copyAndReset(): AverageAccumulator = new AverageAccumulator
+
+  override def add(v: jl.Double): Unit = {
+    _sum += v
+    _count += 1
+  }
+
+  def add(d: Double): Unit = {
+    _sum += d
+    _count += 1
+  }
+
+  override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
+    case o: AverageAccumulator =>
+      _sum += o.sum
+      _count += o.count
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: jl.Double = if (_count == 0) {
+    Double.NaN
+  } else {
+    _sum / _count
+  }
+
+  def sum: Double = _sum
+
+  def count: Long = _count
+}
+
+
+class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
+  private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
+
+  override def isZero: Boolean = _list.isEmpty
+
+  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+
+  override def add(v: T): Unit = _list.add(v)
+
+  override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
+    case o: ListAccumulator[T] => _list.addAll(o.localValue)
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+
+  private[spark] def setValue(newValue: java.util.List[T]): Unit = {
+    _list.clear()
+    _list.addAll(newValue)
+  }
+}
+
+
+class LegacyAccumulatorWrapper[R, T](
+    initialValue: R,
+    param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
+  private[spark] var _value = initialValue  // Current value on driver
+
+  override def isZero: Boolean = _value == param.zero(initialValue)
+
+  override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
+    val acc = new LegacyAccumulatorWrapper(initialValue, param)
+    acc._value = param.zero(initialValue)
+    acc
+  }
+
+  override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
+
+  override def merge(other: AccumulatorV2[T, R]): Unit = other match {
+    case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: R = _value
+}
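
The file above defines the full accumulator contract: `isZero`, `copyAndReset`, `add`, in-place `merge`, and `localValue`/`value`. A minimal driver-local sketch exercising that contract with the `LongAccumulator` defined here (it assumes spark-core on the classpath; nothing is registered with a SparkContext, so only the local operations are shown):

```scala
import org.apache.spark.LongAccumulator

val a = new LongAccumulator
val b = new LongAccumulator
assert(a.isZero)        // a fresh accumulator starts at its zero value
a.add(3L)
a.add(4L)
b.add(10L)
a.merge(b)              // merge is in place: a now also holds b's contribution
assert(a.sum == 17L)
println(a.value)        // allowed here because this code runs on the driver side
```

Calling `value` inside a task would throw, and `writeReplace` rejects serializing an unregistered accumulator into a task closure.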

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 63a00a8..a51338c 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
-  def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
+  def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
     registerForCleanup(a, CleanAccum(a.id))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 9eac05f..29018c7 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
  */
 private[spark] case class Heartbeat(
     executorId: String,
-    accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> 
accumulator updates
+    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> 
accumulator updates
     blockManagerId: BlockManagerId)
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/NewAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala 
b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
deleted file mode 100644
index 1571e15..0000000
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.{lang => jl}
-import java.io.ObjectInputStream
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.util.Utils
-
-
-private[spark] case class AccumulatorMetadata(
-    id: Long,
-    name: Option[String],
-    countFailedValues: Boolean) extends Serializable
-
-
-/**
- * The base class for accumulators, that can accumulate inputs of type `IN`, 
and produce output of
- * type `OUT`.
- */
-abstract class NewAccumulator[IN, OUT] extends Serializable {
-  private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
-
-  private[spark] def register(
-      sc: SparkContext,
-      name: Option[String] = None,
-      countFailedValues: Boolean = false): Unit = {
-    if (this.metadata != null) {
-      throw new IllegalStateException("Cannot register an Accumulator twice.")
-    }
-    this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, 
countFailedValues)
-    AccumulatorContext.register(this)
-    sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
-  }
-
-  /**
-   * Returns true if this accumulator has been registered.  Note that all 
accumulators must be
-   * registered before ues, or it will throw exception.
-   */
-  final def isRegistered: Boolean =
-    metadata != null && AccumulatorContext.get(metadata.id).isDefined
-
-  private def assertMetadataNotNull(): Unit = {
-    if (metadata == null) {
-      throw new IllegalAccessError("The metadata of this accumulator has not 
been assigned yet.")
-    }
-  }
-
-  /**
-   * Returns the id of this accumulator, can only be called after registration.
-   */
-  final def id: Long = {
-    assertMetadataNotNull()
-    metadata.id
-  }
-
-  /**
-   * Returns the name of this accumulator, can only be called after 
registration.
-   */
-  final def name: Option[String] = {
-    assertMetadataNotNull()
-    metadata.name
-  }
-
-  /**
-   * Whether to accumulate values from failed tasks. This is set to true for 
system and time
-   * metrics like serialization time or bytes spilled, and false for things 
with absolute values
-   * like number of input rows.  This should be used for internal metrics only.
-   */
-  private[spark] final def countFailedValues: Boolean = {
-    assertMetadataNotNull()
-    metadata.countFailedValues
-  }
-
-  /**
-   * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] 
with the provided
-   * values.
-   */
-  private[spark] def toInfo(update: Option[Any], value: Option[Any]): 
AccumulableInfo = {
-    val isInternal = 
name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
-    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
-  }
-
-  final private[spark] def isAtDriverSide: Boolean = atDriverSide
-
-  /**
-   * Tells if this accumulator is zero value or not. e.g. for a counter 
accumulator, 0 is zero
-   * value; for a list accumulator, Nil is zero value.
-   */
-  def isZero(): Boolean
-
-  /**
-   * Creates a new copy of this accumulator, which is zero value. i.e. call 
`isZero` on the copy
-   * must return true.
-   */
-  def copyAndReset(): NewAccumulator[IN, OUT]
-
-  /**
-   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
-   */
-  def add(v: IN): Unit
-
-  /**
-   * Merges another same-type accumulator into this one and update its state, 
i.e. this should be
-   * merge-in-place.
-   */
-  def merge(other: NewAccumulator[IN, OUT]): Unit
-
-  /**
-   * Access this accumulator's current value; only allowed on driver.
-   */
-  final def value: OUT = {
-    if (atDriverSide) {
-      localValue
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in 
task")
-    }
-  }
-
-  /**
-   * Defines the current value of this accumulator.
-   *
-   * This is NOT the global value of the accumulator.  To get the global value 
after a
-   * completed operation on the dataset, call `value`.
-   */
-  def localValue: OUT
-
-  // Called by Java when serializing an object
-  final protected def writeReplace(): Any = {
-    if (atDriverSide) {
-      if (!isRegistered) {
-        throw new UnsupportedOperationException(
-          "Accumulator must be registered before send to executor")
-      }
-      val copy = copyAndReset()
-      assert(copy.isZero(), "copyAndReset must return a zero value copy")
-      copy.metadata = metadata
-      copy
-    } else {
-      this
-    }
-  }
-
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
-    in.defaultReadObject()
-    if (atDriverSide) {
-      atDriverSide = false
-
-      // Automatically register the accumulator when it is deserialized with 
the task closure.
-      // This is for external accumulators and internal ones that do not 
represent task level
-      // metrics, e.g. internal SQL metrics, which are per-operator.
-      val taskContext = TaskContext.get()
-      if (taskContext != null) {
-        taskContext.registerAccumulator(this)
-      }
-    } else {
-      atDriverSide = true
-    }
-  }
-
-  override def toString: String = {
-    if (metadata == null) {
-      "Un-registered Accumulator: " + getClass.getSimpleName
-    } else {
-      getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
-    }
-  }
-}
-
-
-private[spark] object AccumulatorContext {
-
-  /**
-   * This global map holds the original accumulator objects that are created 
on the driver.
-   * It keeps weak references to these objects so that accumulators can be 
garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   * TODO: Don't use a global map; these should be tied to a SparkContext 
(SPARK-13051).
-   */
-  private val originals = new ConcurrentHashMap[Long, 
jl.ref.WeakReference[NewAccumulator[_, _]]]
-
-  private[this] val nextId = new AtomicLong(0L)
-
-  /**
-   * Return a globally unique ID for a new [[Accumulator]].
-   * Note: Once you copy the [[Accumulator]] the ID is no longer unique.
-   */
-  def newId(): Long = nextId.getAndIncrement
-
-  def numAccums: Int = originals.size
-
-  /**
-   * Register an [[Accumulator]] created on the driver such that it can be 
used on the executors.
-   *
-   * All accumulators registered here can later be used as a container for 
accumulating partial
-   * values across multiple tasks. This is what 
[[org.apache.spark.scheduler.DAGScheduler]] does.
-   * Note: if an accumulator is registered here, it should also be registered 
with the active
-   * context cleaner for cleanup so as to avoid memory leaks.
-   *
-   * If an [[Accumulator]] with the same ID was already registered, this does 
nothing instead
-   * of overwriting it. We will never register same accumulator twice, this is 
just a sanity check.
-   */
-  def register(a: NewAccumulator[_, _]): Unit = {
-    originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, 
_]](a))
-  }
-
-  /**
-   * Unregister the [[Accumulator]] with the given ID, if any.
-   */
-  def remove(id: Long): Unit = {
-    originals.remove(id)
-  }
-
-  /**
-   * Return the [[Accumulator]] registered with the given ID, if any.
-   */
-  def get(id: Long): Option[NewAccumulator[_, _]] = {
-    Option(originals.get(id)).map { ref =>
-      // Since we are storing weak references, we must check whether the 
underlying data is valid.
-      val acc = ref.get
-      if (acc eq null) {
-        throw new IllegalAccessError(s"Attempted to access garbage collected 
accumulator $id")
-      }
-      acc
-    }
-  }
-
-  /**
-   * Clear all registered [[Accumulator]]s. For testing only.
-   */
-  def clear(): Unit = {
-    originals.clear()
-  }
-}
-
-
-class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
-  private[this] var _sum = 0L
-
-  override def isZero(): Boolean = _sum == 0
-
-  override def copyAndReset(): LongAccumulator = new LongAccumulator
-
-  override def add(v: jl.Long): Unit = _sum += v
-
-  def add(v: Long): Unit = _sum += v
-
-  def sum: Long = _sum
-
-  override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other 
match {
-    case o: LongAccumulator => _sum += o.sum
-    case _ => throw new UnsupportedOperationException(
-      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-  }
-
-  private[spark] def setValue(newValue: Long): Unit = _sum = newValue
-
-  override def localValue: jl.Long = _sum
-}
-
-
-class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
-  private[this] var _sum = 0.0
-
-  override def isZero(): Boolean = _sum == 0.0
-
-  override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
-
-  override def add(v: jl.Double): Unit = _sum += v
-
-  def add(v: Double): Unit = _sum += v
-
-  def sum: Double = _sum
-
-  override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = 
other match {
-    case o: DoubleAccumulator => _sum += o.sum
-    case _ => throw new UnsupportedOperationException(
-      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-  }
-
-  private[spark] def setValue(newValue: Double): Unit = _sum = newValue
-
-  override def localValue: jl.Double = _sum
-}
-
-
-class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
-  private[this] var _sum = 0.0
-  private[this] var _count = 0L
-
-  override def isZero(): Boolean = _sum == 0.0 && _count == 0
-
-  override def copyAndReset(): AverageAccumulator = new AverageAccumulator
-
-  override def add(v: jl.Double): Unit = {
-    _sum += v
-    _count += 1
-  }
-
-  def add(d: Double): Unit = {
-    _sum += d
-    _count += 1
-  }
-
-  override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = 
other match {
-    case o: AverageAccumulator =>
-      _sum += o.sum
-      _count += o.count
-    case _ => throw new UnsupportedOperationException(
-      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-  }
-
-  override def localValue: jl.Double = if (_count == 0) {
-    Double.NaN
-  } else {
-    _sum / _count
-  }
-
-  def sum: Double = _sum
-
-  def count: Long = _count
-}
-
-
-class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
-  private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
-
-  override def isZero(): Boolean = _list.isEmpty
-
-  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
-
-  override def add(v: T): Unit = _list.add(v)
-
-  override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = 
other match {
-    case o: ListAccumulator[T] => _list.addAll(o.localValue)
-    case _ => throw new UnsupportedOperationException(
-      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-  }
-
-  override def localValue: java.util.List[T] = 
java.util.Collections.unmodifiableList(_list)
-
-  private[spark] def setValue(newValue: java.util.List[T]): Unit = {
-    _list.clear()
-    _list.addAll(newValue)
-  }
-}
-
-
-class LegacyAccumulatorWrapper[R, T](
-    initialValue: R,
-    param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, 
R] {
-  private[spark] var _value = initialValue  // Current value on driver
-
-  override def isZero(): Boolean = _value == param.zero(initialValue)
-
-  override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
-    val acc = new LegacyAccumulatorWrapper(initialValue, param)
-    acc._value = param.zero(initialValue)
-    acc
-  }
-
-  override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
-
-  override def merge(other: NewAccumulator[T, R]): Unit = other match {
-    case o: LegacyAccumulatorWrapper[R, T] => _value = 
param.addInPlace(_value, o.localValue)
-    case _ => throw new UnsupportedOperationException(
-      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-  }
-
-  override def localValue: R = _value
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2cb3ed0..d0f88d4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1282,7 +1282,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
    * Register the given accumulator.  Note that accumulators must be 
registered before use, or it
    * will throw exception.
    */
-  def register(acc: NewAccumulator[_, _]): Unit = {
+  def register(acc: AccumulatorV2[_, _]): Unit = {
     acc.register(this)
   }
 
@@ -1290,7 +1290,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
    * Register the given accumulator with given name.  Note that accumulators 
must be registered
    * before use, or it will throw exception.
    */
-  def register(acc: NewAccumulator[_, _], name: String): Unit = {
+  def register(acc: AccumulatorV2[_, _], name: String): Unit = {
     acc.register(this, name = Some(name))
   }
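
A hedged usage sketch of the registration API above; it assumes a SparkContext named `sc` is already available (for example in spark-shell) and that the job completes before the value is read:

```scala
import org.apache.spark.LongAccumulator

val evenCount = new LongAccumulator
sc.register(evenCount, "evenCount")       // accumulators must be registered before use

sc.parallelize(1 to 100).foreach { n =>
  if (n % 2 == 0) evenCount.add(1L)       // tasks add to their own deserialized copies
}
println(evenCount.value)                  // the driver-side original now reads 50
```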
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 9e53257..1a8f8cf 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable {
    * Register an accumulator that belongs to this task. Accumulators must call 
this method when
    * deserializing in executors.
    */
-  private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit
+  private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index bc3807f..c904e08 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -122,7 +122,7 @@ private[spark] class TaskContextImpl(
   override def getMetricsSources(sourceName: String): Seq[Source] =
     metricsSystem.getSourcesByName(sourceName)
 
-  private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): 
Unit = {
+  private[spark] override def registerAccumulator(a: AccumulatorV2[_, _]): 
Unit = {
     taskMetrics.registerAccumulator(a)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala 
b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 82ba2d0..ef333e3 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
     fullStackTrace: String,
     private val exceptionWrapper: Option[ThrowableSerializationWrapper],
     accumUpdates: Seq[AccumulableInfo] = Seq.empty,
-    private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil)
+    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
   extends TaskFailedReason {
 
   /**
@@ -138,7 +138,7 @@ case class ExceptionFailure(
     this(e, accumUpdates, preserveCause = true)
   }
 
-  private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): 
ExceptionFailure = {
+  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): 
ExceptionFailure = {
     this.accums = accums
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4d61f7e..4f74dc9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -353,7 +353,7 @@ private[spark] class Executor(
           logError(s"Exception in $taskName (TID $taskId)", t)
 
           // Collect latest accumulator values to report back to the driver
-          val accums: Seq[NewAccumulator[_, _]] =
+          val accums: Seq[AccumulatorV2[_, _]] =
             if (task != null) {
               task.metrics.setExecutorRunTime(System.currentTimeMillis() - 
taskStart)
               task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
@@ -478,7 +478,7 @@ private[spark] class Executor(
   /** Reports heartbeat and metrics for active tasks to the driver. */
   private def reportHeartBeat(): Unit = {
     // list of (task id, accumUpdates) to send back to the driver
-    val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]()
+    val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
     for (taskRunner <- runningTasks.values().asScala) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0b64917..56d034f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -201,7 +201,7 @@ class TaskMetrics private[spark] () extends Serializable {
     output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
   ) ++ testAccum.map(TEST_ACCUM -> _)
 
-  @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] 
=
+  @transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] =
     nameToAccums.values.toIndexedSeq
 
   /* ========================== *
@@ -217,13 +217,13 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private lazy val externalAccums = new 
ArrayBuffer[NewAccumulator[_, _]]
+  @transient private lazy val externalAccums = new 
ArrayBuffer[AccumulatorV2[_, _]]
 
-  private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
+  private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     externalAccums += a
   }
 
-  private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = 
internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums 
++ externalAccums
 }
 
 
@@ -271,15 +271,15 @@ private[spark] object TaskMetrics extends Logging {
   /**
    * Construct a [[TaskMetrics]] object from a list of accumulator updates, 
called on driver only.
    */
-  def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = {
+  def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
     val tm = new TaskMetrics
     val (internalAccums, externalAccums) =
       accums.partition(a => a.name.isDefined && 
tm.nameToAccums.contains(a.name.get))
 
     internalAccums.foreach { acc =>
-      val tmAcc = 
tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]]
+      val tmAcc = 
tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]]
       tmAcc.metadata = acc.metadata
-      tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]])
+      tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
     }
 
     tm.externalAccums ++= externalAccums
@@ -289,7 +289,7 @@ private[spark] object TaskMetrics extends Logging {
 
 
 private[spark] class BlockStatusesAccumulator
-  extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
+  extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
   private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
 
   override def isZero(): Boolean = _seq.isEmpty
@@ -298,7 +298,7 @@ private[spark] class BlockStatusesAccumulator
 
   override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
 
-  override def merge(other: NewAccumulator[(BlockId, BlockStatus), 
Seq[(BlockId, BlockStatus)]])
+  override def merge(other: AccumulatorV2[(BlockId, BlockStatus), 
Seq[(BlockId, BlockStatus)]])
   : Unit = other match {
     case o: BlockStatusesAccumulator => _seq ++= o.localValue
     case _ => throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a96d5f6..4dfd532 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -209,7 +209,7 @@ class DAGScheduler(
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      accumUpdates: Seq[NewAccumulator[_, _]],
+      accumUpdates: Seq[AccumulatorV2[_, _]],
       taskInfo: TaskInfo): Unit = {
     eventProcessLoop.post(
       CompletionEvent(task, reason, result, accumUpdates, taskInfo))
@@ -1091,14 +1091,14 @@ class DAGScheduler(
       event.accumUpdates.foreach { updates =>
         val id = updates.id
         // Find the corresponding accumulator on the driver and update it
-        val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match {
-          case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]]
+        val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
+          case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
           case None =>
             throw new SparkException(s"attempted to access non-existent 
accumulator $id")
         }
-        acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]])
+        acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
         // To avoid UI cruft, ignore cases where value wasn't updated
-        if (acc.name.isDefined && !updates.isZero()) {
+        if (acc.name.isDefined && !updates.isZero) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
           event.taskInfo.accumulables += acc.toInfo(Some(updates.value), 
Some(acc.value))
         }
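
A hedged, driver-local sketch of the cast-and-merge pattern used above (and in `TaskMetrics.fromAccumulators`): accumulators of different element types are folded in through the `AccumulatorV2[Any, Any]` view. The `originals` and `updates` names are illustrative, and the pairs line up by position here for brevity, whereas the scheduler looks originals up by id in `AccumulatorContext`:

```scala
import org.apache.spark.{AccumulatorV2, DoubleAccumulator, LongAccumulator}

val originals: Seq[AccumulatorV2[_, _]] = Seq(new LongAccumulator, new DoubleAccumulator)
val updates: Seq[AccumulatorV2[_, _]] = {
  val l = new LongAccumulator
  l.add(5L)
  val d = new DoubleAccumulator
  d.add(2.5)
  Seq(l, d)
}

originals.zip(updates).foreach { case (acc, update) =>
  // merge is in place, so each driver-side original absorbs the task's partial value
  acc.asInstanceOf[AccumulatorV2[Any, Any]].merge(update.asInstanceOf[AccumulatorV2[Any, Any]])
}
```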

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index e57a224..0a2c2dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent(
     task: Task[_],
     reason: TaskEndReason,
     result: Any,
-    accumUpdates: Seq[NewAccumulator[_, _]],
+    accumUpdates: Seq[AccumulatorV2[_, _]],
     taskInfo: TaskInfo)
   extends DAGSchedulerEvent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index e7ca6ef..362f8e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -153,7 +153,7 @@ private[spark] abstract class Task[T](
    * Collect the latest values of accumulators used in this task. If the task 
failed,
    * filter out the accumulators whose values should not be included on 
failures.
    */
-  def collectAccumulatorUpdates(taskFailed: Boolean = false): 
Seq[NewAccumulator[_, _]] = {
+  def collectAccumulatorUpdates(taskFailed: Boolean = false): 
Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
       context.taskMetrics.accumulators().filter { a => !taskFailed || 
a.countFailedValues }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index b472c55..69ce00f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{NewAccumulator, SparkEnv}
+import org.apache.spark.{AccumulatorV2, SparkEnv}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.Utils
 
@@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: 
BlockId, size: Int)
 /** A TaskResult that contains the task's return value and accumulator 
updates. */
 private[spark] class DirectTaskResult[T](
     var valueBytes: ByteBuffer,
-    var accumUpdates: Seq[NewAccumulator[_, _]])
+    var accumUpdates: Seq[AccumulatorV2[_, _]])
   extends TaskResult[T] with Externalizable {
 
   private var valueObjectDeserialized = false
@@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T](
     if (numUpdates == 0) {
       accumUpdates = null
     } else {
-      val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]]
+      val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
       for (i <- 0 until numUpdates) {
-        _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]]
+        _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]]
       }
       accumUpdates = _accumUpdates
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 75a0c56..9881a10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.NewAccumulator
+import org.apache.spark.AccumulatorV2
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
@@ -67,7 +67,7 @@ private[spark] trait TaskScheduler {
    */
   def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8fa4aa1..666b636 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,7 +384,7 @@ private[spark] class TaskSchedulerImpl(
    */
   override def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] 
= synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b79f643..b724050 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -647,7 +647,7 @@ private[spark] class TaskSetManager(
     info.markFailed()
     val index = info.index
     copiesRunning(index) -= 1
-    var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty
+    var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID 
$tid, ${info.host}): " +
       reason.asInstanceOf[TaskFailedReason].toErrorString
     val failureException: Option[Throwable] = reason match {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala 
b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 9c90049..09eb9c1 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -273,7 +273,7 @@ private[spark] object AccumulatorSuite {
    * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to 
use the
    * info as an accumulator update.
    */
-  def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = 
a.toInfo(Some(a.localValue), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = 
a.toInfo(Some(a.localValue), None)
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak 
execution memory

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala 
b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 688eb6b..25977a4 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -213,7 +213,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with 
LocalSparkContext {
   private class SaveAccumContextCleaner(sc: SparkContext) extends 
ContextCleaner(sc) {
     private val accumsRegistered = new ArrayBuffer[Long]
 
-    override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit 
= {
+    override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = 
{
       accumsRegistered += a.id
       super.registerAccumulatorForCleanup(a)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 94f6e1a..27a1e7b 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -203,7 +203,7 @@ class TaskMetricsSuite extends SparkFunSuite {
     acc1.add(1)
     acc2.add(2)
     val newUpdates = tm.accumulators()
-      .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap
+      .map(a => (a.id, a.asInstanceOf[AccumulatorV2[Any, Any]])).toMap
     assert(newUpdates.contains(acc1.id))
     assert(newUpdates.contains(acc2.id))
     assert(newUpdates.contains(acc3.id))
@@ -230,8 +230,8 @@ private[spark] object TaskMetricsSuite extends Assertions {
    * Note: this does NOT check accumulator ID equality.
    */
   def assertUpdatesEquals(
-      updates1: Seq[NewAccumulator[_, _]],
-      updates2: Seq[NewAccumulator[_, _]]): Unit = {
+      updates1: Seq[AccumulatorV2[_, _]],
+      updates2: Seq[AccumulatorV2[_, _]]): Unit = {
     assert(updates1.size === updates2.size)
     updates1.zip(updates2).foreach { case (acc1, acc2) =>
       // do not assert ID equals here

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9912d1f..5a5c3a0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     override def stop() = {}
     override def executorHeartbeatReceived(
         execId: String,
-        accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
         blockManagerId: BlockManagerId): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
       // normally done by TaskSetManager
@@ -483,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
       override def defaultParallelism(): Int = 2
       override def executorHeartbeatReceived(
           execId: String,
-          accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+          accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: 
ExecutorLossReason): Unit = {}
       override def applicationAttemptId(): Option[String] = None
@@ -2012,7 +2012,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty,
+      extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty,
       taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
     val accumUpdates = reason match {
       case Success => task.metrics.accumulators()

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 16027d9..72ac848 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, 
SparkContext, SparkFunSuite}
+import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, 
SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
@@ -67,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def applicationAttemptId(): Option[String] = None
   def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = true
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 339fc42..122a3ec 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: 
FakeTaskScheduler)
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      accumUpdates: Seq[NewAccumulator[_, _]],
+      accumUpdates: Seq[AccumulatorV2[_, _]],
       taskInfo: TaskInfo) {
     taskScheduler.endedTasks(taskInfo.index) = reason
   }
@@ -184,7 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
-    val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = 
taskSet.tasks.map { task =>
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
 
@@ -791,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
   private def createTaskResult(
       id: Int,
-      accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): 
DirectTaskResult[Int] = {
+      accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): 
DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d63c36e/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 7bf9225..40c00ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.metric
 
 import java.text.NumberFormat
 
-import org.apache.spark.{NewAccumulator, SparkContext}
+import org.apache.spark.{AccumulatorV2, SparkContext}
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.util.Utils
 
 
-class SQLMetric(val metricType: String, initValue: Long = 0L) extends 
NewAccumulator[Long, Long] {
+class SQLMetric(val metricType: String, initValue: Long = 0L) extends 
AccumulatorV2[Long, Long] {
   // This is a workaround for SPARK-11013.
   // We may use -1 as initial value of the accumulator, if the accumulator is 
valid, we will
   // update it at the end of task and the value will be at least 0. Then we 
can filter out the -1
@@ -33,7 +33,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) 
extends NewAccumul
 
   override def copyAndReset(): SQLMetric = new SQLMetric(metricType, initValue)
 
-  override def merge(other: NewAccumulator[Long, Long]): Unit = other match {
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
     case o: SQLMetric => _value += o.localValue
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")

