Repository: spark
Updated Branches:
  refs/heads/master 386fbd3af -> b2950cef3


Revert "[SPARK-24648][SQL] SqlMetrics should be threadsafe"

This reverts commit 5264164a67df498b73facae207eda12ee133be7d.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2950cef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2950cef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2950cef

Branch: refs/heads/master
Commit: b2950cef3c898f59a2c92e8800ff134c44263b9a
Parents: 386fbd3
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Aug 9 20:33:59 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 9 20:33:59 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/metric/SQLMetrics.scala | 33 +++++++-----------
 .../sql/execution/metric/SQLMetricsSuite.scala  | 36 +-------------------
 2 files changed, 14 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2950cef/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 98f58a3..cbf707f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.metric
 
 import java.text.NumberFormat
 import java.util.Locale
-import java.util.concurrent.atomic.LongAdder
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
@@ -33,45 +32,40 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
  * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
  */
 class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
-
   // This is a workaround for SPARK-11013.
   // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
   // update it at the end of task and the value will be at least 0. Then we can filter out the -1
   // values before calculate max, min, etc.
-  private[this] val _value = new LongAdder
-  private val _zeroValue = initValue
-  _value.add(initValue)
+  private[this] var _value = initValue
+  private var _zeroValue = initValue
 
   override def copy(): SQLMetric = {
-    val newAcc = new SQLMetric(metricType, initValue)
-    newAcc.add(_value.sum())
+    val newAcc = new SQLMetric(metricType, _value)
+    newAcc._zeroValue = initValue
     newAcc
   }
 
-  override def reset(): Unit = this.set(_zeroValue)
+  override def reset(): Unit = _value = _zeroValue
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
-    case o: SQLMetric => _value.add(o.value)
+    case o: SQLMetric => _value += o.value
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def isZero(): Boolean = _value.sum() == _zeroValue
+  override def isZero(): Boolean = _value == _zeroValue
 
-  override def add(v: Long): Unit = _value.add(v)
+  override def add(v: Long): Unit = _value += v
 
   // We can set a double value to `SQLMetric` which stores only long value, if it is
   // average metrics.
   def set(v: Double): Unit = SQLMetrics.setDoubleForAverageMetrics(this, v)
 
-  def set(v: Long): Unit = {
-    _value.reset()
-    _value.add(v)
-  }
+  def set(v: Long): Unit = _value = v
 
-  def +=(v: Long): Unit = _value.add(v)
+  def +=(v: Long): Unit = _value += v
 
-  override def value: Long = _value.sum()
+  override def value: Long = _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
@@ -159,7 +153,7 @@ object SQLMetrics {
           Seq.fill(3)(0L)
         } else {
           val sorted = validValues.sorted
-          Seq(sorted.head, sorted(validValues.length / 2), sorted(validValues.length - 1))
+          Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
         }
         metric.map(v => numberFormat.format(v.toDouble / baseForAvgMetric))
       }
@@ -179,8 +173,7 @@ object SQLMetrics {
           Seq.fill(4)(0L)
         } else {
           val sorted = validValues.sorted
-          Seq(sorted.sum, sorted.head, sorted(validValues.length / 2),
-            sorted(validValues.length - 1))
+          Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
         }
         metric.map(strFormat)
       }
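
For context, the hunks above swap the reverted commit's LongAdder-backed value back to a plain var. The standalone sketch below (illustrative only, not part of this patch; the object name is made up) shows why the reverted commit had reached for java.util.concurrent.atomic.LongAdder: a plain var's += is an unsynchronized read-modify-write, so concurrent adds can lose increments, whereas LongAdder counts every add. The single-var representation restored here suffices only when each metric instance is updated from one thread at a time.

import java.util.concurrent.atomic.LongAdder

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AdderVsVar {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    var plain = 0L             // what this revert restores: unsynchronized +=
    val adder = new LongAdder  // what is being reverted: thread-safe adds

    val tasks = (1 to 1000).map { _ =>
      Future {
        (1 to 100).foreach { _ =>
          plain += 1     // read-modify-write race: increments can be lost
          adder.add(1L)  // atomic per-cell add: nothing is lost
        }
      }
    }
    Await.result(Future.sequence(tasks), 1.minute)

    println(s"plain var: $plain (often less than 100000)")
    println(s"LongAdder: ${adder.sum} (always 100000)")
  }
}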

http://git-wip-us.apache.org/repos/asf/spark/blob/b2950cef/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 8263c9c..a3a3f38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.metric
 
 import java.io.File
 
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.ui.SQLAppStatusStore
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -504,38 +504,4 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
-
-  test("writing metrics from single thread") {
-    val nAdds = 10
-    val acc = new SQLMetric("test", -10)
-    assert(acc.isZero())
-    acc.set(0)
-    for (i <- 1 to nAdds) acc.add(1)
-    assert(!acc.isZero())
-    assert(nAdds === acc.value)
-    acc.reset()
-    assert(acc.isZero())
-  }
-
-  test("writing metrics from multiple threads") {
-    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
-    val nFutures = 1000
-    val nAdds = 100
-    val acc = new SQLMetric("test", -10)
-    assert(acc.isZero() === true)
-    acc.set(0)
-    val l = for ( i <- 1 to nFutures ) yield {
-      Future {
-        for (j <- 1 to nAdds) acc.add(1)
-        i
-      }
-    }
-    for (futures <- Future.sequence(l)) {
-      assert(nFutures === futures.length)
-      assert(!acc.isZero())
-      assert(nFutures * nAdds === acc.value)
-      acc.reset()
-      assert(acc.isZero())
-    }
-  }
 }
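
The first deleted test above exercised only single-threaded behavior, which the reverted plain-var implementation still satisfies. A minimal standalone sketch of that contract, adapted from the deleted test (the wrapping object is made up; SQLMetric is assumed to be on the classpath):

import org.apache.spark.sql.execution.metric.SQLMetric

object SingleThreadMetricDemo {
  def main(args: Array[String]): Unit = {
    // -10 is an "invalid" sentinel, per the SPARK-11013 workaround comment:
    // negative values are filtered out before computing min/med/max.
    val acc = new SQLMetric("test", -10)
    assert(acc.isZero())                   // value still equals the zero value
    acc.set(0)                             // mark the metric valid
    (1 to 10).foreach(_ => acc.add(1))
    assert(!acc.isZero())
    assert(acc.value == 10)
    acc.reset()                            // back to the -10 zero value
    assert(acc.isZero())
  }
}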

