[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-10 Thread zsxwing
GitHub user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20189#discussion_r160829638
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---
@@ -39,14 +35,18 @@ class MetricsReporter(
 
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
+  registerGauge("latency",
+    (lastProgress) => lastProgress.durationMs.get("triggerExecution").longValue(), 0L)
+
+  private def registerGauge[T](
+      name: String,
+      f: (StreamingQueryProgress) => T,
--- End diff --

nit: `(StreamingQueryProgress) => T` -> `StreamingQueryProgress => T`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-10 Thread zsxwing
GitHub user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20189#discussion_r160829808
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -424,6 +424,31 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("SPARK-22975: MetricsReporter defaults when there was no progress reported") {
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+      BlockingSource.latch = new CountDownLatch(1)
--- End diff --

You can use the `memory` source and the `console` sink to simplify the code, like this:
```
  test("SPARK-22975: MetricsReporter defaults when there was no progress reported") {
    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
      withTempDir { tempDir =>
        val sq = MemoryStream[Int].toDF
          .writeStream
          .format("console")
          .start()
          .asInstanceOf[StreamingQueryWrapper]
          .streamingQuery

        val gauges = sq.streamMetrics.metricRegistry.getGauges
        assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
        assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
        assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
        sq.stop()
      }
    }
  }
```





[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-10 Thread zsxwing
GitHub user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20189#discussion_r160829544
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---
@@ -39,14 +35,18 @@ class MetricsReporter(
 
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
--- End diff --

nit: `registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)`
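For illustration, a minimal sketch showing that the explicit lambda and the placeholder form are interchangeable when the expected function type is known. `Sample` and `extract` are hypothetical stand-ins, not part of Spark:

```scala
// Hypothetical record type used only for this illustration.
final case class Sample(inputRowsPerSecond: Double)

object PlaceholderDemo {
  // Hypothetical helper mirroring the shape of registerGauge's function argument.
  // `Sample => T` and `(Sample) => T` denote the same function type.
  def extract[T](s: Sample, f: Sample => T): T = f(s)

  // Explicit parameter name, as in the original diff.
  val explicit: Double = extract(Sample(3.0), (p) => p.inputRowsPerSecond)

  // Placeholder syntax, as suggested in the nit; the parameter type is inferred.
  val placeholder: Double = extract(Sample(3.0), _.inputRowsPerSecond)
}
```

Both forms compile to the same function value, so the choice is purely stylistic.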





[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-10 Thread zsxwing
GitHub user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20189#discussion_r160829605
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---
@@ -39,14 +35,18 @@ class MetricsReporter(
 
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
+  registerGauge("latency",
+    (lastProgress) => lastProgress.durationMs.get("triggerExecution").longValue(), 0L)
--- End diff --

ditto: `registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)`





[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-10 Thread zsxwing
GitHub user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20189#discussion_r160829571
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---
@@ -39,14 +35,18 @@ class MetricsReporter(
 
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
--- End diff --

ditto: `registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)`





[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...

2018-01-08 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/20189

[SPARK-22975] MetricsReporter should not throw exception when there was no 
progress reported

## What changes were proposed in this pull request?

`MetricsReporter` assumes that the query has already made some progress, i.e. 
that `lastProgress` is not null. If that is not the case, which can happen 
when no progress has been reported yet, a `NullPointerException` can be thrown.

The PR checks whether `lastProgress` is available and, if it is not, returns 
a default value for each metric.
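
A minimal sketch of the null-safe pattern described above, not the actual patch. It assumes a simplified stand-in for `StreamingQueryProgress`: the `Progress` case class, the `GaugeSketch` class, and its `read` method are hypothetical and are not Spark's API. Each gauge takes a function of the last progress plus a default that is returned while no progress exists:

```scala
// Hypothetical stand-in for StreamingQueryProgress; not Spark's real class.
final case class Progress(inputRowsPerSecond: Double, processedRowsPerSecond: Double)

// Hypothetical reporter: `lastProgress` is evaluated on every gauge read and may return null.
class GaugeSketch(lastProgress: () => Progress) {
  private val registered = scala.collection.mutable.Map.empty[String, () => Any]

  // Wrap the possibly-null progress in Option so the gauge falls back to `default`
  // instead of throwing a NullPointerException.
  def registerGauge[T](name: String, f: Progress => T, default: => T): Unit =
    registered(name) = () => Option(lastProgress()).map(f).getOrElse(default)

  // Evaluate the gauge registered under `name`.
  def read(name: String): Any = registered(name)()
}

object GaugeSketchDemo {
  // null models "no progress reported yet".
  @volatile var current: Progress = null

  val reporter = new GaugeSketch(() => current)
  reporter.registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
  reporter.registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
}
```

Because the progress is looked up on every read, the gauges switch from the default to real values as soon as the first progress is published.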

## How was this patch tested?

Added a unit test that checks the metric defaults when no progress has been reported.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/spark SPARK-22975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20189


commit 1185a513bd807df03a24b22370903d17301fd415
Author: Marco Gaido 
Date:   2018-01-08T22:28:12Z

[SPARK-22975] MetricsReporter should not throw exception when there was no 
progress reported



