Repository: spark
Updated Branches:
  refs/heads/branch-2.1 b86e962c9 -> 3d4756d56


[SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about 
monitoring streaming queries

## What changes were proposed in this pull request?
<img width="941" alt="screen shot 2016-11-15 at 6 27 32 pm" 
src="https://cloud.githubusercontent.com/assets/663212/20332521/4190b858-ab61-11e6-93a6-4bdc05105ed9.png";>
<img width="940" alt="screen shot 2016-11-15 at 6 27 45 pm" 
src="https://cloud.githubusercontent.com/assets/663212/20332525/44a0d01e-ab61-11e6-8668-47f925490d4f.png";>

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15897 from tdas/SPARK-18461.

(cherry picked from commit bb6cdfd9a6a6b6c91aada7c3174436146045ed1e)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d4756d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d4756d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d4756d5

Branch: refs/heads/branch-2.1
Commit: 3d4756d56b852dcf4e1bebe621d4a30570873c3c
Parents: b86e962
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Nov 16 11:03:10 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Nov 16 11:03:19 2016 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 182 +++++++++++++++++++-
 1 file changed, 179 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d4756d5/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index d254558..77b66b3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1087,9 +1087,185 @@ spark.streams().awaitAnyTermination()  # block until any one of them terminates
 </div>
 </div>
 
-Finally, for asynchronous monitoring of streaming queries, you can create and attach a `StreamingQueryListener`
-([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs),
-which will give you regular callback-based updates when queries are started and terminated.
+
+## Monitoring Streaming Queries
+There are two ways you can monitor queries. You can directly get the current status
+of an active query using `streamingQuery.status`, which returns a `StreamingQueryStatus` object
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs)
+that has details such as current ingestion and processing rates, average latency, and
+details of the currently active trigger.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val query: StreamingQuery = ...
+
+println(query.status)
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+StreamingQuery query = ...
+
+System.out.println(query.status());
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+query = ...  # a StreamingQuery
+
+print(query.status)
+
+'''
+Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+'''
+{% endhighlight %}
+
+</div>
+</div>
+
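+Beyond printing the whole status, you can read individual fields of the returned
+`StreamingQueryStatus` programmatically, e.g. to feed an external metrics system. Here is a
+minimal sketch; the accessor names (`inputRate`, `processingRate`, `triggerDetails`) are
+assumed from the printout above, so check the linked API docs for the exact fields.
+
+{% highlight scala %}
+import scala.collection.JavaConverters._
+
+val query: StreamingQuery = ...
+val status = query.status
+
+// Scalar rates, as shown in the printed status (field names assumed from the printout)
+println(s"input: ${status.inputRate} rows/sec, processing: ${status.processingRate} rows/sec")
+
+// Trigger details are key-value pairs (e.g. "batchId", "numRows.input.total");
+// assuming a Java map here, hence the asScala conversion
+status.triggerDetails.asScala.foreach { case (k, v) => println(s"$k = $v") }
+{% endhighlight %}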
+
+You can also asynchronously monitor all queries associated with a
+`SparkSession` by attaching a `StreamingQueryListener`
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs).
+Once you attach your custom `StreamingQueryListener` object with
+`sparkSession.streams.addListener()`, you will get callbacks when a query is started and
+stopped and when there is progress made in an active query. Here is an example:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val spark: SparkSession = ...
+
+spark.streams.addListener(new StreamingQueryListener() {
+
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        println("Query started: " + queryStarted.queryStatus.name)
+    }
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+        println("Query terminated: " + queryTerminated.queryStatus.name)
+    }
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+        println("Query made progress: " + queryProgress.queryStatus)
+    }
+})
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+SparkSession spark = ...
+
+spark.streams().addListener(new StreamingQueryListener() {
+
+    @Override
+    public void onQueryStarted(QueryStartedEvent queryStarted) {
+        System.out.println("Query started: " + queryStarted.queryStatus().name());
+    }
+    @Override
+    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+        System.out.println("Query terminated: " + queryTerminated.queryStatus().name());
+    }
+    @Override
+    public void onQueryProgress(QueryProgressEvent queryProgress) {
+        System.out.println("Query made progress: " + queryProgress.queryStatus());
+    }
+});
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+{% highlight bash %}
+Not available in Python.
+{% endhighlight %}
+
+</div>
+</div>
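+
+A listener attached this way stays registered for the lifetime of the session. If you keep a
+reference to it, you can detach it later with `StreamingQueryManager.removeListener`. A minimal
+sketch, assuming a listener built as above:
+
+{% highlight scala %}
+val myListener = new StreamingQueryListener() {
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {}
+}
+
+spark.streams.addListener(myListener)
+// ... later, stop receiving callbacks for this listener
+spark.streams.removeListener(myListener)
+{% endhighlight %}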
 
 ## Recovering from Failures with Checkpointing 
 In case of a failure or intentional shutdown, you can recover the progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
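
Concretely, the checkpoint location is just an option set on the `DataStreamWriter` before starting the query. A minimal sketch (assuming `aggDF` is a streaming aggregation DataFrame; the path is a hypothetical HDFS-compatible directory):

{% highlight scala %}
aggDF
    .writeStream
    .outputMode("complete")
    .option("checkpointLocation", "path/to/HDFS/dir")  // hypothetical checkpoint directory
    .format("memory")
    .start()
{% endhighlight %}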

