Repository: spark
Updated Branches:
  refs/heads/master 5e492e9d5 -> 302bb569f


[SPARK-12884] Move classes to their own files for readability

This is a small step towards implementing SPARK-10620, which migrates 
`TaskMetrics` to accumulators. This patch is strictly a cleanup and introduces 
no change in functionality: it simply moves classes into their own files so 
that no single monolithic file contains 10 different classes.

Parent PR: #10717

Author: Andrew Or <and...@databricks.com>

Closes #10810 from andrewor14/move-things.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/302bb569
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/302bb569
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/302bb569

Branch: refs/heads/master
Commit: 302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82
Parents: 5e492e9
Author: Andrew Or <and...@databricks.com>
Authored: Mon Jan 18 13:27:18 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jan 18 13:27:18 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulable.scala    | 226 +++++++++++
 .../scala/org/apache/spark/Accumulator.scala    | 160 ++++++++
 .../scala/org/apache/spark/Accumulators.scala   | 399 -------------------
 .../org/apache/spark/InternalAccumulator.scala  |  58 +++
 .../apache/spark/executor/InputMetrics.scala    |  77 ++++
 .../apache/spark/executor/OutputMetrics.scala   |  53 +++
 .../spark/executor/ShuffleReadMetrics.scala     |  87 ++++
 .../spark/executor/ShuffleWriteMetrics.scala    |  53 +++
 .../org/apache/spark/executor/TaskMetrics.scala | 186 +--------
 9 files changed, 716 insertions(+), 583 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/Accumulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
new file mode 100644
index 0000000..a456d42
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{ObjectInputStream, Serializable}
+
+import scala.collection.generic.Growable
+import scala.reflect.ClassTag
+
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
+
+
+/**
+ * A data type that can be accumulated, ie has an commutative and associative 
"add" operation,
+ * but where the result type, `R`, may be different from the element type 
being added, `T`.
+ *
+ * You must define how to add data, and how to merge two of these together.  
For some data types,
+ * such as a counter, these might be the same operation. In that case, you can 
use the simpler
+ * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- 
e.g., imagine you are
+ * accumulating a set. You will add items to the set, and you will union two 
sets together.
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `R` and `T`
+ * @param name human-readable name for use in Spark's web UI
+ * @param internal if this [[Accumulable]] is internal. Internal 
[[Accumulable]]s will be reported
+ *                 to the driver via heartbeats. For internal 
[[Accumulable]]s, `R` must be
+ *                 thread safe so that they can be reported correctly.
+ * @tparam R the full accumulated data (result type)
+ * @tparam T partial data that can be added in
+ */
+class Accumulable[R, T] private[spark] (
+    initialValue: R,
+    param: AccumulableParam[R, T],
+    val name: Option[String],
+    internal: Boolean)
+  extends Serializable {
+
+  private[spark] def this(
+      @transient initialValue: R, param: AccumulableParam[R, T], internal: 
Boolean) = {
+    this(initialValue, param, None, internal)
+  }
+
+  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: 
Option[String]) =
+    this(initialValue, param, name, false)
+
+  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
+    this(initialValue, param, None)
+
+  val id: Long = Accumulators.newId
+
+  @volatile @transient private var value_ : R = initialValue // Current value 
on master
+  val zero = param.zero(initialValue)  // Zero value to be passed to workers
+  private var deserialized = false
+
+  Accumulators.register(this)
+
+  /**
+   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be 
reported to the driver
+   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so 
that they can be
+   * reported correctly.
+   */
+  private[spark] def isInternal: Boolean = internal
+
+  /**
+   * Add more data to this accumulator / accumulable
+   * @param term the data to add
+   */
+  def += (term: T) { value_ = param.addAccumulator(value_, term) }
+
+  /**
+   * Add more data to this accumulator / accumulable
+   * @param term the data to add
+   */
+  def add(term: T) { value_ = param.addAccumulator(value_, term) }
+
+  /**
+   * Merge two accumulable objects together
+   *
+   * Normally, a user will not want to use this version, but will instead call 
`+=`.
+   * @param term the other `R` that will get merged with this
+   */
+  def ++= (term: R) { value_ = param.addInPlace(value_, term)}
+
+  /**
+   * Merge two accumulable objects together
+   *
+   * Normally, a user will not want to use this version, but will instead call 
`add`.
+   * @param term the other `R` that will get merged with this
+   */
+  def merge(term: R) { value_ = param.addInPlace(value_, term)}
+
+  /**
+   * Access the accumulator's current value; only allowed on master.
+   */
+  def value: R = {
+    if (!deserialized) {
+      value_
+    } else {
+      throw new UnsupportedOperationException("Can't read accumulator value in 
task")
+    }
+  }
+
+  /**
+   * Get the current value of this accumulator from within a task.
+   *
+   * This is NOT the global value of the accumulator.  To get the global value 
after a
+   * completed operation on the dataset, call `value`.
+   *
+   * The typical use of this method is to directly mutate the local value, 
eg., to add
+   * an element to a Set.
+   */
+  def localValue: R = value_
+
+  /**
+   * Set the accumulator's value; only allowed on master.
+   */
+  def value_= (newValue: R) {
+    if (!deserialized) {
+      value_ = newValue
+    } else {
+      throw new UnsupportedOperationException("Can't assign accumulator value 
in task")
+    }
+  }
+
+  /**
+   * Set the accumulator's value; only allowed on master
+   */
+  def setValue(newValue: R) {
+    this.value = newValue
+  }
+
+  // Called by Java when deserializing an object
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
+    in.defaultReadObject()
+    value_ = zero
+    deserialized = true
+    // Automatically register the accumulator when it is deserialized with the 
task closure.
+    //
+    // Note internal accumulators sent with task are deserialized before the 
TaskContext is created
+    // and are registered in the TaskContext constructor. Other internal 
accumulators, such SQL
+    // metrics, still need to register here.
+    val taskContext = TaskContext.get()
+    if (taskContext != null) {
+      taskContext.registerAccumulator(this)
+    }
+  }
+
+  override def toString: String = if (value_ == null) "null" else 
value_.toString
+}
+
+
+/**
+ * Helper object defining how to accumulate values of a particular type. An 
implicit
+ * AccumulableParam needs to be available when you create [[Accumulable]]s of 
a specific type.
+ *
+ * @tparam R the full accumulated data (result type)
+ * @tparam T partial data that can be added in
+ */
+trait AccumulableParam[R, T] extends Serializable {
+  /**
+   * Add additional data to the accumulator value. Is allowed to modify and 
return `r`
+   * for efficiency (to avoid allocating objects).
+   *
+   * @param r the current value of the accumulator
+   * @param t the data to be added to the accumulator
+   * @return the new value of the accumulator
+   */
+  def addAccumulator(r: R, t: T): R
+
+  /**
+   * Merge two accumulated values together. Is allowed to modify and return 
the first value
+   * for efficiency (to avoid allocating objects).
+   *
+   * @param r1 one set of accumulated data
+   * @param r2 another set of accumulated data
+   * @return both data sets merged together
+   */
+  def addInPlace(r1: R, r2: R): R
+
+  /**
+   * Return the "zero" (identity) value for an accumulator type, given its 
initial value. For
+   * example, if R was a vector of N dimensions, this would return a vector of 
N zeroes.
+   */
+  def zero(initialValue: R): R
+}
+
+
+private[spark] class
+GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with 
Serializable: ClassTag, T]
+  extends AccumulableParam[R, T] {
+
+  def addAccumulator(growable: R, elem: T): R = {
+    growable += elem
+    growable
+  }
+
+  def addInPlace(t1: R, t2: R): R = {
+    t1 ++= t2
+    t1
+  }
+
+  def zero(initialValue: R): R = {
+    // We need to clone initialValue, but it's hard to specify that R should 
also be Cloneable.
+    // Instead we'll serialize it to a buffer and load it back.
+    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
+    val copy = ser.deserialize[R](ser.serialize(initialValue))
+    copy.clear()   // In case it contained stuff
+    copy
+  }
+}
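
Side note (illustration only, not part of this patch): the AccumulableParam
contract documented in the new file above -- addAccumulator, addInPlace and
zero -- could be implemented for the "accumulating a set" case its Scaladoc
mentions roughly as in the sketch below. The object name StringSetParam and
the sc.accumulable / parallelize usage lines are assumptions made for the
sketch, not code from this commit.

  import org.apache.spark.AccumulableParam

  // Hypothetical param: accumulate String elements into a Set[String].
  // addAccumulator folds one element in; addInPlace unions two partial sets.
  object StringSetParam extends AccumulableParam[Set[String], String] {
    def addAccumulator(r: Set[String], t: String): Set[String] = r + t
    def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
    def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
  }

  // Usage sketch, assuming an active SparkContext `sc`:
  // val seen = sc.accumulable(Set.empty[String])(StringSetParam)
  // sc.parallelize(Seq("a", "b", "a")).foreach(x => seen += x)
  // seen.value   // Set("a", "b"); readable only on the driver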

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
new file mode 100644
index 0000000..007136e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+
+
+/**
+ * A simpler value of [[Accumulable]] where the result type being accumulated 
is the same
+ * as the types of elements being merged, i.e. variables that are only "added" 
to through an
+ * associative operation and can therefore be efficiently supported in 
parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports 
accumulators of numeric
+ * value types, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling 
[[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the 
[[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the 
accumulator's value,
+ * using its value method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the 
elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `T`
+ * @tparam T result type
+ */
+class Accumulator[T] private[spark] (
+    @transient private[spark] val initialValue: T,
+    param: AccumulatorParam[T],
+    name: Option[String],
+    internal: Boolean)
+  extends Accumulable[T, T](initialValue, param, name, internal) {
+
+  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) 
= {
+    this(initialValue, param, name, false)
+  }
+
+  def this(initialValue: T, param: AccumulatorParam[T]) = {
+    this(initialValue, param, None, false)
+  }
+}
+
+
+// TODO: The multi-thread support in accumulators is kind of lame; check
+// if there's a more intuitive way of doing it right
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created 
on the driver.
+   * It keeps weak references to these objects so that accumulators can be 
garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  private var lastId: Long = 0
+
+  def newId(): Long = synchronized {
+    lastId += 1
+    lastId
+  }
+
+  def register(a: Accumulable[_, _]): Unit = synchronized {
+    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+  }
+
+  def remove(accId: Long) {
+    synchronized {
+      originals.remove(accId)
+    }
+  }
+
+  // Add values to the original accumulators with some given IDs
+  def add(values: Map[Long, Any]): Unit = synchronized {
+    for ((id, value) <- values) {
+      if (originals.contains(id)) {
+        // Since we are now storing weak references, we must check whether the 
underlying data
+        // is valid.
+        originals(id).get match {
+          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= 
value
+          case None =>
+            throw new IllegalAccessError("Attempted to access garbage 
collected Accumulator.")
+        }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id 
$id")
+      }
+    }
+  }
+
+}
+
+
+/**
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only 
data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam 
object needs to be
+ * available when you create Accumulators of a specific type.
+ *
+ * @tparam T type of value to accumulate
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+  def addAccumulator(t1: T, t2: T): T = {
+    addInPlace(t1, t2)
+  }
+}
+
+
+object AccumulatorParam {
+
+  // The following implicit objects were in SparkContext before 1.2 and users 
had to
+  // `import SparkContext._` to enable them. Now we move them here to make the 
compiler find
+  // them automatically. However, as there are duplicate codes in SparkContext 
for backward
+  // compatibility, please update them accordingly if you modify the following 
implicit objects.
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double): Double = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int): Int = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
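
Side note (illustration only, not part of this patch): the TODO above asks for
AccumulatorParams for other types such as lists. A minimal sketch of a
list-valued param might look like the following; IntListAccumulatorParam and
the usage lines are illustrative assumptions. Only addInPlace and zero need to
be supplied, since AccumulatorParam derives addAccumulator from addInPlace.

  import org.apache.spark.AccumulatorParam

  // Hypothetical param: accumulate List[Int] values by concatenation.
  object IntListAccumulatorParam extends AccumulatorParam[List[Int]] {
    def addInPlace(l1: List[Int], l2: List[Int]): List[Int] = l1 ++ l2
    def zero(initialValue: List[Int]): List[Int] = Nil
  }

  // Usage sketch, assuming an active SparkContext `sc`:
  // val collected = sc.accumulator(List.empty[Int])(IntListAccumulatorParam)
  // sc.parallelize(1 to 3).foreach(i => collected += List(i))
  // collected.value   // e.g. List(1, 2, 3); order depends on task completion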

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
deleted file mode 100644
index 5592b75..0000000
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.{ObjectInputStream, Serializable}
-
-import scala.collection.generic.Growable
-import scala.collection.Map
-import scala.collection.mutable
-import scala.ref.WeakReference
-import scala.reflect.ClassTag
-
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.Utils
-
-/**
- * A data type that can be accumulated, ie has an commutative and associative 
"add" operation,
- * but where the result type, `R`, may be different from the element type 
being added, `T`.
- *
- * You must define how to add data, and how to merge two of these together.  
For some data types,
- * such as a counter, these might be the same operation. In that case, you can 
use the simpler
- * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- 
e.g., imagine you are
- * accumulating a set. You will add items to the set, and you will union two 
sets together.
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `R` and `T`
- * @param name human-readable name for use in Spark's web UI
- * @param internal if this [[Accumulable]] is internal. Internal 
[[Accumulable]]s will be reported
- *                 to the driver via heartbeats. For internal 
[[Accumulable]]s, `R` must be
- *                 thread safe so that they can be reported correctly.
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-class Accumulable[R, T] private[spark] (
-    initialValue: R,
-    param: AccumulableParam[R, T],
-    val name: Option[String],
-    internal: Boolean)
-  extends Serializable {
-
-  private[spark] def this(
-      @transient initialValue: R, param: AccumulableParam[R, T], internal: 
Boolean) = {
-    this(initialValue, param, None, internal)
-  }
-
-  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: 
Option[String]) =
-    this(initialValue, param, name, false)
-
-  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
-    this(initialValue, param, None)
-
-  val id: Long = Accumulators.newId
-
-  @volatile @transient private var value_ : R = initialValue // Current value 
on master
-  val zero = param.zero(initialValue)  // Zero value to be passed to workers
-  private var deserialized = false
-
-  Accumulators.register(this)
-
-  /**
-   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be 
reported to the driver
-   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so 
that they can be
-   * reported correctly.
-   */
-  private[spark] def isInternal: Boolean = internal
-
-  /**
-   * Add more data to this accumulator / accumulable
-   * @param term the data to add
-   */
-  def += (term: T) { value_ = param.addAccumulator(value_, term) }
-
-  /**
-   * Add more data to this accumulator / accumulable
-   * @param term the data to add
-   */
-  def add(term: T) { value_ = param.addAccumulator(value_, term) }
-
-  /**
-   * Merge two accumulable objects together
-   *
-   * Normally, a user will not want to use this version, but will instead call 
`+=`.
-   * @param term the other `R` that will get merged with this
-   */
-  def ++= (term: R) { value_ = param.addInPlace(value_, term)}
-
-  /**
-   * Merge two accumulable objects together
-   *
-   * Normally, a user will not want to use this version, but will instead call 
`add`.
-   * @param term the other `R` that will get merged with this
-   */
-  def merge(term: R) { value_ = param.addInPlace(value_, term)}
-
-  /**
-   * Access the accumulator's current value; only allowed on master.
-   */
-  def value: R = {
-    if (!deserialized) {
-      value_
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in 
task")
-    }
-  }
-
-  /**
-   * Get the current value of this accumulator from within a task.
-   *
-   * This is NOT the global value of the accumulator.  To get the global value 
after a
-   * completed operation on the dataset, call `value`.
-   *
-   * The typical use of this method is to directly mutate the local value, 
eg., to add
-   * an element to a Set.
-   */
-  def localValue: R = value_
-
-  /**
-   * Set the accumulator's value; only allowed on master.
-   */
-  def value_= (newValue: R) {
-    if (!deserialized) {
-      value_ = newValue
-    } else {
-      throw new UnsupportedOperationException("Can't assign accumulator value 
in task")
-    }
-  }
-
-  /**
-   * Set the accumulator's value; only allowed on master
-   */
-  def setValue(newValue: R) {
-    this.value = newValue
-  }
-
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
-    in.defaultReadObject()
-    value_ = zero
-    deserialized = true
-    // Automatically register the accumulator when it is deserialized with the 
task closure.
-    //
-    // Note internal accumulators sent with task are deserialized before the 
TaskContext is created
-    // and are registered in the TaskContext constructor. Other internal 
accumulators, such SQL
-    // metrics, still need to register here.
-    val taskContext = TaskContext.get()
-    if (taskContext != null) {
-      taskContext.registerAccumulator(this)
-    }
-  }
-
-  override def toString: String = if (value_ == null) "null" else 
value_.toString
-}
-
-/**
- * Helper object defining how to accumulate values of a particular type. An 
implicit
- * AccumulableParam needs to be available when you create [[Accumulable]]s of 
a specific type.
- *
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-trait AccumulableParam[R, T] extends Serializable {
-  /**
-   * Add additional data to the accumulator value. Is allowed to modify and 
return `r`
-   * for efficiency (to avoid allocating objects).
-   *
-   * @param r the current value of the accumulator
-   * @param t the data to be added to the accumulator
-   * @return the new value of the accumulator
-   */
-  def addAccumulator(r: R, t: T): R
-
-  /**
-   * Merge two accumulated values together. Is allowed to modify and return 
the first value
-   * for efficiency (to avoid allocating objects).
-   *
-   * @param r1 one set of accumulated data
-   * @param r2 another set of accumulated data
-   * @return both data sets merged together
-   */
-  def addInPlace(r1: R, r2: R): R
-
-  /**
-   * Return the "zero" (identity) value for an accumulator type, given its 
initial value. For
-   * example, if R was a vector of N dimensions, this would return a vector of 
N zeroes.
-   */
-  def zero(initialValue: R): R
-}
-
-private[spark] class
-GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with 
Serializable: ClassTag, T]
-  extends AccumulableParam[R, T] {
-
-  def addAccumulator(growable: R, elem: T): R = {
-    growable += elem
-    growable
-  }
-
-  def addInPlace(t1: R, t2: R): R = {
-    t1 ++= t2
-    t1
-  }
-
-  def zero(initialValue: R): R = {
-    // We need to clone initialValue, but it's hard to specify that R should 
also be Cloneable.
-    // Instead we'll serialize it to a buffer and load it back.
-    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
-    val copy = ser.deserialize[R](ser.serialize(initialValue))
-    copy.clear()   // In case it contained stuff
-    copy
-  }
-}
-
-/**
- * A simpler value of [[Accumulable]] where the result type being accumulated 
is the same
- * as the types of elements being merged, i.e. variables that are only "added" 
to through an
- * associative operation and can therefore be efficiently supported in 
parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports 
accumulators of numeric
- * value types, and programmers can add support for new types.
- *
- * An accumulator is created from an initial value `v` by calling 
[[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the 
[[Accumulable#+=]] operator.
- * However, they cannot read its value. Only the driver program can read the 
accumulator's value,
- * using its value method.
- *
- * The interpreter session below shows an accumulator being used to add up the 
elements of an array:
- *
- * {{{
- * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
- *
- * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
- * ...
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
- *
- * scala> accum.value
- * res2: Int = 10
- * }}}
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T] private[spark] (
-    @transient private[spark] val initialValue: T,
-    param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean)
-  extends Accumulable[T, T](initialValue, param, name, internal) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) 
= {
-    this(initialValue, param, name, false)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false)
-  }
-}
-
-/**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only 
data type you can add
- * in is the same type as the accumulated value. An implicit AccumulatorParam 
object needs to be
- * available when you create Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T): T = {
-    addInPlace(t1, t2)
-  }
-}
-
-object AccumulatorParam {
-
-  // The following implicit objects were in SparkContext before 1.2 and users 
had to
-  // `import SparkContext._` to enable them. Now we move them here to make the 
compiler find
-  // them automatically. However, as there are duplicate codes in SparkContext 
for backward
-  // compatibility, please update them accordingly if you modify the following 
implicit objects.
-
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double): Double = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int): Int = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
-    def zero(initialValue: Long): Long = 0L
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
-    def zero(initialValue: Float): Float = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
-  /**
-   * This global map holds the original accumulator objects that are created 
on the driver.
-   * It keeps weak references to these objects so that accumulators can be 
garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   */
-  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  private var lastId: Long = 0
-
-  def newId(): Long = synchronized {
-    lastId += 1
-    lastId
-  }
-
-  def register(a: Accumulable[_, _]): Unit = synchronized {
-    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-  }
-
-  def remove(accId: Long) {
-    synchronized {
-      originals.remove(accId)
-    }
-  }
-
-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        // Since we are now storing weak references, we must check whether the 
underlying data
-        // is valid.
-        originals(id).get match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= 
value
-          case None =>
-            throw new IllegalAccessError("Attempted to access garbage 
collected Accumulator.")
-        }
-      } else {
-        logWarning(s"Ignoring accumulator update for unknown accumulator id 
$id")
-      }
-    }
-  }
-
-}
-
-private[spark] object InternalAccumulator {
-  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
-  val TEST_ACCUMULATOR = "testAccumulator"
-
-  // For testing only.
-  // This needs to be a def since we don't want to reuse the same accumulator 
across stages.
-  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
-    if (sys.props.contains("spark.testing")) {
-      Some(new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), 
internal = true))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   *
-   * These accumulators are created with the stage such that all tasks in the 
stage will
-   * add to the same set of accumulators. We do this to report the 
distribution of accumulator
-   * values across all tasks within each stage.
-   */
-  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
-    val internalAccumulators = Seq(
-        // Execution memory refers to the memory used by internal data 
structures created
-        // during shuffles, aggregations and joins. The value of this 
accumulator should be
-        // approximately the sum of the peak sizes across all such data 
structures created
-        // in this task. For SQL jobs, this only tracks all unsafe operators 
and ExternalSort.
-        new Accumulator(
-          0L, AccumulatorParam.LongAccumulatorParam, 
Some(PEAK_EXECUTION_MEMORY), internal = true)
-      ) ++ maybeTestAccumulator.toSeq
-    internalAccumulators.foreach { accumulator =>
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
-    }
-    internalAccumulators
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
new file mode 100644
index 0000000..6ea997c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+
+// This is moved to its own file because many more things will be added to it 
in SPARK-10620.
+private[spark] object InternalAccumulator {
+  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
+  val TEST_ACCUMULATOR = "testAccumulator"
+
+  // For testing only.
+  // This needs to be a def since we don't want to reuse the same accumulator 
across stages.
+  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
+    if (sys.props.contains("spark.testing")) {
+      Some(new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), 
internal = true))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Accumulators for tracking internal metrics.
+   *
+   * These accumulators are created with the stage such that all tasks in the 
stage will
+   * add to the same set of accumulators. We do this to report the 
distribution of accumulator
+   * values across all tasks within each stage.
+   */
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+      // Execution memory refers to the memory used by internal data 
structures created
+      // during shuffles, aggregations and joins. The value of this 
accumulator should be
+      // approximately the sum of the peak sizes across all such data 
structures created
+      // in this task. For SQL jobs, this only tracks all unsafe operators and 
ExternalSort.
+      new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, 
Some(PEAK_EXECUTION_MEMORY), internal = true)
+    ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
new file mode 100644
index 0000000..8f1d7f8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read.  Network means that the data was read 
over the network
+ * from a remote block manager (which may have stored the data on-disk or 
in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+  type DataReadMethod = Value
+  val Memory, Disk, Hadoop, Network = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+  /**
+   * This is volatile so that it is visible to the updater thread.
+   */
+  @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+  /**
+   * Total bytes read.
+   */
+  private var _bytesRead: Long = _
+  def bytesRead: Long = _bytesRead
+  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
+
+  /**
+   * Total records read.
+   */
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  def incRecordsRead(records: Long): Unit = _recordsRead += records
+
+  /**
+   * Invoke the bytesReadCallback and mutate bytesRead.
+   */
+  def updateBytesRead() {
+    bytesReadCallback.foreach { c =>
+      _bytesRead = c()
+    }
+  }
+
+  /**
+   * Register a function that can be called to get up-to-date information on 
how many bytes the task
+   * has read from an input source.
+   */
+  def setBytesReadCallback(f: Option[() => Long]) {
+    bytesReadCallback = f
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
new file mode 100644
index 0000000..ad132d0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+  type DataWriteMethod = Value
+  val Hadoop = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+  /**
+   * Total bytes written
+   */
+  private var _bytesWritten: Long = _
+  def bytesWritten: Long = _bytesWritten
+  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = 
value
+
+  /**
+   * Total records written
+   */
+  private var _recordsWritten: Long = 0L
+  def recordsWritten: Long = _recordsWritten
+  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = 
value
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
new file mode 100644
index 0000000..e985b35
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data read in a given task.
+ */
+@DeveloperApi
+class ShuffleReadMetrics extends Serializable {
+  /**
+   * Number of remote blocks fetched in this shuffle by this task
+   */
+  private var _remoteBlocksFetched: Int = _
+  def remoteBlocksFetched: Int = _remoteBlocksFetched
+  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched 
+= value
+  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched 
-= value
+
+  /**
+   * Number of local blocks fetched in this shuffle by this task
+   */
+  private var _localBlocksFetched: Int = _
+  def localBlocksFetched: Int = _localBlocksFetched
+  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched 
+= value
+  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched 
-= value
+
+  /**
+   * Time the task spent waiting for remote shuffle blocks. This only includes 
the time
+   * blocking on shuffle input data. For instance if block B is being fetched 
while the task is
+   * still not finished processing block A, it is not considered to be 
blocking on block B.
+   */
+  private var _fetchWaitTime: Long = _
+  def fetchWaitTime: Long = _fetchWaitTime
+  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+
+  /**
+   * Total number of remote bytes read from the shuffle by this task
+   */
+  private var _remoteBytesRead: Long = _
+  def remoteBytesRead: Long = _remoteBytesRead
+  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += 
value
+  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= 
value
+
+  /**
+   * Shuffle data that was read from the local disk (as opposed to from a 
remote executor).
+   */
+  private var _localBytesRead: Long = _
+  def localBytesRead: Long = _localBytesRead
+  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+  /**
+   * Total bytes fetched in the shuffle by this task (both remote and local).
+   */
+  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+
+  /**
+   * Number of blocks fetched in this shuffle by this task (remote or local)
+   */
+  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+
+  /**
+   * Total number of records read from the shuffle by this task
+   */
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
new file mode 100644
index 0000000..469ebe2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data written in a given task.
+ */
+@DeveloperApi
+class ShuffleWriteMetrics extends Serializable {
+  /**
+   * Number of bytes written for the shuffle by this task
+   */
+  @volatile private var _shuffleBytesWritten: Long = _
+  def shuffleBytesWritten: Long = _shuffleBytesWritten
+  private[spark] def incShuffleBytesWritten(value: Long) = 
_shuffleBytesWritten += value
+  private[spark] def decShuffleBytesWritten(value: Long) = 
_shuffleBytesWritten -= value
+
+  /**
+   * Time the task spent blocking on writes to disk or buffer cache, in 
nanoseconds
+   */
+  @volatile private var _shuffleWriteTime: Long = _
+  def shuffleWriteTime: Long = _shuffleWriteTime
+  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += 
value
+  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= 
value
+
+  /**
+   * Total number of records written to the shuffle by this task
+   */
+  @volatile private var _shuffleRecordsWritten: Long = _
+  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
+  private[spark] def incShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten += value
+  private[spark] def decShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten -= value
+  private[spark] def setShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten = value
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/302bb569/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 42207a9..ce1fcbf 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util.Utils
 
+
 /**
  * :: DeveloperApi ::
  * Metrics tracked during the execution of a task.
@@ -241,6 +242,7 @@ class TaskMetrics extends Serializable {
   }
 }
 
+
 private[spark] object TaskMetrics {
   private val hostNameCache = new ConcurrentHashMap[String, String]()
 
@@ -251,187 +253,3 @@ private[spark] object TaskMetrics {
     if (canonicalHost != null) canonicalHost else host
   }
 }
-
-/**
- * :: DeveloperApi ::
- * Method by which input data was read.  Network means that the data was read 
over the network
- * from a remote block manager (which may have stored the data on-disk or 
in-memory).
- */
-@DeveloperApi
-object DataReadMethod extends Enumeration with Serializable {
-  type DataReadMethod = Value
-  val Memory, Disk, Hadoop, Network = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Method by which output data was written.
- */
-@DeveloperApi
-object DataWriteMethod extends Enumeration with Serializable {
-  type DataWriteMethod = Value
-  val Hadoop = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about reading input data.
- */
-@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
-
-  /**
-   * This is volatile so that it is visible to the updater thread.
-   */
-  @volatile @transient var bytesReadCallback: Option[() => Long] = None
-
-  /**
-   * Total bytes read.
-   */
-  private var _bytesRead: Long = _
-  def bytesRead: Long = _bytesRead
-  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
-
-  /**
-   * Total records read.
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  def incRecordsRead(records: Long): Unit = _recordsRead += records
-
-  /**
-   * Invoke the bytesReadCallback and mutate bytesRead.
-   */
-  def updateBytesRead() {
-    bytesReadCallback.foreach { c =>
-      _bytesRead = c()
-    }
-  }
-
- /**
-  * Register a function that can be called to get up-to-date information on 
how many bytes the task
-  * has read from an input source.
-  */
-  def setBytesReadCallback(f: Option[() => Long]) {
-    bytesReadCallback = f
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about writing output data.
- */
-@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
-  /**
-   * Total bytes written
-   */
-  private var _bytesWritten: Long = _
-  def bytesWritten: Long = _bytesWritten
-  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = 
value
-
-  /**
-   * Total records written
-   */
-  private var _recordsWritten: Long = 0L
-  def recordsWritten: Long = _recordsWritten
-  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = 
value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
- */
-@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
-  /**
-   * Number of remote blocks fetched in this shuffle by this task
-   */
-  private var _remoteBlocksFetched: Int = _
-  def remoteBlocksFetched: Int = _remoteBlocksFetched
-  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched 
+= value
-  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched 
-= value
-
-  /**
-   * Number of local blocks fetched in this shuffle by this task
-   */
-  private var _localBlocksFetched: Int = _
-  def localBlocksFetched: Int = _localBlocksFetched
-  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched 
+= value
-  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched 
-= value
-
-  /**
-   * Time the task spent waiting for remote shuffle blocks. This only includes 
the time
-   * blocking on shuffle input data. For instance if block B is being fetched 
while the task is
-   * still not finished processing block A, it is not considered to be 
blocking on block B.
-   */
-  private var _fetchWaitTime: Long = _
-  def fetchWaitTime: Long = _fetchWaitTime
-  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
-  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
-
-  /**
-   * Total number of remote bytes read from the shuffle by this task
-   */
-  private var _remoteBytesRead: Long = _
-  def remoteBytesRead: Long = _remoteBytesRead
-  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += 
value
-  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= 
value
-
-  /**
-   * Shuffle data that was read from the local disk (as opposed to from a 
remote executor).
-   */
-  private var _localBytesRead: Long = _
-  def localBytesRead: Long = _localBytesRead
-  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
-
-  /**
-   * Total bytes fetched in the shuffle by this task (both remote and local).
-   */
-  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
-
-  /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
-   */
-  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
-
-  /**
-   * Total number of records read from the shuffle by this task
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
-  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
- */
-@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
-  /**
-   * Number of bytes written for the shuffle by this task
-   */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def shuffleBytesWritten: Long = _shuffleBytesWritten
-  private[spark] def incShuffleBytesWritten(value: Long) = 
_shuffleBytesWritten += value
-  private[spark] def decShuffleBytesWritten(value: Long) = 
_shuffleBytesWritten -= value
-
-  /**
-   * Time the task spent blocking on writes to disk or buffer cache, in 
nanoseconds
-   */
-  @volatile private var _shuffleWriteTime: Long = _
-  def shuffleWriteTime: Long = _shuffleWriteTime
-  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += 
value
-  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= 
value
-
-  /**
-   * Total number of records written to the shuffle by this task
-   */
-  @volatile private var _shuffleRecordsWritten: Long = _
-  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
-  private[spark] def incShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten += value
-  private[spark] def decShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten -= value
-  private[spark] def setShuffleRecordsWritten(value: Long) = 
_shuffleRecordsWritten = value
-}

