Repository: spark
Updated Branches:
  refs/heads/master 57d70d26c -> 081b7adda


[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger

## What changes were proposed in this pull request?

In Structured Streaming, if a trigger is skipped because no new data arrived, we suddenly report nothing for the `stateOperator` metrics. However, we can easily report the metrics from `lastExecution` to ensure continuity of metrics.
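
For illustration, here is a minimal, self-contained sketch of how the fix is visible through the public progress API (it mirrors the regression test below; the query name `metrics_demo`, the schema, and the local-mode session are illustrative, not part of this patch). Once a batch has populated state, `lastProgress.stateOperators` keeps reporting the state totals on no-data triggers instead of going empty:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().master("local[2]").appName("metrics-demo").getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// MemoryStream is Spark's internal testing source; a running aggregation
// gives the query a stateful operator whose metrics we can observe.
val input = MemoryStream[Int]
val query = input.toDS().toDF("value")
  .groupBy($"value")
  .agg(count("*"))
  .writeStream
  .queryName("metrics_demo")
  .format("memory")
  .outputMode("complete")
  .start()

input.addData(1, 2)
query.processAllAvailable()

// With this patch, progress reported for a trigger with no new data still
// carries the state totals (numRowsTotal == 2 here) with numRowsUpdated == 0,
// instead of an empty stateOperators array.
query.lastProgress.stateOperators.foreach { s =>
  println(s"numRowsTotal=${s.numRowsTotal} numRowsUpdated=${s.numRowsUpdated}")
}
query.stop()
```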

## How was this patch tested?

Regression test in `StreamingQueryStatusAndProgressSuite`

Author: Burak Yavuz <brk...@gmail.com>

Closes #16716 from brkyvz/state-agg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/081b7add
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/081b7add
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/081b7add

Branch: refs/heads/master
Commit: 081b7addaf9560563af0ce25912972e91a78cee6
Parents: 57d70d2
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Jan 31 16:52:53 2017 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jan 31 16:52:53 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/ProgressReporter.scala  | 35 +++++++++++-----
 .../StreamingQueryStatusAndProgressSuite.scala  | 42 +++++++++++++++++++-
 2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/081b7add/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index c5e9eae..1f74fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -180,6 +180,26 @@ trait ProgressReporter extends Logging {
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract statistics about stateful operators from the executed query plan. */
+  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+    if (lastExecution == null) return Nil
+    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+    // Walking the plan again should be inexpensive.
+    val stateNodes = lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+    stateNodes.map { node =>
+      val numRowsUpdated = if (hasNewData) {
+        node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
+      } else {
+        0L
+      }
+      new StateOperatorProgress(
+        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+        numRowsUpdated = numRowsUpdated)
+    }
+  }
+
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
     val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
@@ -187,8 +207,11 @@ trait ProgressReporter extends Logging {
       if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
       else Map.empty[String, String]
 
+    // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
+    val stateOperators = extractStateOperatorMetrics(hasNewData)
+
     if (!hasNewData) {
-      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
+      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
     // We want to associate execution plan leaves to sources that generate them, so that we match
@@ -237,16 +260,6 @@ trait ProgressReporter extends Logging {
         Map.empty
       }
 
-    // Extract statistics about stateful operators in the query plan.
-    val stateNodes = lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreSaveExec] => p
-    }
-    val stateOperators = stateNodes.map { node =>
-      new StateOperatorProgress(
-        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
-        numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
-    }
-
     val eventTimeStats = lastExecution.executedPlan.collect {
       case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
         val stats = e.eventTimeStats.value

http://git-wip-us.apache.org/repos/asf/spark/blob/081b7add/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 2035db5..901cf34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import scala.collection.JavaConverters._
+import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 
-
-class StreamingQueryStatusAndProgressSuite extends StreamTest {
+class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
   implicit class EqualsIgnoreCRLF(source: String) {
     def equalsIgnoreCRLF(target: String): Boolean = {
       source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
@@ -171,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
       query.stop()
     }
   }
+
+  test("SPARK-19378: Continue reporting stateOp metrics even if there is no 
active trigger") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
+      val inputData = MemoryStream[Int]
+
+      val query = inputData.toDS().toDF("value")
+        .select('value)
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .queryName("metric_continuity")
+        .format("memory")
+        .outputMode("complete")
+        .start()
+      try {
+        inputData.addData(1, 2)
+        query.processAllAvailable()
+
+        val progress = query.lastProgress
+        assert(progress.stateOperators.length > 0)
+        // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
+        eventually(timeout(1 minute)) {
+          val nextProgress = query.lastProgress
+          assert(nextProgress.timestamp !== progress.timestamp)
+          assert(nextProgress.numInputRows === 0)
+          assert(nextProgress.stateOperators.head.numRowsTotal === 2)
+          assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+        }
+      } finally {
+        query.stop()
+      }
+    }
+  }
 }
 
 object StreamingQueryStatusAndProgressSuite {

