[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-08-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r209371636
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

> Do you mean it's a one-writer, multi-reader scene?

Yes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-08-08 Thread dbkerkela
Github user dbkerkela commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r208693090
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

This was originally raised because an implementation of broadcast join did
have multiple writers. Unfortunately, we recently determined that the LongAdder
is causing a performance regression, and we are going to revert this.
@cloud-fan or @hvanhovell, can one of you send the rollback PR?





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r208691674
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Do you mean it's a one-writer, multi-reader scene?





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-07-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r206346537
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

To be clear, we just need AccumulatorV2 to be readable from the heartbeat
thread. That's the only place where we need to think about concurrency.
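A minimal sketch of the single-writer, concurrent-reader pattern described here. It assumes a hypothetical `SingleWriterCounter` (not Spark's `SQLMetric` or `AccumulatorV2`): one thread writes, and a second thread stands in for the heartbeat reader. Marking the field `@volatile` makes each write visible to the reader; the `+=` itself is still not atomic, which is acceptable only while exactly one thread writes.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical counter, not Spark's SQLMetric: exactly one thread may call add().
final class SingleWriterCounter {
  // @volatile publishes every write to reader threads (e.g. a heartbeat thread).
  @volatile private var current: Long = 0L

  // Read-modify-write is not atomic; correct only with a single writer.
  def add(v: Long): Unit = current += v

  // Safe to call from any thread.
  def value: Long = current
}

object HeartbeatSketch {
  // One writer adds 1 `nAdds` times while a reader keeps polling the value.
  // Returns the final value and whether the reader only ever saw it grow.
  def run(nAdds: Int): (Long, Boolean) = {
    val counter = new SingleWriterCounter
    val writerDone = new CountDownLatch(1)
    val monotonic = new AtomicBoolean(true)

    val reader = new Thread(() => {
      var previous = 0L
      while (writerDone.getCount() > 0) {
        val seen = counter.value
        if (seen < previous) monotonic.set(false)
        previous = seen
      }
    })
    val writer = new Thread(() => {
      for (_ <- 1 to nAdds) counter.add(1L)
      writerDone.countDown()
    })

    reader.start()
    writer.start()
    writer.join()
    reader.join()
    (counter.value, monotonic.get())
  }

  def main(args: Array[String]): Unit = {
    val (finalValue, onlyGrew) = run(100000)
    println(s"final value: $finalValue, reader saw only increasing values: $onlyGrew")
  }
}
```

Because volatile reads and writes of a `Long` are atomic and ordered, the reader never observes a torn or decreasing value, yet no locking or `LongAdder` is needed on the write path.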





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-07-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r206346272
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

FYI, I don't think this is a valid test. Spark assumes there is only
one writer at a time.
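To illustrate what a multi-writer test does to a `+=`-based counter, here is a sketch with a hypothetical `PlainCounter` (not Spark code): several threads perform a non-atomic read-modify-write on a shared field, so increments can overwrite each other and be lost. The final value can never exceed the number of increments, but how many are lost varies by run and JVM.

```scala
// Hypothetical counter mirroring a `+=`-based metric; deliberately NOT thread-safe.
final class PlainCounter {
  private var current: Int = 0

  // Non-atomic read-modify-write: concurrent callers can overwrite each other.
  def add(v: Int): Unit = current += v

  def value: Int = current
}

object LostUpdateSketch {
  // Starts `nWriters` threads that each add 1 `addsPerWriter` times,
  // waits for all of them, and returns the final count.
  def run(nWriters: Int, addsPerWriter: Int): Int = {
    val counter = new PlainCounter
    val writers = (1 to nWriters).map { _ =>
      new Thread(() => for (_ <- 1 to addsPerWriter) counter.add(1))
    }
    writers.foreach(_.start())
    writers.foreach(_.join()) // join() makes the writers' updates visible here
    counter.value
  }

  def main(args: Array[String]): Unit = {
    val expected = 8 * 100000
    val actual = run(8, 100000)
    // Often smaller than `expected`; the gap varies from run to run.
    println(s"expected $expected, got $actual")
  }
}
```

Under the single-writer assumption this counter would be exact; it only loses counts when several threads write concurrently.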





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21634





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r197841906
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Thanks!





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread dbkerkela
Github user dbkerkela commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r197839738
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Good question. Checking in a separate branch:
org.scalatest.exceptions.TestFailedException: 10 did not equal 56544






[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread dbkerkela
Github user dbkerkela commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r197807962
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+    val nThreads = 1000
--- End diff --

True, I'll rename it.





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r197803612
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Really dumb question: does this fail without the fix?





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21634#discussion_r197803317
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+    val nThreads = 1000
--- End diff --

nThreads? This is the number of futures, right?
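A sketch of the distinction raised here: submitting N futures does not create N threads, because futures are scheduled onto an executor's thread pool. Scala's `ExecutionContext.global` sizes its pool by default to the number of available processors; this sketch instead uses a fixed pool of `poolSize` threads (an assumption made so the bound is deterministic) and records which threads actually ran the futures. `FuturesVsThreads` is a hypothetical name, not part of the test under review.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FuturesVsThreads {
  // Submits `nFutures` futures to a pool of `poolSize` threads and returns
  // (futures submitted, distinct threads that actually ran them).
  def run(nFutures: Int, poolSize: Int): (Int, Int) = {
    val executor = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    // Thread-safe set of the names of the threads that executed a future.
    val threadNames = ConcurrentHashMap.newKeySet[String]()
    try {
      val futures = (1 to nFutures).map { _ =>
        Future { threadNames.add(Thread.currentThread().getName) }
      }
      Await.result(Future.sequence(futures), 1.minute)
      (futures.size, threadNames.size())
    } finally {
      executor.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (futures, threads) = run(nFutures = 1000, poolSize = 4)
    println(s"$futures futures ran on $threads threads")
  }
}
```

With 1000 futures and a 4-thread pool, at most 4 distinct thread names are recorded, which is why a name like `nFutures` describes the parameter more accurately than `nThreads`.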





[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...

2018-06-25 Thread dbkerkela
GitHub user dbkerkela opened a pull request:

https://github.com/apache/spark/pull/21634

[SPARK-24648][SQL] SqlMetrics should be threadsafe

Use LongAdder to make SQLMetrics thread safe.

## What changes were proposed in this pull request?
Replace += with LongAdder.add() for concurrent counting

## How was this patch tested?
Unit tests with local threads
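A minimal sketch of the change described in this PR, replacing `+=` with `LongAdder.add()`, using a hypothetical `AdderCounter` rather than the PR's actual `SQLMetric` code. `java.util.concurrent.atomic.LongAdder` may spread contended updates across internal cells, and `sum()` adds those cells up on each read; once all writer threads have been joined, the sum is exact.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical metric-like counter backed by LongAdder (not the PR's SQLMetric code).
final class AdderCounter {
  private val adder = new LongAdder

  // Safe to call from many threads at once.
  def add(v: Long): Unit = adder.add(v)

  // Adds up all internal cells on every call; exact when no updates are in flight.
  def value: Long = adder.sum()

  def reset(): Unit = adder.reset()
}

object AdderSketch {
  // `nWriters` threads each add 1 `addsPerWriter` times; returns the total.
  def run(nWriters: Int, addsPerWriter: Int): Long = {
    val counter = new AdderCounter
    val writers = (1 to nWriters).map { _ =>
      new Thread(() => for (_ <- 1 to addsPerWriter) counter.add(1L))
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    counter.value
  }

  def main(args: Array[String]): Unit = {
    println(s"total: ${run(8, 100000)}")
  }
}
```

This only illustrates the counting semantics. Elsewhere in this thread, LongAdder is reported to have caused a performance regression and the change was slated for revert; the sketch says nothing about performance.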


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbkerkela/apache-spark sqlmetrics-concurrency-stacy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21634


commit deb08b9404729fc4ad6fc56a3bce7503fbc2e9e5
Author: Stacy Kerkela 
Date:   2018-06-25T12:33:41Z

[SPARK-24648][SQL] SqlMetrics should be threadsafe

Use LongAdder to make SQLMetrics threadsafe.



