Repository: spark Updated Branches: refs/heads/master 57d70d26c -> 081b7adda
[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger ## What changes were proposed in this pull request? In Structured Streaming, if a trigger is skipped because no new data arrived, we suddenly report nothing for the `stateOperator` metrics. We could, however, easily report the metrics from `lastExecution` to ensure continuity of metrics. ## How was this patch tested? Added a regression test in `StreamingQueryStatusAndProgressSuite`. Author: Burak Yavuz <brk...@gmail.com> Closes #16716 from brkyvz/state-agg. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/081b7add Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/081b7add Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/081b7add Branch: refs/heads/master Commit: 081b7addaf9560563af0ce25912972e91a78cee6 Parents: 57d70d2 Author: Burak Yavuz <brk...@gmail.com> Authored: Tue Jan 31 16:52:53 2017 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Jan 31 16:52:53 2017 -0800 ---------------------------------------------------------------------- .../execution/streaming/ProgressReporter.scala | 35 +++++++++++----- .../StreamingQueryStatusAndProgressSuite.scala | 42 +++++++++++++++++++- 2 files changed, 64 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/081b7add/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index c5e9eae..1f74fff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -180,6 +180,26 @@ trait ProgressReporter extends Logging { currentStatus = currentStatus.copy(isTriggerActive = false) } + /** Extract statistics about stateful operators from the executed query plan. */ + private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = { + if (lastExecution == null) return Nil + // lastExecution could belong to one of the previous triggers if `!hasNewData`. + // Walking the plan again should be inexpensive. + val stateNodes = lastExecution.executedPlan.collect { + case p if p.isInstanceOf[StateStoreSaveExec] => p + } + stateNodes.map { node => + val numRowsUpdated = if (hasNewData) { + node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L) + } else { + 0L + } + new StateOperatorProgress( + numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L), + numRowsUpdated = numRowsUpdated) + } + } + /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty @@ -187,8 +207,11 @@ trait ProgressReporter extends Logging { if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) else Map.empty[String, String] + // SPARK-19378: Still report metrics even though no data was processed while reporting progress. + val stateOperators = extractStateOperatorMetrics(hasNewData) + if (!hasNewData) { - return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) + return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } // We want to associate execution plan leaves to sources that generate them, so that we match @@ -237,16 +260,6 @@ trait ProgressReporter extends Logging { Map.empty } - // Extract statistics about stateful operators in the query plan. 
- val stateNodes = lastExecution.executedPlan.collect { - case p if p.isInstanceOf[StateStoreSaveExec] => p - } - val stateOperators = stateNodes.map { node => - new StateOperatorProgress( - numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L), - numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)) - } - val eventTimeStats = lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => val stats = e.eventTimeStats.value http://git-wip-us.apache.org/repos/asf/spark/blob/081b7add/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 2035db5..901cf34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming import java.util.UUID import scala.collection.JavaConverters._ +import scala.language.postfixOps import org.json4s._ import org.json4s.jackson.JsonMethods._ +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._ - -class StreamingQueryStatusAndProgressSuite extends StreamTest { +class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { implicit class EqualsIgnoreCRLF(source: String) { def equalsIgnoreCRLF(target: String): Boolean = { 
source.replaceAll("\r\n|\r|\n", System.lineSeparator) === @@ -171,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest { query.stop() } } + + test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") { + import testImplicits._ + + withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") { + val inputData = MemoryStream[Int] + + val query = inputData.toDS().toDF("value") + .select('value) + .groupBy($"value") + .agg(count("*")) + .writeStream + .queryName("metric_continuity") + .format("memory") + .outputMode("complete") + .start() + try { + inputData.addData(1, 2) + query.processAllAvailable() + + val progress = query.lastProgress + assert(progress.stateOperators.length > 0) + // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins + eventually(timeout(1 minute)) { + val nextProgress = query.lastProgress + assert(nextProgress.timestamp !== progress.timestamp) + assert(nextProgress.numInputRows === 0) + assert(nextProgress.stateOperators.head.numRowsTotal === 2) + assert(nextProgress.stateOperators.head.numRowsUpdated === 0) + } + } finally { + query.stop() + } + } + } } object StreamingQueryStatusAndProgressSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org