[SPARK-18516][SQL] Split state and progress in streaming

This PR splits the status of a `StreamingQuery` into two separate APIs:

- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and whether data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.
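As a quick illustration (a minimal sketch, not part of this patch), the new API can be exercised from Scala roughly as follows. Note that in this patch the Scala and Python accessor for the retained updates is spelled `recentProgresses`, and the query is assumed to have been started elsewhere:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Minimal sketch: `query` is assumed to be an already-started streaming query.
def printQueryState(query: StreamingQuery): Unit = {
  // What the stream is doing right now: message, isDataAvailable, isTriggerActive.
  println(query.status)

  // Statistics for the most recent microbatches; the number retained is
  // controlled by spark.sql.streaming.numRecentProgresses.
  query.recentProgresses.foreach(p => println(p.json))

  // Convenience accessor for the latest progress update only.
  println(query.lastProgress.json)
}
```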
A recent progress update contains the following information:

```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1...@gmail.com>
Author: Michael Armbrust <mich...@databricks.com>

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2f29baeebe09bf4c059ace4336af9116b5)
Signed-off-by: Michael Armbrust <mich...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28b57c8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28b57c8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28b57c8a

Branch: refs/heads/branch-2.1
Commit: 28b57c8a124fe55501c4ca4b91320851ace5d735
Parents: 045ae29
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Nov 29 17:24:37 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   7 +-
 project/MimaExcludes.scala                      |  11 +
 python/pyspark/sql/streaming.py                 | 326 ++-----------------
 python/pyspark/sql/tests.py                     |  22 ++
 .../execution/streaming/MetricsReporter.scala   |  53 +++
 .../execution/streaming/ProgressReporter.scala  | 234 +++++++++++++
 .../execution/streaming/StreamExecution.scala   | 282 ++++------------
 .../sql/execution/streaming/StreamMetrics.scala | 243 --------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../apache/spark/sql/streaming/SinkStatus.scala |  66 ----
 .../spark/sql/streaming/SourceStatus.scala      |  95 ------
 .../spark/sql/streaming/StreamingQuery.scala    |  33 +-
 .../sql/streaming/StreamingQueryException.scala |   2 +-
 .../sql/streaming/StreamingQueryListener.scala  |  24 +-
 .../sql/streaming/StreamingQueryManager.scala   |  27 +-
 .../sql/streaming/StreamingQueryStatus.scala    | 151 +--------
 .../apache/spark/sql/streaming/progress.scala   | 193 +++++++++++
 .../streaming/StreamMetricsSuite.scala          | 213 ------------
 .../sql/streaming/FileStreamSourceSuite.scala   |  10 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  73 +----
 .../streaming/StreamingQueryListenerSuite.scala | 267 ++++++++-------
 .../streaming/StreamingQueryManagerSuite.scala  |   2 +-
 .../streaming/StreamingQueryProgressSuite.scala |  98 ++++++
 .../streaming/StreamingQueryStatusSuite.scala   | 123 -------
 .../sql/streaming/StreamingQuerySuite.scala     | 260 ++++++++-------
 .../spark/sql/streaming/WatermarkSuite.scala    |  16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
----------------------------------------------------------------------
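The fields in the JSON above are also exposed as typed members of `StreamingQueryProgress`. As a hedged sketch (again assuming an already-started query bound to `query`), the per-batch input counts and trigger timings can be read as shown here, which is what the updated Kafka test in the diff below asserts on and what the new `MetricsReporter` reads:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Minimal sketch: sum the rows ingested across all retained progress updates,
// mirroring the AssertOnQuery check added to KafkaSourceSuite in this patch.
def totalInputRows(query: StreamingQuery): Long =
  query.recentProgresses.map(_.numInputRows).sum

// durationMs breaks the trigger down by phase; "triggerExecution" is the
// end-to-end time for the batch, read the same way MetricsReporter does.
def lastTriggerDurationMs(query: StreamingQuery): Long =
  query.lastProgress.durationMs.get("triggerExecution").longValue()
```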
http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e1af14f..2d6ccb2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest { val mapped = kafka.map(kv => kv._2.toInt + 1) testStream(mapped)( + StartStream(trigger = ProcessingTime(1)), makeSureGetOffsetCalled, AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), - AssertOnLastQueryStatus { status => - assert(status.triggerDetails.get("numRows.input.total").toInt > 0) - assert(status.sourceStatuses(0).processingRate > 0.0) + AssertOnQuery { query => + val recordsRead = query.recentProgresses.map(_.numInputRows).sum + recordsRead == 3 } ) } http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 03c9fcc..9739164 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -78,6 +78,17 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), + // [SPARK-18516][SQL] Split state and progress in streaming + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SinkStatus"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), + // [SPARK-17338][SQL] add global temp view ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"), http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 9c3a237..c420b0d 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -16,6 +16,8 @@ # import sys +import json + if sys.version >= '3': intlike 
= int basestring = unicode = str @@ -48,10 +50,9 @@ class StreamingQuery(object): @property @since(2.0) def id(self): - """The id of the streaming query. This id is unique across all queries that have been - started in the current process. + """The id of the streaming query. """ - return self._jsq.id() + return self._jsq.id().toString() @property @since(2.0) @@ -87,6 +88,24 @@ class StreamingQuery(object): else: return self._jsq.awaitTermination() + @property + @since(2.1) + def recentProgresses(self): + """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + The number of progress updates retained for each stream is configured by Spark session + configuration `spark.sql.streaming.numRecentProgresses`. + """ + return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + + @property + @since(2.1) + def lastProgress(self): + """ + Returns the most recent :class:`StreamingQueryProgress` update of this streaming query. + :return: a map + """ + return json.loads(self._jsq.lastProgress().json()) + @since(2.0) def processAllAvailable(self): """Blocks until all available data in the source has been processed and committed to the @@ -149,8 +168,6 @@ class StreamingQueryManager(object): True >>> sq.stop() """ - if not isinstance(id, intlike): - raise ValueError("The id for the query must be an integer. Got: %s" % id) return StreamingQuery(self._jsqm.get(id)) @since(2.0) @@ -191,303 +208,6 @@ class StreamingQueryManager(object): self._jsqm.resetTerminated() -class StreamingQueryStatus(object): - """A class used to report information about the progress of a StreamingQuery. - - .. note:: Experimental - - .. versionadded:: 2.1 - """ - - def __init__(self, jsqs): - self._jsqs = jsqs - - def __str__(self): - """ - Pretty string of this query status. - - >>> print(sqs) - Status of query 'query' - Query id: 1 - Status timestamp: 123 - Input rate: 15.5 rows/sec - Processing rate 23.5 rows/sec - Latency: 345.0 ms - Trigger details: - batchId: 5 - isDataPresentInTrigger: true - isTriggerActive: true - latency.getBatch.total: 20 - latency.getOffset.total: 10 - numRows.input.total: 100 - Source statuses [1 source]: - Source 1 - MySource1 - Available offset: 0 - Input rate: 15.5 rows/sec - Processing rate: 23.5 rows/sec - Trigger details: - numRows.input.source: 100 - latency.getOffset.source: 10 - latency.getBatch.source: 20 - Sink status - MySink - Committed offsets: [1, -] - """ - return self._jsqs.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def name(self): - """ - Name of the query. This name is unique across all active queries. - - >>> sqs.name - u'query' - """ - return self._jsqs.name() - - @property - @since(2.1) - def id(self): - """ - Id of the query. This id is unique across all queries that have been started in - the current process. - - >>> int(sqs.id) - 1 - """ - return self._jsqs.id() - - @property - @since(2.1) - def timestamp(self): - """ - Timestamp (ms) of when this query was generated. - - >>> int(sqs.timestamp) - 123 - """ - return self._jsqs.timestamp() - - @property - @since(2.1) - def inputRate(self): - """ - Current total rate (rows/sec) at which data is being generated by all the sources. - - >>> sqs.inputRate - 15.5 - """ - return self._jsqs.inputRate() - - @property - @since(2.1) - def processingRate(self): - """ - Current rate (rows/sec) at which the query is processing data from all the sources. 
- - >>> sqs.processingRate - 23.5 - """ - return self._jsqs.processingRate() - - @property - @since(2.1) - def latency(self): - """ - Current average latency between the data being available in source and the sink - writing the corresponding output. - - >>> sqs.latency - 345.0 - """ - if (self._jsqs.latency().nonEmpty()): - return self._jsqs.latency().get() - else: - return None - - @property - @ignore_unicode_prefix - @since(2.1) - def sourceStatuses(self): - """ - Current statuses of the sources as a list. - - >>> len(sqs.sourceStatuses) - 1 - >>> sqs.sourceStatuses[0].description - u'MySource1' - """ - return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()] - - @property - @ignore_unicode_prefix - @since(2.1) - def sinkStatus(self): - """ - Current status of the sink. - - >>> sqs.sinkStatus.description - u'MySink' - """ - return SinkStatus(self._jsqs.sinkStatus()) - - @property - @ignore_unicode_prefix - @since(2.1) - def triggerDetails(self): - """ - Low-level details of the currently active trigger (e.g. number of rows processed - in trigger, latency of intermediate steps, etc.). - - If no trigger is currently active, then it will have details of the last completed trigger. - - >>> sqs.triggerDetails - {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100', - u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10', - u'isDataPresentInTrigger': u'true'} - """ - return self._jsqs.triggerDetails() - - -class SourceStatus(object): - """ - Status and metrics of a streaming Source. - - .. note:: Experimental - - .. versionadded:: 2.1 - """ - - def __init__(self, jss): - self._jss = jss - - def __str__(self): - """ - Pretty string of this source status. - - >>> print(sqs.sourceStatuses[0]) - Status of source MySource1 - Available offset: 0 - Input rate: 15.5 rows/sec - Processing rate: 23.5 rows/sec - Trigger details: - numRows.input.source: 100 - latency.getOffset.source: 10 - latency.getBatch.source: 20 - """ - return self._jss.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def description(self): - """ - Description of the source corresponding to this status. - - >>> sqs.sourceStatuses[0].description - u'MySource1' - """ - return self._jss.description() - - @property - @ignore_unicode_prefix - @since(2.1) - def offsetDesc(self): - """ - Description of the current offset if known. - - >>> sqs.sourceStatuses[0].offsetDesc - u'0' - """ - return self._jss.offsetDesc() - - @property - @since(2.1) - def inputRate(self): - """ - Current rate (rows/sec) at which data is being generated by the source. - - >>> sqs.sourceStatuses[0].inputRate - 15.5 - """ - return self._jss.inputRate() - - @property - @since(2.1) - def processingRate(self): - """ - Current rate (rows/sec) at which the query is processing data from the source. - - >>> sqs.sourceStatuses[0].processingRate - 23.5 - """ - return self._jss.processingRate() - - @property - @ignore_unicode_prefix - @since(2.1) - def triggerDetails(self): - """ - Low-level details of the currently active trigger (e.g. number of rows processed - in trigger, latency of intermediate steps, etc.). - - If no trigger is currently active, then it will have details of the last completed trigger. - - >>> sqs.sourceStatuses[0].triggerDetails - {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10', - u'latency.getBatch.source': u'20'} - """ - return self._jss.triggerDetails() - - -class SinkStatus(object): - """ - Status and metrics of a streaming Sink. - - .. note:: Experimental - - .. 
versionadded:: 2.1 - """ - - def __init__(self, jss): - self._jss = jss - - def __str__(self): - """ - Pretty string of this source status. - - >>> print(sqs.sinkStatus) - Status of sink MySink - Committed offsets: [1, -] - """ - return self._jss.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def description(self): - """ - Description of the source corresponding to this status. - - >>> sqs.sinkStatus.description - u'MySink' - """ - return self._jss.description() - - @property - @ignore_unicode_prefix - @since(2.1) - def offsetDesc(self): - """ - Description of the current offsets up to which data has been written by the sink. - - >>> sqs.sinkStatus.offsetDesc - u'[1, -]' - """ - return self._jss.offsetDesc() - - class Trigger(object): """Used to indicate how often results should be produced by a :class:`StreamingQuery`. @@ -1053,8 +773,6 @@ def _test(): globs['sdf_schema'] = StructType([StructField("data", StringType(), False)]) globs['df'] = \ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming') - globs['sqs'] = StreamingQueryStatus( - spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus()) (failure_count, test_count) = doctest.testmod( pyspark.sql.streaming, globs=globs, http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3d46b85..7151f95 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1082,6 +1082,28 @@ class SQLTests(ReusedPySparkTestCase): q.stop() shutil.rmtree(tmpPath) + def test_stream_status_and_progress(self): + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') + for q in self.spark._wrapped.streams.active: + q.stop() + tmpPath = tempfile.mkdtemp() + shutil.rmtree(tmpPath) + self.assertTrue(df.isStreaming) + out = os.path.join(tmpPath, 'out') + chk = os.path.join(tmpPath, 'chk') + q = df.writeStream \ + .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) + try: + q.processAllAvailable() + lastProgress = q.lastProgress + recentProgresses = q.recentProgresses + self.assertEqual(lastProgress['name'], q.name) + self.assertEqual(lastProgress['id'], q.id) + self.assertTrue(any(p == lastProgress for p in recentProgresses)) + finally: + q.stop() + shutil.rmtree(tmpPath) + def test_stream_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') for q in self.spark._wrapped.streams.active: http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala new file mode 100644 index 0000000..5551d12 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.{util => ju} + +import scala.collection.mutable + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.{Source => CodahaleSource} +import org.apache.spark.util.Clock + +/** + * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to + * Codahale/DropWizard metrics + */ +class MetricsReporter( + stream: StreamExecution, + override val sourceName: String) extends CodahaleSource with Logging { + + override val metricRegistry: MetricRegistry = new MetricRegistry + + // Metric names should not have . in them, so that all the metrics of a query are identified + // together in Ganglia as a single metric group + registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond) + registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond) + registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue()) + + private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = { + synchronized { + metricRegistry.register(name, new Gauge[T] { + override def getValue: T = f() + }) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala new file mode 100644 index 0000000..b7b6e19 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.UUID + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.streaming._ +import org.apache.spark.util.Clock + +/** + * Responsible for continually reporting statistics about the amount of data processed as well + * as latency for a streaming query. This trait is designed to be mixed into the + * [[StreamExecution]], who is responsible for calling `startTrigger` and `finishTrigger` + * at the appropriate times. Additionally, the status can updated with `updateStatusMessage` to + * allow reporting on the streams current state (i.e. "Fetching more data"). + */ +trait ProgressReporter extends Logging { + + case class ExecutionStats( + inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress]) + + // Internal state of the stream, required for computing metrics. + protected def id: UUID + protected def name: String + protected def triggerClock: Clock + protected def logicalPlan: LogicalPlan + protected def lastExecution: QueryExecution + protected def newData: Map[Source, DataFrame] + protected def availableOffsets: StreamProgress + protected def committedOffsets: StreamProgress + protected def sources: Seq[Source] + protected def sink: Sink + protected def streamExecutionMetadata: StreamExecutionMetadata + protected def currentBatchId: Long + protected def sparkSession: SparkSession + + // Local timestamps and counters. + private var currentTriggerStartTimestamp = -1L + private var currentTriggerEndTimestamp = -1L + // TODO: Restore this from the checkpoint when possible. + private var lastTriggerStartTimestamp = -1L + private val currentDurationsMs = new mutable.HashMap[String, Long]() + + /** Flag that signals whether any error with input metrics have already been logged */ + private var metricWarningLogged: Boolean = false + + /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */ + private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() + + @volatile + protected var currentStatus: StreamingQueryStatus = + StreamingQueryStatus( + message = "Initializing StreamExecution", + isDataAvailable = false, + isTriggerActive = false) + + /** Returns the current status of the query. */ + def status: StreamingQueryStatus = currentStatus + + /** Returns an array containing the most recent query progress updates. */ + def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized { + progressBuffer.toArray + } + + /** Returns the most recent query progress update. */ + def lastProgress: StreamingQueryProgress = progressBuffer.synchronized { + progressBuffer.last + } + + /** Begins recording statistics about query progress for a given trigger. */ + protected def startTrigger(): Unit = { + logDebug("Starting Trigger Calculation") + lastTriggerStartTimestamp = currentTriggerStartTimestamp + currentTriggerStartTimestamp = triggerClock.getTimeMillis() + currentStatus = currentStatus.copy(isTriggerActive = true) + currentDurationsMs.clear() + } + + /** Finalizes the query progress and adds it to list of recent status updates. 
*/ + protected def finishTrigger(hasNewData: Boolean): Unit = { + currentTriggerEndTimestamp = triggerClock.getTimeMillis() + + val executionStats: ExecutionStats = if (!hasNewData) { + ExecutionStats(Map.empty, Seq.empty) + } else { + extractExecutionStats + } + + val processingTimeSec = + (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000 + + val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { + (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000 + } else { + Double.NaN + } + logDebug(s"Execution stats: $executionStats") + + val sourceProgress = sources.map { source => + val numRecords = executionStats.inputRows.getOrElse(source, 0L) + new SourceProgress( + description = source.toString, + startOffset = committedOffsets.get(source).map(_.json).orNull, + endOffset = availableOffsets.get(source).map(_.json).orNull, + numInputRows = numRecords, + inputRowsPerSecond = numRecords / inputTimeSec, + processedRowsPerSecond = numRecords / processingTimeSec + ) + } + val sinkProgress = new SinkProgress(sink.toString) + + val newProgress = new StreamingQueryProgress( + id = id, + name = name, + timestamp = currentTriggerStartTimestamp, + batchId = currentBatchId, + durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, + currentWatermark = streamExecutionMetadata.batchWatermarkMs, + stateOperators = executionStats.stateOperators.toArray, + sources = sourceProgress.toArray, + sink = sinkProgress) + + progressBuffer.synchronized { + progressBuffer += newProgress + while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) { + progressBuffer.dequeue() + } + } + + logInfo(s"Streaming query made progress: $newProgress") + currentStatus = currentStatus.copy(isTriggerActive = false) + } + + /** Extracts statistics from the most recent query execution. */ + private def extractExecutionStats: ExecutionStats = { + // We want to associate execution plan leaves to sources that generate them, so that we match + // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. + // Consider the translation from the streaming logical plan to the final executed plan. + // + // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan + // + // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan + // - Each logical plan leaf will be associated with a single streaming source. + // - There can be multiple logical plan leaves associated with a streaming source. + // - There can be leaves not associated with any streaming source, because they were + // generated from a batch source (e.g. stream-batch joins) + // + // 2. Assuming that the executed plan has same number of leaves in the same order as that of + // the trigger logical plan, we associate executed plan leaves with corresponding + // streaming sources. + // + // 3. For each source, we sum the metrics of the associated execution plan leaves. 
+ // + val logicalPlanLeafToSource = newData.flatMap { case (source, df) => + df.logicalPlan.collectLeaves().map { leaf => leaf -> source } + } + val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming + val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() + val numInputRows: Map[Source, Long] = + if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { + val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { + case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source } + } + val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) => + val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L) + source -> numRows + } + sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source + } else { + if (!metricWarningLogged) { + def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}" + logWarning( + "Could not report metrics as number leaves in trigger logical plan did not match that" + + s" of the execution plan:\n" + + s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" + + s"execution plan leaves: ${toString(allExecPlanLeaves)}\n") + metricWarningLogged = true + } + Map.empty + } + + // Extract statistics about stateful operators in the query plan. + val stateNodes = lastExecution.executedPlan.collect { + case p if p.isInstanceOf[StateStoreSaveExec] => p + } + val stateOperators = stateNodes.map { node => + new StateOperatorProgress( + numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L), + numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)) + } + + ExecutionStats(numInputRows, stateOperators) + } + + /** Records the duration of running `body` for the next query progress update. */ + protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = { + val startTime = triggerClock.getTimeMillis() + val result = body + val endTime = triggerClock.getTimeMillis() + val timeTaken = math.max(endTime - startTime, 0) + + val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L) + currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken) + logDebug(s"$triggerDetailKey took $timeTaken ms") + result + } + + /** Updates the message returned in `status`. 
*/ + protected def updateStatusMessage(message: String): Unit = { + currentStatus = currentStatus.copy(message = message) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 21664d7..e4f31af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable.ArrayBuffer @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} @@ -47,7 +47,6 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} */ class StreamExecution( override val sparkSession: SparkSession, - override val id: Long, override val name: String, checkpointRoot: String, val logicalPlan: LogicalPlan, @@ -55,10 +54,12 @@ class StreamExecution( val trigger: Trigger, val triggerClock: Clock, val outputMode: OutputMode) - extends StreamingQuery with Logging { + extends StreamingQuery with ProgressReporter with Logging { import org.apache.spark.sql.streaming.StreamingQueryListener._ - import StreamMetrics._ + + // TODO: restore this from the checkpoint directory. + override val id: UUID = UUID.randomUUID() private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay @@ -89,16 +90,16 @@ class StreamExecution( * once, since the field's value may change at any time. */ @volatile - private var availableOffsets = new StreamProgress + protected var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ - private var currentBatchId: Long = -1 + protected var currentBatchId: Long = -1 /** Stream execution metadata */ - private var streamExecutionMetadata = StreamExecutionMetadata() + protected var streamExecutionMetadata = StreamExecutionMetadata() /** All stream sources present in the query plan. */ - private val sources = + protected val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ @@ -113,7 +114,10 @@ class StreamExecution( private var state: State = INITIALIZED @volatile - var lastExecution: QueryExecution = null + var lastExecution: QueryExecution = _ + + /** Holds the most recent input data for each source. 
*/ + protected var newData: Map[Source, DataFrame] = _ @volatile private var streamDeathCause: StreamingQueryException = null @@ -121,16 +125,8 @@ class StreamExecution( /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() - /** Metrics for this query */ - private val streamMetrics = - new StreamMetrics(uniqueSources.toSet, triggerClock, s"StructuredStreaming.$name") - - @volatile - private var currentStatus: StreamingQueryStatus = null - - /** Flag that signals whether any error with input metrics have already been logged */ - @volatile - private var metricWarningLogged: Boolean = false + /** Used to report metrics to coda-hale. */ + lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name") /** * The thread that runs the micro-batches of this stream. Note that this thread must be @@ -158,15 +154,6 @@ class StreamExecution( /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE - /** Returns the current status of the query. */ - override def status: StreamingQueryStatus = currentStatus - - /** Returns current status of all the sources. */ - override def sourceStatuses: Array[SourceStatus] = currentStatus.sourceStatuses.toArray - - /** Returns current status of the sink. */ - override def sinkStatus: SinkStatus = currentStatus.sinkStatus - /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */ override def exception: Option[StreamingQueryException] = Option(streamDeathCause) @@ -200,8 +187,8 @@ class StreamExecution( if (sparkSession.sessionState.conf.streamingMetricsEnabled) { sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } - updateStatus() - postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception. + + postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -210,40 +197,45 @@ class StreamExecution( SparkSession.setActiveSession(sparkSession) triggerExecutor.execute(() => { - streamMetrics.reportTriggerStarted(currentBatchId) - streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data from sources") - updateStatus() - val isTerminated = reportTimeTaken(TRIGGER_LATENCY) { + startTrigger() + + val isTerminated = if (isActive) { - if (currentBatchId < 0) { - // We'll do this initialization only once - populateStartOffsets() - logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { - constructNextBatch() + reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { + // We'll do this initialization only once + populateStartOffsets() + logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { + constructNextBatch() + } + if (dataAvailable) { + currentStatus = currentStatus.copy(isDataAvailable = true) + updateStatusMessage("Processing new data") + runBatch() + } } + + // Report trigger as finished and construct progress object. 
+ finishTrigger(dataAvailable) + postEvent(new QueryProgressEvent(lastProgress)) + if (dataAvailable) { - streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, true) - streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing new data") - updateStatus() - runBatch() // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 } else { - streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, false) - streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data") - updateStatus() + currentStatus = currentStatus.copy(isDataAvailable = false) + updateStatusMessage("Waiting for data to arrive") Thread.sleep(pollingDelayMs) } true } else { false } - } - // Update metrics and notify others - streamMetrics.reportTriggerFinished() - updateStatus() - postEvent(new QueryProgressEvent(currentStatus)) + + // Update committed offsets. + committedOffsets ++= availableOffsets + updateStatusMessage("Waiting for next trigger") isTerminated }) } catch { @@ -264,14 +256,12 @@ class StreamExecution( state = TERMINATED // Update metrics and status - streamMetrics.stop() sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics) - updateStatus() // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } @@ -328,14 +318,13 @@ class StreamExecution( val hasNewData = { awaitBatchLock.lock() try { - reportTimeTaken(GET_OFFSET_LATENCY) { - val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s => - reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) { - (s, s.getOffset) - } - }.toMap - availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get) - } + val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s => + updateStatusMessage(s"Getting offsets from $s") + reportTimeTaken("getOffset") { + (s, s.getOffset) + } + }.toMap + availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get) if (dataAvailable) { true @@ -350,8 +339,10 @@ class StreamExecution( if (hasNewData) { // Current batch timestamp in milliseconds streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis() - reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { - assert(offsetLog.add(currentBatchId, + updateStatusMessage("Writing offsets to log") + reportTimeTaken("walCommit") { + assert(offsetLog.add( + currentBatchId, availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId. " + @@ -384,30 +375,24 @@ class StreamExecution( awaitBatchLock.unlock() } } - reportTimestamp(GET_OFFSET_TIMESTAMP) } /** * Processes any data available between `availableOffsets` and `committedOffsets`. */ private def runBatch(): Unit = { - // TODO: Move this to IncrementalExecution. - // Request unprocessed data from all sources. 
- val newData = reportTimeTaken(GET_BATCH_LATENCY) { + newData = reportTimeTaken("getBatch") { availableOffsets.flatMap { case (source, available) if committedOffsets.get(source).map(_ != available).getOrElse(true) => val current = committedOffsets.get(source) - val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) { - source.getBatch(current, available) - } + val batch = source.getBatch(current, available) logDebug(s"Retrieving data from $source: $current -> $available") Some(source -> batch) case _ => None } } - reportTimestamp(GET_BATCH_TIMESTAMP) // A list of attributes that will need to be updated. var replacements = new ArrayBuffer[(Attribute, Attribute)] @@ -438,7 +423,7 @@ class StreamExecution( cd.dataType) } - val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) { + val executedPlan = reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSession, triggerLogicalPlan, @@ -451,11 +436,12 @@ class StreamExecution( val nextBatch = new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema)) - sink.addBatch(currentBatchId, nextBatch) - reportNumRows(executedPlan, triggerLogicalPlan, newData) + + reportTimeTaken("addBatch") { + sink.addBatch(currentBatchId, nextBatch) + } // Update the eventTime watermark if we find one in the plan. - // TODO: Does this need to be an AttributeMap? lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec => logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") @@ -468,10 +454,6 @@ class StreamExecution( logTrace(s"Event time didn't move: $newWatermark < " + s"$streamExecutionMetadata.currentEventTimeWatermark") } - - if (newWatermark != 0) { - streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark) - } } awaitBatchLock.lock() @@ -481,9 +463,6 @@ class StreamExecution( } finally { awaitBatchLock.unlock() } - - // Update committed offsets. - committedOffsets ++= availableOffsets } private def postEvent(event: StreamingQueryListener.Event) { @@ -616,145 +595,12 @@ class StreamExecution( """.stripMargin } - /** - * Report row metrics of the executed trigger - * @param triggerExecutionPlan Execution plan of the trigger - * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan - * @param sourceToDF Source to DataFrame returned by the source.getBatch in this trigger - */ - private def reportNumRows( - triggerExecutionPlan: SparkPlan, - triggerLogicalPlan: LogicalPlan, - sourceToDF: Map[Source, DataFrame]): Unit = { - // We want to associate execution plan leaves to sources that generate them, so that we match - // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. - // Consider the translation from the streaming logical plan to the final executed plan. - // - // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan - // - // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan - // - Each logical plan leaf will be associated with a single streaming source. - // - There can be multiple logical plan leaves associated with a streaming source. - // - There can be leaves not associated with any streaming source, because they were - // generated from a batch source (e.g. stream-batch joins) - // - // 2. Assuming that the executed plan has same number of leaves in the same order as that of - // the trigger logical plan, we associate executed plan leaves with corresponding - // streaming sources. - // - // 3. 
For each source, we sum the metrics of the associated execution plan leaves. - // - val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) => - df.logicalPlan.collectLeaves().map { leaf => leaf -> source } - } - val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes non-streaming sources - val allExecPlanLeaves = triggerExecutionPlan.collectLeaves() - val sourceToNumInputRows: Map[Source, Long] = - if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { - val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { - case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source } - } - val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) => - val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L) - source -> numRows - } - sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source - } else { - if (!metricWarningLogged) { - def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}" - logWarning( - "Could not report metrics as number leaves in trigger logical plan did not match that" + - s" of the execution plan:\n" + - s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" + - s"execution plan leaves: ${toString(allExecPlanLeaves)}\n") - metricWarningLogged = true - } - Map.empty - } - val numOutputRows = triggerExecutionPlan.metrics.get("numOutputRows").map(_.value) - val stateNodes = triggerExecutionPlan.collect { - case p if p.isInstanceOf[StateStoreSaveExec] => p - } - - streamMetrics.reportNumInputRows(sourceToNumInputRows) - stateNodes.zipWithIndex.foreach { case (s, i) => - streamMetrics.reportTriggerDetail( - NUM_TOTAL_STATE_ROWS(i + 1), - s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L)) - streamMetrics.reportTriggerDetail( - NUM_UPDATED_STATE_ROWS(i + 1), - s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)) - } - updateStatus() - } - - private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = { - val startTime = triggerClock.getTimeMillis() - val result = body - val endTime = triggerClock.getTimeMillis() - val timeTaken = math.max(endTime - startTime, 0) - streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken) - updateStatus() - if (triggerDetailKey == TRIGGER_LATENCY) { - logInfo(s"Completed up to $availableOffsets in $timeTaken ms") - } - result - } - - private def reportTimeTaken[T](source: Source, triggerDetailKey: String)(body: => T): T = { - val startTime = triggerClock.getTimeMillis() - val result = body - val endTime = triggerClock.getTimeMillis() - streamMetrics.reportSourceTriggerDetail( - source, triggerDetailKey, math.max(endTime - startTime, 0)) - updateStatus() - result - } - - private def reportTimestamp(triggerDetailKey: String): Unit = { - streamMetrics.reportTriggerDetail(triggerDetailKey, triggerClock.getTimeMillis) - updateStatus() - } - - private def updateStatus(): Unit = { - val localAvailableOffsets = availableOffsets - val sourceStatuses = sources.map { s => - SourceStatus( - s.toString, - localAvailableOffsets.get(s).map(_.json).getOrElse("-"), - streamMetrics.currentSourceInputRate(s), - streamMetrics.currentSourceProcessingRate(s), - streamMetrics.currentSourceTriggerDetails(s)) - }.toArray - val sinkStatus = SinkStatus( - sink.toString, - committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString) - - currentStatus = - StreamingQueryStatus( - name = name, - id = id, - timestamp = triggerClock.getTimeMillis(), 
- inputRate = streamMetrics.currentInputRate(), - processingRate = streamMetrics.currentProcessingRate(), - latency = streamMetrics.currentLatency(), - sourceStatuses = sourceStatuses, - sinkStatus = sinkStatus, - triggerDetails = streamMetrics.currentTriggerDetails()) - } - trait State case object INITIALIZED extends State case object ACTIVE extends State case object TERMINATED extends State } -object StreamExecution { - private val _nextId = new AtomicLong(0) - - def nextId: Long = _nextId.getAndIncrement() -} - /** * Contains metadata associated with a stream execution. This information is * persisted to the offset log via the OffsetSeq metadata field. Current http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala deleted file mode 100644 index 942e6ed..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.{util => ju} - -import scala.collection.mutable - -import com.codahale.metrics.{Gauge, MetricRegistry} - -import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.{Source => CodahaleSource} -import org.apache.spark.util.Clock - -/** - * Class that manages all the metrics related to a StreamingQuery. It does the following. - * - Calculates metrics (rates, latencies, etc.) based on information reported by StreamExecution. 
- * - Allows the current metric values to be queried - * - Serves some of the metrics through Codahale/DropWizard metrics - * - * @param sources Unique set of sources in a query - * @param triggerClock Clock used for triggering in StreamExecution - * @param codahaleSourceName Root name for all the Codahale metrics - */ -class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceName: String) - extends CodahaleSource with Logging { - - import StreamMetrics._ - - // Trigger infos - private val triggerDetails = new mutable.HashMap[String, String] - private val sourceTriggerDetails = new mutable.HashMap[Source, mutable.HashMap[String, String]] - - // Rate estimators for sources and sinks - private val inputRates = new mutable.HashMap[Source, RateCalculator] - private val processingRates = new mutable.HashMap[Source, RateCalculator] - - // Number of input rows in the current trigger - private val numInputRows = new mutable.HashMap[Source, Long] - private var currentTriggerStartTimestamp: Long = -1 - private var previousTriggerStartTimestamp: Long = -1 - private var latency: Option[Double] = None - - override val sourceName: String = codahaleSourceName - override val metricRegistry: MetricRegistry = new MetricRegistry - - // =========== Initialization =========== - - // Metric names should not have . in them, so that all the metrics of a query are identified - // together in Ganglia as a single metric group - registerGauge("inputRate-total", currentInputRate) - registerGauge("processingRate-total", () => currentProcessingRate) - registerGauge("latency", () => currentLatency().getOrElse(-1.0)) - - sources.foreach { s => - inputRates.put(s, new RateCalculator) - processingRates.put(s, new RateCalculator) - sourceTriggerDetails.put(s, new mutable.HashMap[String, String]) - - registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s)) - registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s)) - } - - // =========== Setter methods =========== - - def reportTriggerStarted(batchId: Long): Unit = synchronized { - numInputRows.clear() - triggerDetails.clear() - sourceTriggerDetails.values.foreach(_.clear()) - - reportTriggerDetail(BATCH_ID, batchId) - sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId)) - reportTriggerDetail(IS_TRIGGER_ACTIVE, true) - currentTriggerStartTimestamp = triggerClock.getTimeMillis() - reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp) - } - - def reportTriggerDetail[T](key: String, value: T): Unit = synchronized { - triggerDetails.put(key, value.toString) - } - - def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized { - sourceTriggerDetails(source).put(key, value.toString) - } - - def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized { - numInputRows ++= inputRows - } - - def reportTriggerFinished(): Unit = synchronized { - require(currentTriggerStartTimestamp >= 0) - val currentTriggerFinishTimestamp = triggerClock.getTimeMillis() - reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp) - triggerDetails.remove(STATUS_MESSAGE) - reportTriggerDetail(IS_TRIGGER_ACTIVE, false) - - // Report number of rows - val totalNumInputRows = numInputRows.values.sum - reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows) - numInputRows.foreach { case (s, r) => - reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r) - } - - val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp - val 
previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) { - Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp) - } else None - - // Update input rate = num rows received by each source during the previous trigger interval - // Interval is measures as interval between start times of previous and current trigger. - // - // TODO: Instead of trigger start, we should use time when getOffset was called on each source - // as this may be different for each source if there are many sources in the query plan - // and getOffset is called serially on them. - if (previousInputIntervalOption.nonEmpty) { - sources.foreach { s => - inputRates(s).update(numInputRows.getOrElse(s, 0), previousInputIntervalOption.get) - } - } - - // Update processing rate = num rows processed for each source in current trigger duration - sources.foreach { s => - processingRates(s).update(numInputRows.getOrElse(s, 0), currentTriggerDuration) - } - - // Update latency = if data present, 0.5 * previous trigger interval + current trigger duration - if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) { - latency = Some((previousInputIntervalOption.get.toDouble / 2) + currentTriggerDuration) - } else { - latency = None - } - - previousTriggerStartTimestamp = currentTriggerStartTimestamp - currentTriggerStartTimestamp = -1 - } - - // =========== Getter methods =========== - - def currentInputRate(): Double = synchronized { - // Since we are calculating source input rates using the same time interval for all sources - // it is fine to calculate total input rate as the sum of per source input rate. - inputRates.map(_._2.currentRate).sum - } - - def currentSourceInputRate(source: Source): Double = synchronized { - inputRates(source).currentRate - } - - def currentProcessingRate(): Double = synchronized { - // Since we are calculating source processing rates using the same time interval for all sources - // it is fine to calculate total processing rate as the sum of per source processing rate. 
- processingRates.map(_._2.currentRate).sum - } - - def currentSourceProcessingRate(source: Source): Double = synchronized { - processingRates(source).currentRate - } - - def currentLatency(): Option[Double] = synchronized { latency } - - def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap } - - def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized { - sourceTriggerDetails(source).toMap - } - - // =========== Other methods =========== - - private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = { - synchronized { - metricRegistry.register(name, new Gauge[T] { - override def getValue: T = f() - }) - } - } - - def stop(): Unit = synchronized { - triggerDetails.clear() - inputRates.valuesIterator.foreach { _.stop() } - processingRates.valuesIterator.foreach { _.stop() } - latency = None - } -} - -object StreamMetrics extends Logging { - /** Simple utility class to calculate rate while avoiding DivideByZero */ - class RateCalculator { - @volatile private var rate: Option[Double] = None - - def update(numRows: Long, timeGapMs: Long): Unit = { - if (timeGapMs > 0) { - rate = Some(numRows.toDouble * 1000 / timeGapMs) - } else { - rate = None - logDebug(s"Rate updates cannot with zero or negative time gap $timeGapMs") - } - } - - def currentRate: Double = rate.getOrElse(0.0) - - def stop(): Unit = { rate = None } - } - - - val BATCH_ID = "batchId" - val IS_TRIGGER_ACTIVE = "isTriggerActive" - val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger" - val STATUS_MESSAGE = "statusMessage" - val EVENT_TIME_WATERMARK = "eventTimeWatermark" - - val START_TIMESTAMP = "timestamp.triggerStart" - val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset" - val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch" - val FINISH_TIMESTAMP = "timestamp.triggerFinish" - - val GET_OFFSET_LATENCY = "latency.getOffset.total" - val GET_BATCH_LATENCY = "latency.getBatch.total" - val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite" - val OPTIMIZER_LATENCY = "latency.optimizer" - val TRIGGER_LATENCY = "latency.fullTrigger" - val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source" - val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source" - - val NUM_INPUT_ROWS = "numRows.input.total" - val NUM_SOURCE_INPUT_ROWS = "numRows.input.source" - def NUM_TOTAL_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.total" - def NUM_UPDATED_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.updated" -} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5589805..21b26b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -583,6 +583,12 @@ object SQLConf { .booleanConf .createWithDefault(false) + val STREAMING_PROGRESS_RETENTION = + SQLConfigBuilder("spark.sql.streaming.numRecentProgresses") + .doc("The number of progress updates to retain for a streaming query") + .intConf + .createWithDefault(100) + val NDV_MAX_ERROR = SQLConfigBuilder("spark.sql.statistics.ndv.maxError") .internal() @@ -654,6 +660,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def 
streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED) + def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION) + def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES) http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala deleted file mode 100644 index ab19602..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.json4s._ -import org.json4s.JsonAST.JValue -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.streaming.StreamingQueryStatus.indent - -/** - * :: Experimental :: - * Status and metrics of a streaming sink. - * - * @param description Description of the source corresponding to this status. - * @param offsetDesc Description of the current offsets up to which data has been written - * by the sink. - * @since 2.0.0 - */ -@Experimental -class SinkStatus private( - val description: String, - val offsetDesc: String) { - - /** The compact JSON representation of this status. */ - def json: String = compact(render(jsonValue)) - - /** The pretty (i.e. indented) JSON representation of this status. 
*/ - def prettyJson: String = pretty(render(jsonValue)) - - override def toString: String = - "Status of sink " + indent(prettyString).trim - - private[sql] def jsonValue: JValue = { - ("description" -> JString(description)) ~ - ("offsetDesc" -> JString(offsetDesc)) - } - - private[sql] def prettyString: String = { - s"""$description - |Committed offsets: $offsetDesc - |""".stripMargin - } -} - -/** Companion object, primarily for creating SinkStatus instances internally */ -private[sql] object SinkStatus { - def apply(desc: String, offsetDesc: String): SinkStatus = new SinkStatus(desc, offsetDesc) -} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala deleted file mode 100644 index cfdf113..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import java.{util => ju} - -import scala.collection.JavaConverters._ - -import org.json4s._ -import org.json4s.JsonAST.JValue -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.streaming.StreamingQueryStatus.indent -import org.apache.spark.util.JsonProtocol - -/** - * :: Experimental :: - * Status and metrics of a streaming Source. - * - * @param description Description of the source corresponding to this status. - * @param offsetDesc Description of the current offset if known. - * @param inputRate Current rate (rows/sec) at which data is being generated by the source. - * @param processingRate Current rate (rows/sec) at which the query is processing data from - * the source. - * @param triggerDetails Low-level details of the currently active trigger (e.g. number of - * rows processed in trigger, latency of intermediate steps, etc.). - * If no trigger is active, then it will have details of the last completed - * trigger. - * @since 2.0.0 - */ -@Experimental -class SourceStatus private( - val description: String, - val offsetDesc: String, - val inputRate: Double, - val processingRate: Double, - val triggerDetails: ju.Map[String, String]) { - - /** The compact JSON representation of this status. */ - def json: String = compact(render(jsonValue)) - - /** The pretty (i.e. indented) JSON representation of this status. 
*/ - def prettyJson: String = pretty(render(jsonValue)) - - override def toString: String = - "Status of source " + indent(prettyString).trim - - private[sql] def jsonValue: JValue = { - ("description" -> JString(description)) ~ - ("offsetDesc" -> JString(offsetDesc)) ~ - ("inputRate" -> JDouble(inputRate)) ~ - ("processingRate" -> JDouble(processingRate)) ~ - ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) - } - - private[sql] def prettyString: String = { - val triggerDetailsLines = - triggerDetails.asScala.map { case (k, v) => s"$k: $v" } - s"""$description - |Available offset: $offsetDesc - |Input rate: $inputRate rows/sec - |Processing rate: $processingRate rows/sec - |Trigger details: - |""".stripMargin + indent(triggerDetailsLines) - } -} - -/** Companion object, primarily for creating SourceStatus instances internally */ -private[sql] object SourceStatus { - def apply( - desc: String, - offsetDesc: String, - inputRate: Double, - processingRate: Double, - triggerDetails: Map[String, String]): SourceStatus = { - new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 374313f..8fc4e43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import java.util.UUID + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.SparkSession @@ -33,25 +35,27 @@ trait StreamingQuery { * Returns the name of the query. This name is unique across all active queries. This can be * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as * `dataframe.writeStream.queryName("query").start()`. + * * @since 2.0.0 */ def name: String /** - * Returns the unique id of this query. This id is automatically generated and is unique across - * all queries that have been started in the current process. - * @since 2.0.0 + * Returns the unique id of this query. + * @since 2.1.0 */ - def id: Long + def id: UUID /** * Returns the `SparkSession` associated with `this`. + * * @since 2.0.0 */ def sparkSession: SparkSession /** - * Whether the query is currently active or not + * Returns `true` if this query is actively running. + * * @since 2.0.0 */ def isActive: Boolean @@ -64,23 +68,26 @@ trait StreamingQuery { /** * Returns the current status of the query. + * * @since 2.0.2 */ def status: StreamingQueryStatus /** - * Returns current status of all the sources. - * @since 2.0.0 + * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + * The number of progress updates retained for each stream is configured by Spark session + * configuration `spark.sql.streaming.numRecentProgresses`. + * + * @since 2.1.0 */ - @deprecated("use status.sourceStatuses", "2.0.2") - def sourceStatuses: Array[SourceStatus] + def recentProgresses: Array[StreamingQueryProgress] /** - * Returns current status of the sink. - * @since 2.0.0 + * Returns the most recent [[StreamingQueryProgress]] update of this streaming query. 
+ * + * @since 2.1.0 */ - @deprecated("use status.sinkStatus", "2.0.2") - def sinkStatus: SinkStatus + def lastProgress: StreamingQueryProgress /** * Waits for the termination of `this` query, either by `query.stop()` or by an exception. http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 0a58142..13f11ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut * :: Experimental :: * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception * that caused the failure. - * @param query Query that caused the exception + * @param query Query that caused the exception * @param message Message of this exception * @param cause Internal cause of this exception * @param startOffset Starting offset (if known) of the range of data in which exception occurred http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 9e311fa..d9ee75c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import java.util.UUID + import org.apache.spark.annotation.Experimental import org.apache.spark.scheduler.SparkListenerEvent @@ -81,30 +83,28 @@ object StreamingQueryListener { /** * :: Experimental :: * Event representing the start of a query - * @since 2.0.0 + * @since 2.1.0 */ @Experimental - class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event /** * :: Experimental :: - * Event representing any progress updates in a query - * @since 2.0.0 + * Event representing any progress updates in a query. + * @since 2.1.0 */ @Experimental - class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event /** * :: Experimental :: - * Event representing that termination of a query + * Event representing that termination of a query. * - * @param queryStatus Information about the status of the query. - * @param exception The exception message of the [[StreamingQuery]] if the query was terminated + * @param id The query id. + * @param exception The exception message of the query if the query was terminated * with an exception. Otherwise, it will be `None`. 
- * @since 2.0.0 + * @since 2.1.0 */ @Experimental - class QueryTerminatedEvent private[sql]( - val queryStatus: StreamingQueryStatus, - val exception: Option[String]) extends Event + class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event } http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 53968a8..c448468 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import java.util.UUID +import java.util.concurrent.atomic.AtomicLong + import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -41,7 +44,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) - private val activeQueries = new mutable.HashMap[Long, StreamingQuery] + private val activeQueries = new mutable.HashMap[UUID, StreamingQuery] private val activeQueriesLock = new Object private val awaitTerminationLock = new Object @@ -59,13 +62,20 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { /** * Returns the query if there is an active query with the given id, or null. * - * @since 2.0.0 + * @since 2.1.0 */ - def get(id: Long): StreamingQuery = activeQueriesLock.synchronized { + def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized { activeQueries.get(id).orNull } /** + * Returns the query if there is an active query with the given id, or null. + * + * @since 2.1.0 + */ + def get(id: String): StreamingQuery = get(UUID.fromString(id)) + + /** * Wait until any of the queries on the associated SQLContext has terminated since the * creation of the context, or since `resetTerminated()` was called. If any query was terminated * with an exception, then the exception will be thrown. 
@@ -197,8 +207,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()): StreamingQuery = { activeQueriesLock.synchronized { - val id = StreamExecution.nextId - val name = userSpecifiedName.getOrElse(s"query-$id") + val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}") if (activeQueries.values.exists(_.name == name)) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") @@ -252,7 +261,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } val query = new StreamExecution( sparkSession, - id, name, checkpointLocation, logicalPlan, @@ -261,7 +269,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { triggerClock, outputMode) query.start() - activeQueries.put(id, query) + activeQueries.put(query.id, query) query } } @@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } } } + +private object StreamingQueryManager { + private val _nextId = new AtomicLong(0) + private def nextId: Long = _nextId.getAndIncrement() +} http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index ba732ff..4c1a7ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -17,146 +17,17 @@ package org.apache.spark.sql.streaming -import java.{util => ju} - -import scala.collection.JavaConverters._ - -import org.json4s._ -import org.json4s.JsonAST.JValue -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq} -import org.apache.spark.util.JsonProtocol - /** - * :: Experimental :: - * A class used to report information about the progress of a [[StreamingQuery]]. + * Reports information about the instantaneous status of a streaming query. * - * @param name Name of the query. This name is unique across all active queries. - * @param id Id of the query. This id is unique across - * all queries that have been started in the current process. - * @param timestamp Timestamp (ms) of when this query was generated. - * @param inputRate Current rate (rows/sec) at which data is being generated by all the sources. - * @param processingRate Current rate (rows/sec) at which the query is processing data from - * all the sources. - * @param latency Current average latency between the data being available in source and the sink - * writing the corresponding output. - * @param sourceStatuses Current statuses of the sources. - * @param sinkStatus Current status of the sink. - * @param triggerDetails Low-level details of the currently active trigger (e.g. number of - * rows processed in trigger, latency of intermediate steps, etc.). - * If no trigger is active, then it will have details of the last completed - * trigger. - * @since 2.0.0 + * @param message A human readable description of what the stream is currently doing. 
+ * @param isDataAvailable True when there is new data to be processed. + * @param isTriggerActive True when the trigger is actively firing, false when waiting for the + * next trigger time. + * + * @since 2.1.0 */ -@Experimental -class StreamingQueryStatus private( - val name: String, - val id: Long, - val timestamp: Long, - val inputRate: Double, - val processingRate: Double, - val latency: Option[Double], - val sourceStatuses: Array[SourceStatus], - val sinkStatus: SinkStatus, - val triggerDetails: ju.Map[String, String]) { - - import StreamingQueryStatus._ - - /** The compact JSON representation of this status. */ - def json: String = compact(render(jsonValue)) - - /** The pretty (i.e. indented) JSON representation of this status. */ - def prettyJson: String = pretty(render(jsonValue)) - - override def toString: String = { - val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) => - s"Source ${i + 1} - " + indent(s.prettyString).trim - } - val sinkStatusLines = sinkStatus.prettyString.trim - val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted - val numSources = sourceStatuses.length - val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" } - - val allLines = - s"""|Query id: $id - |Status timestamp: $timestamp - |Input rate: $inputRate rows/sec - |Processing rate $processingRate rows/sec - |Latency: ${latency.getOrElse("-")} ms - |Trigger details: - |${indent(triggerDetailsLines)} - |Source statuses [$numSourcesString]: - |${indent(sourceStatusLines)} - |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin - - s"Status of query '$name'\n${indent(allLines)}" - } - - private[sql] def jsonValue: JValue = { - ("name" -> JString(name)) ~ - ("id" -> JInt(id)) ~ - ("timestamp" -> JInt(timestamp)) ~ - ("inputRate" -> JDouble(inputRate)) ~ - ("processingRate" -> JDouble(processingRate)) ~ - ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~ - ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~ - ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~ - ("sinkStatus" -> sinkStatus.jsonValue) - } -} - -/** Companion object, primarily for creating StreamingQueryInfo instances internally */ -private[sql] object StreamingQueryStatus { - def apply( - name: String, - id: Long, - timestamp: Long, - inputRate: Double, - processingRate: Double, - latency: Option[Double], - sourceStatuses: Array[SourceStatus], - sinkStatus: SinkStatus, - triggerDetails: Map[String, String]): StreamingQueryStatus = { - new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate, - latency, sourceStatuses, sinkStatus, triggerDetails.asJava) - } - - def indent(strings: Iterable[String]): String = strings.map(indent).mkString("\n") - def indent(string: String): String = string.split("\n").map(" " + _).mkString("\n") - - /** Create an instance of status for python testing */ - def testStatus(): StreamingQueryStatus = { - import org.apache.spark.sql.execution.streaming.StreamMetrics._ - StreamingQueryStatus( - name = "query", - id = 1, - timestamp = 123, - inputRate = 15.5, - processingRate = 23.5, - latency = Some(345), - sourceStatuses = Array( - SourceStatus( - desc = "MySource1", - offsetDesc = LongOffset(0).json, - inputRate = 15.5, - processingRate = 23.5, - triggerDetails = Map( - NUM_SOURCE_INPUT_ROWS -> "100", - SOURCE_GET_OFFSET_LATENCY -> "10", - SOURCE_GET_BATCH_LATENCY -> "20"))), - sinkStatus = SinkStatus( - desc = "MySink", - offsetDesc = OffsetSeq(Some(LongOffset(1)) 
:: None :: Nil).toString), - triggerDetails = Map( - BATCH_ID -> "5", - IS_TRIGGER_ACTIVE -> "true", - IS_DATA_PRESENT_IN_TRIGGER -> "true", - GET_OFFSET_LATENCY -> "10", - GET_BATCH_LATENCY -> "20", - NUM_INPUT_ROWS -> "100" - )) - } -} +case class StreamingQueryStatus protected[sql]( + message: String, + isDataAvailable: Boolean, + isTriggerActive: Boolean)
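
As a usage note for reviewers, here is a minimal sketch (not part of the diff) of how the split `status` / `recentProgresses` surface can be consumed. It assumes an already-started `StreamingQuery` called `query`; the helper name `describe` is made up for illustration, and the `json` method on `StreamingQueryProgress` is assumed rather than shown in the hunks above.

```scala
import java.util.UUID

import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryProgress, StreamingQueryStatus}

// Prints the instantaneous status and the retained per-trigger progress of a query.
def describe(query: StreamingQuery): Unit = {
  // `id` is now a globally unique UUID rather than a JVM-local counter.
  val id: UUID = query.id

  // `status` answers "what is the stream doing right now?"
  val status: StreamingQueryStatus = query.status
  println(s"[$id] ${status.message} " +
    s"(dataAvailable=${status.isDataAvailable}, triggerActive=${status.isTriggerActive})")

  // `recentProgresses` keeps the last N microbatch summaries,
  // where N is spark.sql.streaming.numRecentProgresses (default 100).
  query.recentProgresses.foreach((p: StreamingQueryProgress) => println(p.json))

  // `lastProgress` is the most recent entry; it is null until the first trigger completes.
  Option(query.lastProgress).foreach(p => println(s"last batch: ${p.json}"))
}
```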
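
The reworked listener events can be exercised along the same lines. The sketch below is illustrative only: the `LoggingListener` name and the `println` side effects are placeholders, and the snippet is written to be pasted into a `spark-shell` running this branch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Callbacks now receive the bare (id, name) pair, a full StreamingQueryProgress,
// or the (id, exception) pair, instead of a StreamingQueryStatus snapshot.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query ${event.name} (${event.id}) started")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.json}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query ${event.id} terminated" +
      event.exception.map(e => s": $e").getOrElse(""))
}

// Registration itself is unchanged.
val spark = SparkSession.builder().appName("listener-demo").master("local[*]").getOrCreate()
spark.streams.addListener(new LoggingListener)
```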
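
Lastly, a sketch of the retention knob added to `SQLConf` and the UUID-based lookup added to `StreamingQueryManager`. The session setup, the value `500`, and the all-zero UUID are placeholders; in practice the id would come from `query.id` or a progress record.

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("progress-retention-demo").master("local[*]").getOrCreate()

// Retain more per-trigger progress objects than the default of 100.
spark.conf.set("spark.sql.streaming.numRecentProgresses", "500")

// `StreamingQueryManager.get` now accepts either the UUID or its string form.
val someId: UUID = UUID.fromString("00000000-0000-0000-0000-000000000000") // placeholder id
val byUuid = spark.streams.get(someId)            // null if no active query has this id
val byString = spark.streams.get(someId.toString) // string overload added in this commit
```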