[GitHub] spark pull request #20189: [SPARK-22975] MetricsReporter should not throw ex...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20189#discussion_r160829638

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---

```
@@ -39,14 +35,18 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
+  registerGauge("latency",
+    (lastProgress) => lastProgress.durationMs.get("triggerExecution").longValue(), 0L)
+
+  private def registerGauge[T](
+      name: String,
+      f: (StreamingQueryProgress) => T,
```

--- End diff --

nit: `(StreamingQueryProgress) => T` -> `StreamingQueryProgress => T,`

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20189#discussion_r160829808

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---

```
@@ -424,6 +424,31 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }

+  test("SPARK-22975: MetricsReporter defaults when there was no progress reported") {
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+      BlockingSource.latch = new CountDownLatch(1)
```

--- End diff --

You can use the `memory` source and `console` sink to simplify the code, like this:

```
test("SPARK-22975: MetricsReporter defaults when there was no progress reported") {
  withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
    withTempDir { tempDir =>
      val sq = MemoryStream[Int].toDF
        .writeStream
        .format("console")
        .start()
        .asInstanceOf[StreamingQueryWrapper]
        .streamingQuery
      val gauges = sq.streamMetrics.metricRegistry.getGauges
      assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
      assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
      assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
      sq.stop()
    }
  }
}
```
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20189#discussion_r160829544

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---

```
@@ -39,14 +35,18 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
```

--- End diff --

nit: `registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)`
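The nit above asks for Scala's placeholder syntax, which is equivalent to the explicit lambda in the patch. A standalone illustration of the two forms (`Progress` here is a hypothetical stand-in for Spark's `StreamingQueryProgress`, not the real class):

```scala
// Hypothetical stand-in for StreamingQueryProgress
case class Progress(inputRowsPerSecond: Double)

// The explicit lambda as written in the patch ...
val explicit: Progress => Double = (lastProgress) => lastProgress.inputRowsPerSecond
// ... and the equivalent placeholder form the reviewer suggests
val shorthand: Progress => Double = _.inputRowsPerSecond

println(explicit(Progress(3.5)) == shorthand(Progress(3.5))) // prints "true"
```

Both expand to the same function value; the placeholder form just omits the parameter name, which is idiomatic when the body uses the parameter exactly once.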
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20189#discussion_r160829605

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---

```
@@ -39,14 +35,18 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
+  registerGauge("latency",
+    (lastProgress) => lastProgress.durationMs.get("triggerExecution").longValue(), 0L)
```

--- End diff --

ditto
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20189#discussion_r160829571

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---

```
@@ -39,14 +35,18 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+  registerGauge("inputRate-total", (lastProgress) => lastProgress.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", (lastProgress) => lastProgress.processedRowsPerSecond, 0.0)
```

--- End diff --

ditto
GitHub user mgaido91 opened a pull request:

    https://github.com/apache/spark/pull/20189

[SPARK-22975] MetricsReporter should not throw exception when there was no progress reported

## What changes were proposed in this pull request?

`MetricsReporter` assumes that there has been some progress for the query, i.e. that `lastProgress` is not null. When that assumption fails, as it can under particular conditions, a `NullPointerException` is thrown. The PR checks whether there is a `lastProgress` and, if there is none, returns a default value for the metrics.

## How was this patch tested?

Added UT.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mgaido91/spark SPARK-22975

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20189.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20189

commit 1185a513bd807df03a24b22370903d17301fd415
Author: Marco Gaido
Date: 2018-01-08T22:28:12Z

    [SPARK-22975] MetricsReporter should not throw exception when there was no progress reported
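The fix described above boils down to treating the possibly-null `lastProgress` as an `Option` and falling back to a caller-supplied default. A minimal standalone sketch of that pattern, independent of Spark (the `Progress` and `Reporter` names are illustrative stand-ins, not Spark's actual classes):

```scala
// Hypothetical stand-in for StreamingQueryProgress
case class Progress(inputRowsPerSecond: Double)

// Minimal reporter: lastProgress stays null until the first progress event arrives
class Reporter {
  @volatile var lastProgress: Progress = null

  // Null-safe lookup: apply f to the last progress, or return the default.
  // Option(null) is None, so f is never applied to a null reference.
  def gaugeValue[T](f: Progress => T, default: T): T =
    Option(lastProgress).map(f).getOrElse(default)
}

val r = new Reporter
println(r.gaugeValue(_.inputRowsPerSecond, 0.0)) // 0.0: no progress yet, and no NPE
r.lastProgress = Progress(42.0)
println(r.gaugeValue(_.inputRowsPerSecond, 0.0)) // 42.0 once progress is reported
```

Passing the extractor function plus a default, rather than having each gauge dereference `lastProgress` itself, keeps the null check in one place, which mirrors the new `registerGauge(name, f, default)` signature in the diff.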