[GitHub] spark pull request #21634: [SPARK-24648][SQL] SqlMetrics should be threadsaf...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r209371636

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

> Do you mean it's a one-writer, multi-reader scenario?

Yes.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dbkerkela commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r208693090

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

This was originally raised because an implementation of broadcast join did have multiple writers. Unfortunately, we recently determined that LongAdder is causing a performance regression, so we are going to revert this change. @cloud-fan or @hvanhovell, can one of you send the rollback PR?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r208691674

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Do you mean it's a one-writer, multi-reader scenario?
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r206346537

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

To be clear, we only need AccumulatorV2 to be readable from the heartbeat thread. That is the only place where we need to think about concurrency.
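Below is a minimal sketch of the one-writer, many-reader contract described in this comment. It uses a hypothetical `SingleWriterMetric` class and `HeartbeatSketch` object, not Spark's `SQLMetric` or `AccumulatorV2` code. One thread is the only writer, and a stand-in "heartbeat" thread only polls the value. Under that assumption, `@volatile` is enough: it makes each write visible to the reader, and no two threads ever run `+=` at the same time.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical single-writer metric; NOT Spark's SQLMetric/AccumulatorV2 code.
final class SingleWriterMetric {
  // @volatile makes writes visible to other threads and makes 64-bit
  // reads/writes atomic. It does NOT make `+=` atomic.
  @volatile private var _value: Long = 0L
  def add(v: Long): Unit = _value += v // must only be called by the single writer
  def value: Long = _value             // safe to call from any thread
}

object HeartbeatSketch {
  /** Runs one writer (the calling thread) and one polling reader thread.
    * Returns (final value, whether the reader only ever saw non-decreasing values).
    */
  def run(nAdds: Int): (Long, Boolean) = {
    val metric = new SingleWriterMetric
    val done = new AtomicBoolean(false)
    val monotonic = new AtomicBoolean(true)

    // Stand-in for the executor heartbeat thread: it only reads.
    val reader = new Thread(() => {
      var last = 0L
      while (!done.get()) {
        val v = metric.value
        if (v < last) monotonic.set(false)
        last = v
      }
    })
    reader.start()

    // The single writer: only this thread ever calls add().
    (1 to nAdds).foreach(_ => metric.add(1))
    done.set(true)
    reader.join()
    (metric.value, monotonic.get())
  }

  def main(args: Array[String]): Unit =
    println(run(100000))
}
```

With more than one writer, this pattern breaks: two threads can read the same old `_value` and one increment is lost.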
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r206346272

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

FYI, I don't think this is a valid test. Spark assumes there is only one writer at a time.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21634
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r197841906

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Thanks!
Github user dbkerkela commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r197839738

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Good question. I checked in a separate branch without the fix, and it fails:

org.scalatest.exceptions.TestFailedException: 10 did not equal 56544
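To show why a multi-writer test like this can fail, here is a sketch loosely modeled on the test under review, which runs futures on `ExecutionContext.global`. `LostUpdateSketch` and `UnsafeCounter` are hypothetical names, not code from the PR. A plain `var` updated with `+=` from many futures can lose increments, because the read-modify-write is not atomic. A `LongAdder` does not lose increments. The unsafe total varies from run to run, so no expected value is given for it.

```scala
import java.util.concurrent.atomic.LongAdder
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LostUpdateSketch {
  // Hypothetical unsynchronized counter: `value += v` reads, adds and writes back,
  // so two threads can read the same old value and one increment is lost.
  final class UnsafeCounter {
    var value: Long = 0L
    def add(v: Long): Unit = value += v
  }

  // Returns (unsafe total, LongAdder total) after nFutures * addsPerFuture additions.
  def run(nFutures: Int, addsPerFuture: Int): (Long, Long) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val unsafe = new UnsafeCounter
    val safe = new LongAdder
    val work = (1 to nFutures).map { _ =>
      Future {
        for (_ <- 1 to addsPerFuture) {
          unsafe.add(1)
          safe.add(1)
        }
      }
    }
    // Waiting on all futures makes their writes visible to this thread.
    Await.result(Future.sequence(work), 1.minute)
    (unsafe.value, safe.sum())
  }

  def main(args: Array[String]): Unit = {
    val (unsafeTotal, safeTotal) = run(1000, 1000)
    println(s"expected=${1000L * 1000} unsafe=$unsafeTotal adder=$safeTotal")
  }
}
```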
Github user dbkerkela commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r197807962

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+    val nThreads = 1000
--- End diff --

True, I'll rename it.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r197803612

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
--- End diff --

Really dumb question: does this fail without the fix?
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21634#discussion_r197803317

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("writing metrics from single thread") {
+    val nAdds = 10
+    val acc = new SQLMetric("test", -10)
+    assert(acc.isZero())
+    acc.set(0)
+    for (i <- 1 to nAdds) acc.add(1)
+    assert(!acc.isZero())
+    assert(nAdds === acc.value)
+    acc.reset()
+    assert(acc.isZero())
+  }
+
+  test("writing metrics from multiple threads") {
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+    val nThreads = 1000
--- End diff --

nThreads? This is the number of futures, right?
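This sketch illustrates the naming point. Submitting 1000 futures to `ExecutionContext.global` does not create 1000 threads. The global context is backed by a work-stealing pool whose target number of workers defaults to the number of available processors. Pool sizing can be tuned, and `blocking` sections can add threads, so treat the exact count as environment-dependent. `FuturesVsThreads.distinctThreads` is a hypothetical helper that counts how many distinct pool threads actually ran the futures.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FuturesVsThreads {
  // Hypothetical helper: runs nFutures trivial futures on the global context and
  // returns the number of distinct threads that executed them.
  def distinctThreads(nFutures: Int): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // Thread-safe set of the names of threads that ran at least one future.
    val seen = ConcurrentHashMap.newKeySet[String]()
    val futures = (1 to nFutures).map { _ =>
      Future { seen.add(Thread.currentThread().getName); () }
    }
    Await.result(Future.sequence(futures), 1.minute)
    seen.size()
  }

  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors()
    println(s"1000 futures ran on ${distinctThreads(1000)} threads ($cores cores available)")
  }
}
```

On a typical machine the printed thread count is far below 1000, which is why a name like `nFutures` describes the test variable more accurately than `nThreads`.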
GitHub user dbkerkela opened a pull request: https://github.com/apache/spark/pull/21634

[SPARK-24648][SQL] SqlMetrics should be threadsafe

Use LongAdder to make SQLMetrics thread safe.

## What changes were proposed in this pull request?

Replace += with LongAdder.add() for concurrent counting.

## How was this patch tested?

Unit tests with local threads.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbkerkela/apache-spark sqlmetrics-concurrency-stacy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21634.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21634

commit deb08b9404729fc4ad6fc56a3bce7503fbc2e9e5
Author: Stacy Kerkela
Date:   2018-06-25T12:33:41Z

    [SPARK-24648][SQL] SqlMetrics should be threadsafe

    Use LongAdder to make SQLMetrics threadsafe.
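The following is a minimal sketch of the change this PR description proposes. It assumes a simplified, hypothetical `AdderMetric` class rather than Spark's actual `SQLMetric`, whose constructor, `set` method and reset semantics are not reproduced here. The counter is backed by `java.util.concurrent.atomic.LongAdder` instead of a `var` updated with `+=`.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical, simplified metric; NOT Spark's SQLMetric. It only shows the idea
// from the PR description: replace `_value += v` with LongAdder.add(v).
final class AdderMetric(initValue: Long = 0L) {
  private val adder = new LongAdder()
  adder.add(initValue)

  // Safe to call from many threads at once; no increment is lost.
  def add(v: Long): Unit = adder.add(v)

  // sum() is not an atomic snapshot while writers are still running,
  // but it is exact once all writers have finished.
  def value: Long = adder.sum()

  // Returns the counter to zero (not to initValue).
  def reset(): Unit = adder.reset()
}

object AdderMetricDemo {
  // Starts nThreads writer threads that each call add(1) addsPerThread times.
  def concurrentTotal(nThreads: Int, addsPerThread: Int): Long = {
    val metric = new AdderMetric()
    val writers = (1 to nThreads).map { _ =>
      new Thread(() => (1 to addsPerThread).foreach(_ => metric.add(1)))
    }
    writers.foreach(_.start())
    writers.foreach(_.join()) // join() makes all writers' updates visible here
    metric.value
  }

  def main(args: Array[String]): Unit =
    println(concurrentTotal(8, 10000)) // 80000
}
```

LongAdder keeps a base value plus, under contention, an array of extra cells, and `sum()` adds them all up on each read. It therefore uses more memory and does more work per read than a single `long` field, in exchange for cheaper concurrent updates.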