Repository: spark
Updated Branches:
  refs/heads/branch-2.0 31e5a2a76 -> 2b715251d


[SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only value

## What changes were proposed in this pull request?
Remove `AccumulatorV2.localValue` and keep only `value`: the driver-side check formerly done in `AccumulatorV2.value` moves into `Accumulable.value`, and all `localValue` call sites now read `value` instead.

## How was this patch tested?
Existing tests.
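
For reference, a minimal caller-side sketch (not part of this patch) of what the rename means in practice, using the `LongAccumulator` touched by this diff; the standalone object and its name are illustrative only:

```scala
import org.apache.spark.util.LongAccumulator

// Minimal sketch, assuming a bare (unregistered) accumulator instance.
object LocalValueRenameSketch {
  def main(args: Array[String]): Unit = {
    val acc = new LongAccumulator
    acc.add(5L)

    // Before this patch: `acc.localValue` read the current (task-local) value,
    // while `acc.value` additionally enforced a driver-side check.
    // After this patch: `value` is the single accessor on AccumulatorV2; the
    // driver-side check survives only in the old Accumulable.value wrapper.
    assert(acc.value.longValue() == 5L)
  }
}
```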

Author: Sandeep Singh <sand...@techaddict.me>

Closes #12865 from techaddict/SPARK-15087.

(cherry picked from commit ca813330c716bed76ac0034c12f56665960a1105)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b715251
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b715251
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b715251

Branch: refs/heads/branch-2.0
Commit: 2b715251d3bc9fa7de282356e00cd8bf0fcbcdee
Parents: 31e5a2a
Author: Sandeep Singh <sand...@techaddict.me>
Authored: Tue May 3 11:38:43 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 3 11:38:50 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulable.scala    | 10 ++++--
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/executor/TaskMetrics.scala |  6 ++--
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../org/apache/spark/util/AccumulatorV2.scala   | 32 ++++++--------------
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 .../org/apache/spark/AccumulatorSuite.scala     |  4 +--
 .../spark/sql/execution/metric/SQLMetrics.scala |  4 +--
 .../spark/sql/execution/ui/SQLListener.scala    |  2 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  |  2 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  2 +-
 11 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/Accumulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 5c6761e..812145a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -110,7 +110,13 @@ class Accumulable[R, T] private (
   /**
    * Access the accumulator's current value; only allowed on driver.
    */
-  def value: R = newAcc.value
+  def value: R = {
+    if (newAcc.isAtDriverSide) {
+      newAcc.value
+    } else {
+      throw new UnsupportedOperationException("Can't read accumulator value in task")
+    }
+  }
 
   /**
    * Get the current value of this accumulator from within a task.
@@ -121,7 +127,7 @@ class Accumulable[R, T] private (
    * The typical use of this method is to directly mutate the local value, eg., to add
    * an element to a Set.
    */
-  def localValue: R = newAcc.localValue
+  def localValue: R = newAcc.value
 
   /**
    * Set the accumulator's value; only allowed on driver.

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4f74dc9..64e87a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -362,7 +362,7 @@ private[spark] class Executor(
               Seq.empty
             }
 
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None))
+          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
 
           val serializedTaskEndReason = {
             try {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 085aa7f..7f4652c 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -99,7 +99,7 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * Storage statuses of any blocks that have been updated as a result of this task.
    */
-  def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
+  def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.value
 
   // Setters and increment-ers
   private[spark] def setExecutorDeserializeTime(v: Long): Unit =
@@ -301,12 +301,12 @@ private[spark] class BlockStatusesAccumulator
 
   override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
   : Unit = other match {
-    case o: BlockStatusesAccumulator => _seq ++= o.localValue
+    case o: BlockStatusesAccumulator => _seq ++= o.value
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+  override def value: Seq[(BlockId, BlockStatus)] = _seq
 
   def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
     _seq.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 393680f..8ce8fb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -394,7 +394,7 @@ private[spark] class TaskSchedulerImpl(
        // deserialized.  This brings trouble to the accumulator framework, which depends on
        // serialization to set the `atDriverSide` flag.  Here we call `acc.localValue` instead to
         // be more robust about this issue.
-        val accInfos = updates.map(acc => acc.toInfo(Some(acc.localValue), None))
+        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0e280f6..d8f380e 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -126,23 +126,9 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def merge(other: AccumulatorV2[IN, OUT]): Unit
 
   /**
-   * Access this accumulator's current value; only allowed on driver.
+   * Defines the current value of this accumulator
    */
-  final def value: OUT = {
-    if (atDriverSide) {
-      localValue
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in task")
-    }
-  }
-
-  /**
-   * Defines the current value of this accumulator.
-   *
-   * This is NOT the global value of the accumulator.  To get the global value after a
-   * completed operation on the dataset, call `value`.
-   */
-  def localValue: OUT
+  def value: OUT
 
   // Called by Java when serializing an object
   final protected def writeReplace(): Any = {
@@ -182,7 +168,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     if (metadata == null) {
       "Un-registered Accumulator: " + getClass.getSimpleName
     } else {
-      getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+      getClass.getSimpleName + s"(id: $id, name: $name, value: $value)"
     }
   }
 }
@@ -321,7 +307,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
 
   private[spark] def setValue(newValue: Long): Unit = _sum = newValue
 
-  override def localValue: jl.Long = _sum
+  override def value: jl.Long = _sum
 }
 
 
@@ -386,7 +372,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 
   private[spark] def setValue(newValue: Double): Unit = _sum = newValue
 
-  override def localValue: jl.Double = _sum
+  override def value: jl.Double = _sum
 }
 
 
@@ -400,12 +386,12 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
-    case o: ListAccumulator[T] => _list.addAll(o.localValue)
+    case o: ListAccumulator[T] => _list.addAll(o.value)
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+  override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = {
     _list.clear()
@@ -430,10 +416,10 @@ class LegacyAccumulatorWrapper[R, T](
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
 
   override def merge(other: AccumulatorV2[T, R]): Unit = other match {
-    case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+    case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value)
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: R = _value
+  override def value: R = _value
 }
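
For implementers, a minimal sketch (not part of this commit) of a custom accumulator written against the post-patch API above: `copyAndReset`, `add`, `merge`, and `value` mirror other hunks in this diff, while the `isZero` member is assumed from the contemporaneous `AccumulatorV2` API and may differ in this snapshot.

```scala
import java.util.{ArrayList, Collections, List => JList}

import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator collecting strings. After this patch a subclass
// overrides `value` directly; there is no `localValue` left to implement.
class StringListAccumulator extends AccumulatorV2[String, JList[String]] {
  private val _list: JList[String] = Collections.synchronizedList(new ArrayList[String]())

  // Assumed member, not shown in this diff: whether the accumulator holds no updates.
  def isZero: Boolean = _list.isEmpty

  override def copyAndReset(): StringListAccumulator = new StringListAccumulator

  override def add(v: String): Unit = _list.add(v)

  override def merge(other: AccumulatorV2[String, JList[String]]): Unit = other match {
    case o: StringListAccumulator => _list.addAll(o.value) // was o.localValue before this patch
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Previously `override def localValue`; `value` is now the single accessor.
  override def value: JList[String] = Collections.unmodifiableList(_list)
}
```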

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index aeab71d..18547d4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -841,7 +841,7 @@ private[spark] object JsonProtocol {
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
           .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
-            acc.toInfo(Some(acc.localValue), None)
+            acc.toInfo(Some(acc.value), None)
           }))
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index cade67b..6cbd5ae 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -70,7 +70,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // serialize and de-serialize it, to simulate sending accumulator to executor.
     val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
     // value is reset on the executors
-    assert(acc2.localValue == 0)
+    assert(acc2.value == 0)
     assert(!acc2.isAtDriverSide)
 
     acc2.add(10)
@@ -259,7 +259,7 @@ private[spark] object AccumulatorSuite {
    * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0f68aaa..f82e0b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -34,7 +34,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   override def copyAndReset(): SQLMetric = new SQLMetric(metricType, initValue)
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
-    case o: SQLMetric => _value += o.localValue
+    case o: SQLMetric => _value += o.value
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
@@ -45,7 +45,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   def +=(v: Long): Unit = _value += v
 
-  override def localValue: Long = _value
+  override def value: Long = _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 9118593..29c5411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         taskEnd.taskInfo.taskId,
         taskEnd.stageId,
         taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.localValue), None)),
+        taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)),
         finishTask = true)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 8de4d8b..d41e88a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -302,7 +302,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   test("metrics can be loaded by history server") {
     val metric = SQLMetrics.createMetric(sparkContext, "zanzibar")
     metric += 10L
-    val metricInfo = metric.toInfo(Some(metric.localValue), None)
+    val metricInfo = metric.toInfo(Some(metric.value), None)
     metricInfo.update match {
       case Some(v: Long) => assert(v === 10L)
      case Some(v) => fail(s"metric value was not a Long: ${v.getClass.getName}")

http://git-wip-us.apache.org/repos/asf/spark/blob/2b715251/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 9647870..5e08658 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -366,7 +366,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     // The listener should only track the ones that are actually SQL metrics.
     val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
     val nonSqlMetric = sparkContext.accumulator[Int](0, "baseball")
-    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.localValue), None)
+    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
     val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.localValue), None)
     val taskInfo = createTaskInfo(0, 0)
     taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)

