Repository: spark
Updated Branches:
  refs/heads/branch-2.2 20eea20c7 -> 105ae8680


[SPARK-22975][SS] MetricsReporter should not throw exception when there was no progress reported

## What changes were proposed in this pull request?

`MetricsReporter` assumes that some progress has been reported for the query, i.e. that 
`lastProgress` is not null. When that is not the case, as can happen under 
particular conditions (for example, before the first trigger has completed), a 
`NullPointerException` is thrown.

This PR checks whether a `lastProgress` is available; if it is not, a default 
value is returned for each metric.
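
The essence of the fix is a null-safe lookup wrapped around the gauge callback. 
Below is a minimal, standalone sketch of that pattern; the `Progress` case class 
and the `SafeReporter` name are illustrative stand-ins, not Spark's actual internals:

    import com.codahale.metrics.{Gauge, MetricRegistry}

    // Stand-in for StreamingQueryProgress; only the field we read is modeled.
    case class Progress(inputRowsPerSecond: Double)

    // latestProgress plays the role of stream.lastProgress and may return null.
    class SafeReporter(latestProgress: () => Progress) {
      val metricRegistry = new MetricRegistry

      // Fall back to `default` while no progress has been reported yet,
      // instead of dereferencing null and throwing a NullPointerException.
      private def registerGauge[T](name: String, f: Progress => T, default: T): Unit = {
        metricRegistry.register(name, new Gauge[T] {
          override def getValue: T = Option(latestProgress()).map(f).getOrElse(default)
        })
      }

      registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
    }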

## How was this patch tested?

Added a unit test verifying that the gauges return their default values when no 
progress has been reported.

Author: Marco Gaido <marcogaid...@gmail.com>

Closes #20189 from mgaido91/SPARK-22975.

(cherry picked from commit 54277398afbde92a38ba2802f4a7a3e5910533de)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/105ae868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/105ae868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/105ae868

Branch: refs/heads/branch-2.2
Commit: 105ae86801e2d1017ffad422085481f1b9038a1f
Parents: 20eea20
Author: Marco Gaido <marcogaid...@gmail.com>
Authored: Fri Jan 12 11:25:37 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Jan 12 11:25:59 2018 -0800

----------------------------------------------------------------------
 .../execution/streaming/MetricsReporter.scala   | 21 +++++++++---------
 .../sql/streaming/StreamingQuerySuite.scala     | 23 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/105ae868/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index b84e6ce..66b11ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.{util => ju}
-
-import scala.collection.mutable
-
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.{Source => CodahaleSource}
-import org.apache.spark.util.Clock
+import org.apache.spark.sql.streaming.StreamingQueryProgress
 
 /**
  * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
@@ -39,14 +35,17 @@ class MetricsReporter(
 
   // Metric names should not have . in them, so that all the metrics of a query are identified
   // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", () => 
stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => 
stream.lastProgress.processedRowsPerSecond)
-  registerGauge("latency", () => 
stream.lastProgress.durationMs.get("triggerExecution").longValue())
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: 
Numeric[T]): Unit = {
+  registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
+  registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
+  registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 
0L)
+
+  private def registerGauge[T](
+      name: String,
+      f: StreamingQueryProgress => T,
+      default: T): Unit = {
     synchronized {
       metricRegistry.register(name, new Gauge[T] {
-        override def getValue: T = f()
+        override def getValue: T = Option(stream.lastProgress).map(f).getOrElse(default)
       })
     }
   }
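
For reference, gauges registered this way can be read back through the Codahale 
registry even before any trigger has completed; a short usage sketch against the 
hypothetical `SafeReporter` above:

    val reporter = new SafeReporter(() => null)            // no progress reported yet
    val gauges = reporter.metricRegistry.getGauges
    assert(gauges.get("inputRate-total").getValue == 0.0)  // default, no NPE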

http://git-wip-us.apache.org/repos/asf/spark/blob/105ae868/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ee5af65..01c34b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -425,6 +425,29 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("SPARK-22975: MetricsReporter defaults when there was no progress 
reported") {
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+      BlockingSource.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+
+        val gauges = sq.streamMetrics.metricRegistry.getGauges
+        assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
+        assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
+        assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
+        sq.stop()
+      }
+    }
+  }
+
   test("input row calculation with mixed batch and streaming sources") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
    val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")

