Repository: spark Updated Branches: refs/heads/branch-2.1 b86e962c9 -> 3d4756d56
[SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about monitoring streaming queries ## What changes were proposed in this pull request? <img width="941" alt="screen shot 2016-11-15 at 6 27 32 pm" src="https://cloud.githubusercontent.com/assets/663212/20332521/4190b858-ab61-11e6-93a6-4bdc05105ed9.png"> <img width="940" alt="screen shot 2016-11-15 at 6 27 45 pm" src="https://cloud.githubusercontent.com/assets/663212/20332525/44a0d01e-ab61-11e6-8668-47f925490d4f.png"> Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #15897 from tdas/SPARK-18461. (cherry picked from commit bb6cdfd9a6a6b6c91aada7c3174436146045ed1e) Signed-off-by: Michael Armbrust <mich...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d4756d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d4756d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d4756d5 Branch: refs/heads/branch-2.1 Commit: 3d4756d56b852dcf4e1bebe621d4a30570873c3c Parents: b86e962 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Nov 16 11:03:10 2016 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Nov 16 11:03:19 2016 -0800 ---------------------------------------------------------------------- docs/structured-streaming-programming-guide.md | 182 +++++++++++++++++++- 1 file changed, 179 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3d4756d5/docs/structured-streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index d254558..77b66b3 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1087,9 +1087,185 @@ spark.streams().awaitAnyTermination() # block until any one of them terminates </div> </div> -Finally, for asynchronous monitoring of streaming queries, you can create and attach a `StreamingQueryListener` -([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs), -which will give you regular callback-based updates when queries are started and terminated. + +## Monitoring Streaming Queries +There are two ways you can monitor queries. You can directly get the current status +of an active query using `streamingQuery.status`, which will return a `StreamingQueryStatus` object +([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs) +that has all the details like current ingestion rates, processing rates, average latency, +details of the currently active trigger, etc. + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +val query: StreamingQuery = ... + +println(query.status) + +/* Will print the current status of the query + +Status of query 'queryName' + Query id: 1 + Status timestamp: 123 + Input rate: 15.5 rows/sec + Processing rate 23.5 rows/sec + Latency: 345.0 ms + Trigger details: + batchId: 5 + isDataPresentInTrigger: true + isTriggerActive: true + latency.getBatch.total: 20 + latency.getOffset.total: 10 + numRows.input.total: 100 + Source statuses [1 source]: + Source 1 - MySource1 + Available offset: 0 + Input rate: 15.5 rows/sec + Processing rate: 23.5 rows/sec + Trigger details: + numRows.input.source: 100 + latency.getOffset.source: 10 + latency.getBatch.source: 20 + Sink status - MySink + Committed offsets: [1, -] +*/ +{% endhighlight %} + +</div> +<div data-lang="java" markdown="1"> + +{% highlight java %} +StreamingQuery query = ... + +System.out.println(query.status); + +/* Will print the current status of the query + +Status of query 'queryName' + Query id: 1 + Status timestamp: 123 + Input rate: 15.5 rows/sec + Processing rate 23.5 rows/sec + Latency: 345.0 ms + Trigger details: + batchId: 5 + isDataPresentInTrigger: true + isTriggerActive: true + latency.getBatch.total: 20 + latency.getOffset.total: 10 + numRows.input.total: 100 + Source statuses [1 source]: + Source 1 - MySource1 + Available offset: 0 + Input rate: 15.5 rows/sec + Processing rate: 23.5 rows/sec + Trigger details: + numRows.input.source: 100 + latency.getOffset.source: 10 + latency.getBatch.source: 20 + Sink status - MySink + Committed offsets: [1, -] +*/ +{% endhighlight %} + +</div> +<div data-lang="python" markdown="1"> + +{% highlight python %} +query = ... // a StreamingQuery + +print(query.status) + +''' +Will print the current status of the query + +Status of query 'queryName' + Query id: 1 + Status timestamp: 123 + Input rate: 15.5 rows/sec + Processing rate 23.5 rows/sec + Latency: 345.0 ms + Trigger details: + batchId: 5 + isDataPresentInTrigger: true + isTriggerActive: true + latency.getBatch.total: 20 + latency.getOffset.total: 10 + numRows.input.total: 100 + Source statuses [1 source]: + Source 1 - MySource1 + Available offset: 0 + Input rate: 15.5 rows/sec + Processing rate: 23.5 rows/sec + Trigger details: + numRows.input.source: 100 + latency.getOffset.source: 10 + latency.getBatch.source: 20 + Sink status - MySink + Committed offsets: [1, -] +''' +{% endhighlight %} + +</div> +</div> + + +You can also asynchronously monitor all queries associated with a +`SparkSession` by attaching a `StreamingQueryListener` +([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs). +Once you attach your custom `StreamingQueryListener` object with +`sparkSession.streams.attachListener()`, you will get callbacks when a query is started and +stopped and when there is progress made in an active query. Here is an example, + +<div class="codetabs"> +<div data-lang="scala" markdown="1"> + +{% highlight scala %} +val spark: SparkSession = ... + +spark.streams.addListener(new StreamingQueryListener() { + + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { + println("Query started: " + queryTerminated.queryStatus.name) + } + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { + println("Query terminated: " + queryTerminated.queryStatus.name) + } + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { + println("Query made progress: " + queryProgress.queryStatus) + } +}) +{% endhighlight %} + +</div> +<div data-lang="java" markdown="1"> + +{% highlight java %} +SparkSession spark = ... + +spark.streams.addListener(new StreamingQueryListener() { + + @Overrides void onQueryStarted(QueryStartedEvent queryStarted) { + System.out.println("Query started: " + queryTerminated.queryStatus.name); + } + @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) { + System.out.println("Query terminated: " + queryTerminated.queryStatus.name); + } + @Overrides void onQueryProgress(QueryProgressEvent queryProgress) { + System.out.println("Query made progress: " + queryProgress.queryStatus); + } +}); +{% endhighlight %} + +</div> +<div data-lang="python" markdown="1"> +{% highlight bash %} +Not available in Python. +{% endhighlight %} + +</div> +</div> ## Recovering from Failures with Checkpointing In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org